From 40aa7abd0a559a8431b7d4a43c594fc9ce9c5a9b Mon Sep 17 00:00:00 2001 From: "meir@redislabs.com" Date: Mon, 4 May 2020 20:55:18 +0300 Subject: [PATCH 1/3] Fixed 1. No any spacific support from RedisGears is required now. This client use rg.pyexecute 2. Match signitures of 'run' and 'register' functions 3. Added repartition step --- example.py | 35 +++++++ gearsclient/__init__.py | 2 +- gearsclient/redisgears_builder.py | 159 +++++++++++++++++++----------- 3 files changed, 135 insertions(+), 61 deletions(-) create mode 100644 example.py diff --git a/example.py b/example.py new file mode 100644 index 0000000..a0fa453 --- /dev/null +++ b/example.py @@ -0,0 +1,35 @@ +from gearsclient import GearsRemoteBuilder as GRB +from gearsclient import log, hashtag, execute, atomic +import redis + +conn = redis.Redis(host='localhost', port=6379) + +# count for each genre how many times it appears + +# class test: +# def __init__(self): +# self.count = 1 + +# def func1(x): +# return test() + +# def func2(x): +# x.count += 1 +# return x + +def func(x): + with atomic(): + execute('hset', 'h', 'foo', 'bar') + execute('hset', 'h', 'foo1', 'bar1') + return x + +res = GRB('ShardsIDReader',r=conn).\ + map(func).\ + run() + +print(res) +# if len(res[1]) > 0: +# print(res[1]) + +# for r in res[0]: +# print('%s' % str(r.count)) diff --git a/gearsclient/__init__.py b/gearsclient/__init__.py index ae81117..fb8722a 100644 --- a/gearsclient/__init__.py +++ b/gearsclient/__init__.py @@ -1 +1 @@ -from .redisgears_builder import GearsRemoteBuilder +from .redisgears_builder import GearsRemoteBuilder, log, gearsConfigGet, execute, atomic, hashtag diff --git a/gearsclient/redisgears_builder.py b/gearsclient/redisgears_builder.py index fefe62b..00fdb97 100644 --- a/gearsclient/redisgears_builder.py +++ b/gearsclient/redisgears_builder.py @@ -2,66 +2,67 @@ import cloudpickle import pickle +class GearsRemoteRepartitionStep(): + def __init__(self, extractor): + self.extractor = extractor + + def AddToGB(self, gb): + gb.repartition(self.extractor) + class GearsRemoteMapStep(): def __init__(self, callback): self.callback = callback - def AddToGB(self, gb, globalsDict): - self.callback.__globals__.update(globalsDict) + def AddToGB(self, gb): gb.map(self.callback) class GearsRemoteForeachStep(): def __init__(self, callback): self.callback = callback - def AddToGB(self, gb, globalsDict): - self.callback.__globals__.update(globalsDict) + def AddToGB(self, gb): gb.foreach(self.callback) class GearsRemoteFlatMapStep(): def __init__(self, callback): self.callback = callback - def AddToGB(self, gb, globalsDict): - self.callback.__globals__.update(globalsDict) + def AddToGB(self, gb): gb.flatmap(self.callback) class GearsRemoteFilterStep(): def __init__(self, callback): self.callback = callback - def AddToGB(self, gb, globalsDict): - self.callback.__globals__.update(globalsDict) + def AddToGB(self, gb): gb.filter(self.callback) class GearsRemoteCountByStep(): def __init__(self, callback): self.callback = callback - def AddToGB(self, gb, globalsDict): - self.callback.__globals__.update(globalsDict) + def AddToGB(self, gb): gb.countby(self.callback) class GearsRemoteAvgByStep(): def __init__(self, callback): self.callback = callback - def AddToGB(self, gb, globalsDict): - self.callback.__globals__.update(globalsDict) + def AddToGB(self, gb): gb.avg(self.callback) class GearsRemoteCountStep(): def __init__(self): self.callback = callback - def AddToGB(self, gb, globalsDict): + def AddToGB(self, gb): gb.count() class GearsRemoteDistinctStep(): def __init__(self): self.callback = callback - def AddToGB(self, gb, globalsDict): + def AddToGB(self, gb): gb.distinct() class GearsRemoteAggregateStep(): @@ -70,9 +71,7 @@ def __init__(self, zero, seqOp, combOp): self.seqOp = seqOp self.combOp = combOp - def AddToGB(self, gb, globalsDict): - self.seqOp.__globals__.update(globalsDict) - self.combOp.__globals__.update(globalsDict) + def AddToGB(self, gb): gb.aggregate(self.zero, self.seqOp, self.combOp) class GearsRemoteAggregateByStep(): @@ -82,16 +81,14 @@ def __init__(self, extractor, zero, seqOp, combOp): self.seqOp = seqOp self.combOp = combOp - def AddToGB(self, gb, globalsDict): - self.seqOp.__globals__.update(globalsDict) - self.combOp.__globals__.update(globalsDict) + def AddToGB(self, gb): gb.aggregate(self.extractor, self.zero, self.seqOp, self.combOp) class GearsRemoteSortStep(): def __init__(self, reverse): self.reverse = reverse - def AddToGB(self, gb, globalsDict): + def AddToGB(self, gb): gb.sort(self.reverse) class GearsRemoteLimitStep(): @@ -99,36 +96,29 @@ def __init__(self, count, offset): self.count = count self.offset = offset - def AddToGB(self, gb, globalsDict): + def AddToGB(self, gb): gb.limit(self.count, self.offset) class GearsRemoteRunStep(): - def __init__(self, arg, convertToStr, collect): + def __init__(self, arg, convertToStr, collect, kargs): self.arg = arg self.convertToStr = convertToStr self.collect = collect + self.kargs = kargs - def AddToGB(self, gb, globalsDict): - gb.run(self.arg, self.convertToStr, self.collect) + def AddToGB(self, gb): + gb.run(self.arg, self.convertToStr, self.collect, **self.kargs) class GearsRemoteRegisterStep(): - def __init__(self, regex, mode, batch, duration, - eventTypes, keyTypes, onRegistered, onFailedPolicy, - onFailedRetryInterval): - self.regex = regex - self.mode = mode - self.batch = batch - self.duration = duration - self.eventTypes = eventTypes - self.keyTypes = keyTypes - self.onRegistered = onRegistered - self.onFailedPolicy = onFailedPolicy - self.onFailedRetryInterval = onFailedRetryInterval - - def AddToGB(self, gb, globalsDict): - gb.register(self.regex, self.mode, self.batch, self.duration, - self.eventTypes, self.keyTypes, self.onRegistered, self.onFailedPolicy, - self.onFailedRetryInterval) + def __init__(self, prefix, convertToStr, collect, kargs): + self.prefix = prefix + self.convertToStr = convertToStr + self.collect = collect + self.kargs = kargs + + def AddToGB(self, gb): + gb.register(self.prefix, self.convertToStr, self.collect, **self.kargs) + class GearsPipe(): def __init__(self, reader='KeysReader', defaultArg='*'): @@ -136,6 +126,10 @@ def __init__(self, reader='KeysReader', defaultArg='*'): self.defaultArg = defaultArg self.steps = [] + def repartition(self, extractor): + self.steps.append(GearsRemoteRepartitionStep(extractor)) + return self + def map(self, callback): self.steps.append(GearsRemoteMapStep(callback)) return self @@ -184,15 +178,17 @@ def limit(self, count, offset): self.steps.append(GearsRemoteLimitStep(count, offset)) return self - def run(self, arg, convertToStr, collect): - self.steps.append(GearsRemoteRunStep(arg, convertToStr, collect)) + def run(self, arg, convertToStr, collect, **kargs): + self.steps.append(GearsRemoteRunStep(arg, convertToStr, collect, kargs)) + + def register(self, prefix, convertToStr, collect, **kargs): + self.steps.append(GearsRemoteRegisterStep(prefix, convertToStr, collect, kargs)) + + def createAndRun(self, GB): + gb = GB(self.reader) + for s in self.steps: + s.AddToGB(gb) - def register(self, regex, mode, batch, duration, - eventTypes, keyTypes, onRegistered, onFailedPolicy, - onFailedRetryInterval): - self.steps.append(GearsRemoteRegisterStep(regex, mode, batch, duration, - eventTypes, keyTypes, onRegistered, onFailedPolicy, - onFailedRetryInterval)) class GearsRemoteBuilder(): @@ -202,6 +198,10 @@ def __init__(self, reader='KeysReader', defaultArg='*', r=None): self.r = r self.pipe = GearsPipe(reader, defaultArg) + def repartition(self, extractor): + self.pipe.repartition(extractor) + return self + def map(self, callback): self.pipe.map(callback) return self @@ -250,20 +250,59 @@ def limit(self, count, offset=0): self.pipe.limit(count, offset) return self - def run(self, arg=None, convertToStr=False, collect=True): - self.pipe.run(arg, convertToStr, collect) + def run(self, arg=None, collect=True, **kargs): + self.map(lambda x: cloudpickle.dumps(x)) + self.pipe.run(arg, False, collect) selfBytes = cloudpickle.dumps(self.pipe) - results = self.r.execute_command('RG.PYEXECUTEREMOTE', selfBytes) + serverCode = ''' +import cloudpickle +p = cloudpickle.loads(%s) +p.createAndRun(GB) + ''' % selfBytes + results = self.r.execute_command('RG.PYEXECUTE', serverCode) res, errs = results + res = [cloudpickle.loads(record) for record in res] return res, errs - def register(self, regex='*', mode='async', batch=1, duration=0, - eventTypes=None, keyTypes=None, onRegistered=None, onFailedPolicy="continue", - onFailedRetryInterval=1): - self.pipe.register(regex, mode, batch, duration, - eventTypes, keyTypes, onRegistered, onFailedPolicy, - onFailedRetryInterval) + def register(self, prefix='*', convertToStr=True, collect=True, **kargs): + self.pipe.register(prefix, convertToStr, collect, **kargs) selfBytes = cloudpickle.dumps(self.pipe) - res = self.r.execute_command('RG.PYEXECUTEREMOTE', selfBytes) + serverCode = ''' +import cloudpickle +p = cloudpickle.loads(%s) +p.createAndRun(GB) + ''' % selfBytes + res = self.r.execute_command('RG.PYEXECUTE', serverCode) return res +def log(msg, level='notice'): + from redisgears import log as redisLog + redisLog(msg, level=level) + +def gearsConfigGet(key, default=None): + from redisgears import config_get as redisConfigGet + val = redisConfigGet(key) + return val if val is not None else default + +def execute(*args): + from redisgears import executeCommand as redisExecute + return redisExecute(*args) + +def hashtag(): + from redisgears import getMyHashTag as redisHashtag + return redisHashtag() + +class atomic: + def __init__(self): + from redisgears import atomicCtx as redisAomic + self.atomic = redisAomic() + pass + + def __enter__(self): + self.atomic.__enter__() + return self + + def __exit__(self, type, value, traceback): + self.atomic.__exit__() + + From 94e39934ec38a1fcfe6b1260b3aa083320e41153 Mon Sep 17 00:00:00 2001 From: "meir@redislabs.com" Date: Tue, 5 May 2020 15:05:25 +0300 Subject: [PATCH 2/3] added accumulate and localgroupby --- gearsclient/redisgears_builder.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/gearsclient/redisgears_builder.py b/gearsclient/redisgears_builder.py index 00fdb97..6930f7f 100644 --- a/gearsclient/redisgears_builder.py +++ b/gearsclient/redisgears_builder.py @@ -2,6 +2,21 @@ import cloudpickle import pickle +class GearsRemoteLocalGroupByStep(): + def __init__(self, extractor, reducer): + self.extractor = extractor + self.reducer = reducer + + def AddToGB(self, gb): + gb.localgroupby(self.extractor, self.reducer) + +class GearsRemoteAccumulateStep(): + def __init__(self, accumulator): + self.accumulator = accumulator + + def AddToGB(self, gb): + gb.accumulate(self.accumulator) + class GearsRemoteRepartitionStep(): def __init__(self, extractor): self.extractor = extractor @@ -126,6 +141,14 @@ def __init__(self, reader='KeysReader', defaultArg='*'): self.defaultArg = defaultArg self.steps = [] + def localgroupby(self, extractor, reducer): + self.steps.append(GearsRemoteLocalGroupByStep(extractor, reducer)) + return self + + def accumulate(self, accumulator): + self.steps.append(GearsRemoteAccumulateStep(accumulator)) + return self + def repartition(self, extractor): self.steps.append(GearsRemoteRepartitionStep(extractor)) return self @@ -198,6 +221,14 @@ def __init__(self, reader='KeysReader', defaultArg='*', r=None): self.r = r self.pipe = GearsPipe(reader, defaultArg) + def localgroupby(self, extractor, reducer): + self.pipe.localgroupby(extractor, reducer) + return self + + def accumulate(self, accumulator): + self.pipe.accumulate(accumulator) + return self + def repartition(self, extractor): self.pipe.repartition(extractor) return self From 2ea9eaf94163674a01a1f28bc5dff0a41e18e35b Mon Sep 17 00:00:00 2001 From: "meir@redislabs.com" Date: Wed, 6 May 2020 11:53:59 +0300 Subject: [PATCH 3/3] review fixes --- gearsclient/redisgears_builder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gearsclient/redisgears_builder.py b/gearsclient/redisgears_builder.py index 6930f7f..c884d50 100644 --- a/gearsclient/redisgears_builder.py +++ b/gearsclient/redisgears_builder.py @@ -325,8 +325,8 @@ def hashtag(): class atomic: def __init__(self): - from redisgears import atomicCtx as redisAomic - self.atomic = redisAomic() + from redisgears import atomicCtx as redisAtomic + self.atomic = redisAtomic() pass def __enter__(self):