Closed
Description
Describe the bug
StreamReader doesn't seem to take the 'fromId' parameter into account: every entry in the stream is returned, including entries at or before the given ID.
To reproduce
Redis
xadd mystream * field value0
"1649425213053-0"
xadd mystream * field value1
"1649425219031-0"
xadd mystream * field value2
"1649425223070-0"
from gearsclient import GearsRemoteBuilder as GearsBuilder
import redis
conn = redis.Redis(host="localhost", port=6379)
GearsBuilder('StreamReader', r=conn).run('mystream', fromId='1649425219031')
([{'key': 'mystream', 'id': '1649425213053-0', 'value': {'field': 'value0'}}, {'key': 'mystream', 'id': '1649425219031-0', 'value': {'field': 'value1'}}, {'key': 'mystream', 'id': '1649425223070-0', 'value': {'field': 'value2'}}], [])
Expected Behaviour
([{'key': 'mystream', 'id': '1649425223070-0', 'value': {'field': 'value2'}}], [])
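Until the parameter is honoured, the expected result can be reproduced on the client side by filtering the returned records. This is only a rough sketch: `parse_stream_id` and `records_after` are hypothetical helper names, not part of gearsclient, and it assumes that an ID given without a sequence part (such as '1649425219031') should be compared as if it ended in "-0".

```python
def parse_stream_id(stream_id):
    """Turn 'ms-seq' (or a bare 'ms') into a comparable (ms, seq) tuple.

    Assumption: a missing sequence part is treated as 0.
    """
    ms, _, seq = str(stream_id).partition('-')
    return (int(ms), int(seq) if seq else 0)


def records_after(records, from_id):
    """Keep only the records whose ID is strictly greater than from_id."""
    threshold = parse_stream_id(from_id)
    return [r for r in records if parse_stream_id(r['id']) > threshold]


# The records returned by the call in "To reproduce".
records = [
    {'key': 'mystream', 'id': '1649425213053-0', 'value': {'field': 'value0'}},
    {'key': 'mystream', 'id': '1649425219031-0', 'value': {'field': 'value1'}},
    {'key': 'mystream', 'id': '1649425223070-0', 'value': {'field': 'value2'}},
]

filtered = records_after(records, '1649425219031')
print(filtered)
```

With the three entries above, only the 'value2' record remains, which matches the expected output. This still reads the whole stream on the server, so it is a workaround rather than a fix.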
Possible Fix
Pass **kargs through to self.pipe.run(arg, False, collect, **kargs) (line 290), so that keyword arguments such as fromId reach the reader:
def run(self, arg=None, collect=True, **kargs):
    self.map(lambda x: cloudpickle.dumps(x))
    self.pipe.run(arg, False, collect, **kargs)
    selfBytes = cloudpickle.dumps(self.pipe)
    serverCode = '''
import cloudpickle
p = cloudpickle.loads(%s)
p.createAndRun(GB)
    ''' % selfBytes
    results = self.r.execute_command('RG.PYEXECUTE', serverCode, *self.requirements)
    res, errs = results
    res = [cloudpickle.loads(record) for record in res]
    return res, errs
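The effect of the missing **kargs can be illustrated without a Redis server. The sketch below uses stand-in classes (`RecordingPipe`, `DroppingBuilder`, `ForwardingBuilder`), all hypothetical and written only for this illustration; the stand-in `run` signature, including the `convert_to_str` parameter name, is an assumption and not the real gearsclient one.

```python
class RecordingPipe:
    """Hypothetical stand-in for the inner pipe: records what run() receives."""

    def __init__(self):
        self.calls = []

    def run(self, arg=None, convert_to_str=True, collect=True, **kargs):
        # Store the positional argument and whatever keyword arguments arrived.
        self.calls.append((arg, kargs))


class DroppingBuilder:
    """Mimics the reported behaviour: accepts **kargs but never forwards them."""

    def __init__(self):
        self.pipe = RecordingPipe()

    def run(self, arg=None, collect=True, **kargs):
        self.pipe.run(arg, False, collect)


class ForwardingBuilder:
    """Mimics the proposed fix: **kargs are passed on to the pipe."""

    def __init__(self):
        self.pipe = RecordingPipe()

    def run(self, arg=None, collect=True, **kargs):
        self.pipe.run(arg, False, collect, **kargs)


dropping = DroppingBuilder()
dropping.run('mystream', fromId='1649425219031')

forwarding = ForwardingBuilder()
forwarding.run('mystream', fromId='1649425219031')

print(dropping.pipe.calls)
print(forwarding.pipe.calls)
```

In the first case the pipe never sees fromId, which would be consistent with the reader starting from the beginning of the stream; in the second case the keyword argument arrives intact.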