diff --git a/example.py b/example.py new file mode 100644 index 0000000..a0fa453 --- /dev/null +++ b/example.py @@ -0,0 +1,35 @@ +from gearsclient import GearsRemoteBuilder as GRB +from gearsclient import log, hashtag, execute, atomic +import redis + +conn = redis.Redis(host='localhost', port=6379) + +# count for each genre how many times it appears + +# class test: +# def __init__(self): +# self.count = 1 + +# def func1(x): +# return test() + +# def func2(x): +# x.count += 1 +# return x + +def func(x): + with atomic(): + execute('hset', 'h', 'foo', 'bar') + execute('hset', 'h', 'foo1', 'bar1') + return x + +res = GRB('ShardsIDReader',r=conn).\ + map(func).\ + run() + +print(res) +# if len(res[1]) > 0: +# print(res[1]) + +# for r in res[0]: +# print('%s' % str(r.count)) diff --git a/gearsclient/__init__.py b/gearsclient/__init__.py index ae81117..fb8722a 100644 --- a/gearsclient/__init__.py +++ b/gearsclient/__init__.py @@ -1 +1 @@ -from .redisgears_builder import GearsRemoteBuilder +from .redisgears_builder import GearsRemoteBuilder, log, gearsConfigGet, execute, atomic, hashtag diff --git a/gearsclient/redisgears_builder.py b/gearsclient/redisgears_builder.py index fefe62b..c884d50 100644 --- a/gearsclient/redisgears_builder.py +++ b/gearsclient/redisgears_builder.py @@ -2,66 +2,82 @@ import cloudpickle import pickle +class GearsRemoteLocalGroupByStep(): + def __init__(self, extractor, reducer): + self.extractor = extractor + self.reducer = reducer + + def AddToGB(self, gb): + gb.localgroupby(self.extractor, self.reducer) + +class GearsRemoteAccumulateStep(): + def __init__(self, accumulator): + self.accumulator = accumulator + + def AddToGB(self, gb): + gb.accumulate(self.accumulator) + +class GearsRemoteRepartitionStep(): + def __init__(self, extractor): + self.extractor = extractor + + def AddToGB(self, gb): + gb.repartition(self.extractor) + class GearsRemoteMapStep(): def __init__(self, callback): self.callback = callback - def AddToGB(self, gb, globalsDict): - self.callback.__globals__.update(globalsDict) + def AddToGB(self, gb): gb.map(self.callback) class GearsRemoteForeachStep(): def __init__(self, callback): self.callback = callback - def AddToGB(self, gb, globalsDict): - self.callback.__globals__.update(globalsDict) + def AddToGB(self, gb): gb.foreach(self.callback) class GearsRemoteFlatMapStep(): def __init__(self, callback): self.callback = callback - def AddToGB(self, gb, globalsDict): - self.callback.__globals__.update(globalsDict) + def AddToGB(self, gb): gb.flatmap(self.callback) class GearsRemoteFilterStep(): def __init__(self, callback): self.callback = callback - def AddToGB(self, gb, globalsDict): - self.callback.__globals__.update(globalsDict) + def AddToGB(self, gb): gb.filter(self.callback) class GearsRemoteCountByStep(): def __init__(self, callback): self.callback = callback - def AddToGB(self, gb, globalsDict): - self.callback.__globals__.update(globalsDict) + def AddToGB(self, gb): gb.countby(self.callback) class GearsRemoteAvgByStep(): def __init__(self, callback): self.callback = callback - def AddToGB(self, gb, globalsDict): - self.callback.__globals__.update(globalsDict) + def AddToGB(self, gb): gb.avg(self.callback) class GearsRemoteCountStep(): def __init__(self): self.callback = callback - def AddToGB(self, gb, globalsDict): + def AddToGB(self, gb): gb.count() class GearsRemoteDistinctStep(): def __init__(self): self.callback = callback - def AddToGB(self, gb, globalsDict): + def AddToGB(self, gb): gb.distinct() class GearsRemoteAggregateStep(): @@ -70,9 +86,7 @@ def __init__(self, zero, seqOp, combOp): self.seqOp = seqOp self.combOp = combOp - def AddToGB(self, gb, globalsDict): - self.seqOp.__globals__.update(globalsDict) - self.combOp.__globals__.update(globalsDict) + def AddToGB(self, gb): gb.aggregate(self.zero, self.seqOp, self.combOp) class GearsRemoteAggregateByStep(): @@ -82,16 +96,14 @@ def __init__(self, extractor, zero, seqOp, combOp): self.seqOp = seqOp self.combOp = combOp - def AddToGB(self, gb, globalsDict): - self.seqOp.__globals__.update(globalsDict) - self.combOp.__globals__.update(globalsDict) + def AddToGB(self, gb): gb.aggregate(self.extractor, self.zero, self.seqOp, self.combOp) class GearsRemoteSortStep(): def __init__(self, reverse): self.reverse = reverse - def AddToGB(self, gb, globalsDict): + def AddToGB(self, gb): gb.sort(self.reverse) class GearsRemoteLimitStep(): @@ -99,36 +111,29 @@ def __init__(self, count, offset): self.count = count self.offset = offset - def AddToGB(self, gb, globalsDict): + def AddToGB(self, gb): gb.limit(self.count, self.offset) class GearsRemoteRunStep(): - def __init__(self, arg, convertToStr, collect): + def __init__(self, arg, convertToStr, collect, kargs): self.arg = arg self.convertToStr = convertToStr self.collect = collect + self.kargs = kargs - def AddToGB(self, gb, globalsDict): - gb.run(self.arg, self.convertToStr, self.collect) + def AddToGB(self, gb): + gb.run(self.arg, self.convertToStr, self.collect, **self.kargs) class GearsRemoteRegisterStep(): - def __init__(self, regex, mode, batch, duration, - eventTypes, keyTypes, onRegistered, onFailedPolicy, - onFailedRetryInterval): - self.regex = regex - self.mode = mode - self.batch = batch - self.duration = duration - self.eventTypes = eventTypes - self.keyTypes = keyTypes - self.onRegistered = onRegistered - self.onFailedPolicy = onFailedPolicy - self.onFailedRetryInterval = onFailedRetryInterval - - def AddToGB(self, gb, globalsDict): - gb.register(self.regex, self.mode, self.batch, self.duration, - self.eventTypes, self.keyTypes, self.onRegistered, self.onFailedPolicy, - self.onFailedRetryInterval) + def __init__(self, prefix, convertToStr, collect, kargs): + self.prefix = prefix + self.convertToStr = convertToStr + self.collect = collect + self.kargs = kargs + + def AddToGB(self, gb): + gb.register(self.prefix, self.convertToStr, self.collect, **self.kargs) + class GearsPipe(): def __init__(self, reader='KeysReader', defaultArg='*'): @@ -136,6 +141,18 @@ def __init__(self, reader='KeysReader', defaultArg='*'): self.defaultArg = defaultArg self.steps = [] + def localgroupby(self, extractor, reducer): + self.steps.append(GearsRemoteLocalGroupByStep(extractor, reducer)) + return self + + def accumulate(self, accumulator): + self.steps.append(GearsRemoteAccumulateStep(accumulator)) + return self + + def repartition(self, extractor): + self.steps.append(GearsRemoteRepartitionStep(extractor)) + return self + def map(self, callback): self.steps.append(GearsRemoteMapStep(callback)) return self @@ -184,15 +201,17 @@ def limit(self, count, offset): self.steps.append(GearsRemoteLimitStep(count, offset)) return self - def run(self, arg, convertToStr, collect): - self.steps.append(GearsRemoteRunStep(arg, convertToStr, collect)) + def run(self, arg, convertToStr, collect, **kargs): + self.steps.append(GearsRemoteRunStep(arg, convertToStr, collect, kargs)) + + def register(self, prefix, convertToStr, collect, **kargs): + self.steps.append(GearsRemoteRegisterStep(prefix, convertToStr, collect, kargs)) + + def createAndRun(self, GB): + gb = GB(self.reader) + for s in self.steps: + s.AddToGB(gb) - def register(self, regex, mode, batch, duration, - eventTypes, keyTypes, onRegistered, onFailedPolicy, - onFailedRetryInterval): - self.steps.append(GearsRemoteRegisterStep(regex, mode, batch, duration, - eventTypes, keyTypes, onRegistered, onFailedPolicy, - onFailedRetryInterval)) class GearsRemoteBuilder(): @@ -202,6 +221,18 @@ def __init__(self, reader='KeysReader', defaultArg='*', r=None): self.r = r self.pipe = GearsPipe(reader, defaultArg) + def localgroupby(self, extractor, reducer): + self.pipe.localgroupby(extractor, reducer) + return self + + def accumulate(self, accumulator): + self.pipe.accumulate(accumulator) + return self + + def repartition(self, extractor): + self.pipe.repartition(extractor) + return self + def map(self, callback): self.pipe.map(callback) return self @@ -250,20 +281,59 @@ def limit(self, count, offset=0): self.pipe.limit(count, offset) return self - def run(self, arg=None, convertToStr=False, collect=True): - self.pipe.run(arg, convertToStr, collect) + def run(self, arg=None, collect=True, **kargs): + self.map(lambda x: cloudpickle.dumps(x)) + self.pipe.run(arg, False, collect) selfBytes = cloudpickle.dumps(self.pipe) - results = self.r.execute_command('RG.PYEXECUTEREMOTE', selfBytes) + serverCode = ''' +import cloudpickle +p = cloudpickle.loads(%s) +p.createAndRun(GB) + ''' % selfBytes + results = self.r.execute_command('RG.PYEXECUTE', serverCode) res, errs = results + res = [cloudpickle.loads(record) for record in res] return res, errs - def register(self, regex='*', mode='async', batch=1, duration=0, - eventTypes=None, keyTypes=None, onRegistered=None, onFailedPolicy="continue", - onFailedRetryInterval=1): - self.pipe.register(regex, mode, batch, duration, - eventTypes, keyTypes, onRegistered, onFailedPolicy, - onFailedRetryInterval) + def register(self, prefix='*', convertToStr=True, collect=True, **kargs): + self.pipe.register(prefix, convertToStr, collect, **kargs) selfBytes = cloudpickle.dumps(self.pipe) - res = self.r.execute_command('RG.PYEXECUTEREMOTE', selfBytes) + serverCode = ''' +import cloudpickle +p = cloudpickle.loads(%s) +p.createAndRun(GB) + ''' % selfBytes + res = self.r.execute_command('RG.PYEXECUTE', serverCode) return res +def log(msg, level='notice'): + from redisgears import log as redisLog + redisLog(msg, level=level) + +def gearsConfigGet(key, default=None): + from redisgears import config_get as redisConfigGet + val = redisConfigGet(key) + return val if val is not None else default + +def execute(*args): + from redisgears import executeCommand as redisExecute + return redisExecute(*args) + +def hashtag(): + from redisgears import getMyHashTag as redisHashtag + return redisHashtag() + +class atomic: + def __init__(self): + from redisgears import atomicCtx as redisAtomic + self.atomic = redisAtomic() + pass + + def __enter__(self): + self.atomic.__enter__() + return self + + def __exit__(self, type, value, traceback): + self.atomic.__exit__() + +