From 75fa5d40a23663b88a72ec81a6f05b6017cdbab3 Mon Sep 17 00:00:00 2001 From: manueljgarciar Date: Sun, 5 Apr 2020 20:07:03 +0200 Subject: [PATCH] Run and Register functions doesn't work In Gears v0.9 (latest), Run and Register functions doesn't work. It works fine now but it is not backward compatible from previous Gears versions. --- gearsclient/redisgears_builder.py | 39 +++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/gearsclient/redisgears_builder.py b/gearsclient/redisgears_builder.py index 2224888..fefe62b 100644 --- a/gearsclient/redisgears_builder.py +++ b/gearsclient/redisgears_builder.py @@ -112,12 +112,23 @@ def AddToGB(self, gb, globalsDict): gb.run(self.arg, self.convertToStr, self.collect) class GearsRemoteRegisterStep(): - def __init__(self, arg): - self.arg = arg + def __init__(self, regex, mode, batch, duration, + eventTypes, keyTypes, onRegistered, onFailedPolicy, + onFailedRetryInterval): + self.regex = regex + self.mode = mode + self.batch = batch + self.duration = duration + self.eventTypes = eventTypes + self.keyTypes = keyTypes + self.onRegistered = onRegistered + self.onFailedPolicy = onFailedPolicy + self.onFailedRetryInterval = onFailedRetryInterval def AddToGB(self, gb, globalsDict): - gb.register(self.arg) if self.arg else gb.register() - + gb.register(self.regex, self.mode, self.batch, self.duration, + self.eventTypes, self.keyTypes, self.onRegistered, self.onFailedPolicy, + self.onFailedRetryInterval) class GearsPipe(): def __init__(self, reader='KeysReader', defaultArg='*'): @@ -176,8 +187,12 @@ def limit(self, count, offset): def run(self, arg, convertToStr, collect): self.steps.append(GearsRemoteRunStep(arg, convertToStr, collect)) - def register(self, arg): - self.steps.append(GearsRemoteRegisterStep(arg)) + def register(self, regex, mode, batch, duration, + eventTypes, keyTypes, onRegistered, onFailedPolicy, + onFailedRetryInterval): + self.steps.append(GearsRemoteRegisterStep(regex, mode, batch, duration, + eventTypes, keyTypes, onRegistered, onFailedPolicy, + onFailedRetryInterval)) class GearsRemoteBuilder(): @@ -220,7 +235,7 @@ def distinct(self): return self def aggregate(self, zero, seqOp, combOp): - self.pipe.countby(callback) + self.pipe.aggregate(zero, seqOp, combOp) return self def aggregateby(self, extractor, zero, seqOp, combOp): @@ -240,11 +255,15 @@ def run(self, arg=None, convertToStr=False, collect=True): selfBytes = cloudpickle.dumps(self.pipe) results = self.r.execute_command('RG.PYEXECUTEREMOTE', selfBytes) res, errs = results - res = [cloudpickle.loads(record) for record in res] return res, errs - def register(self, arg=None): - self.pipe.register(arg) + def register(self, regex='*', mode='async', batch=1, duration=0, + eventTypes=None, keyTypes=None, onRegistered=None, onFailedPolicy="continue", + onFailedRetryInterval=1): + self.pipe.register(regex, mode, batch, duration, + eventTypes, keyTypes, onRegistered, onFailedPolicy, + onFailedRetryInterval) selfBytes = cloudpickle.dumps(self.pipe) res = self.r.execute_command('RG.PYEXECUTEREMOTE', selfBytes) return res +