Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from gearsclient import GearsRemoteBuilder as GRB
from gearsclient import log, hashtag, execute, atomic
import redis

conn = redis.Redis(host='localhost', port=6379)

# count for each genre how many times it appears

# class test:
# def __init__(self):
# self.count = 1

# def func1(x):
# return test()

# def func2(x):
# x.count += 1
# return x

def func(x):
with atomic():
execute('hset', 'h', 'foo', 'bar')
execute('hset', 'h', 'foo1', 'bar1')
return x

res = GRB('ShardsIDReader',r=conn).\
map(func).\
run()

print(res)
# if len(res[1]) > 0:
# print(res[1])

# for r in res[0]:
# print('%s' % str(r.count))
2 changes: 1 addition & 1 deletion gearsclient/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .redisgears_builder import GearsRemoteBuilder
from .redisgears_builder import GearsRemoteBuilder, log, gearsConfigGet, execute, atomic, hashtag
190 changes: 130 additions & 60 deletions gearsclient/redisgears_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,66 +2,82 @@
import cloudpickle
import pickle

class GearsRemoteLocalGroupByStep():
def __init__(self, extractor, reducer):
self.extractor = extractor
self.reducer = reducer

def AddToGB(self, gb):
gb.localgroupby(self.extractor, self.reducer)

class GearsRemoteAccumulateStep():
def __init__(self, accumulator):
self.accumulator = accumulator

def AddToGB(self, gb):
gb.accumulate(self.accumulator)

class GearsRemoteRepartitionStep():
def __init__(self, extractor):
self.extractor = extractor

def AddToGB(self, gb):
gb.repartition(self.extractor)

class GearsRemoteMapStep():
def __init__(self, callback):
self.callback = callback

def AddToGB(self, gb, globalsDict):
self.callback.__globals__.update(globalsDict)
def AddToGB(self, gb):
gb.map(self.callback)

class GearsRemoteForeachStep():
def __init__(self, callback):
self.callback = callback

def AddToGB(self, gb, globalsDict):
self.callback.__globals__.update(globalsDict)
def AddToGB(self, gb):
gb.foreach(self.callback)

class GearsRemoteFlatMapStep():
def __init__(self, callback):
self.callback = callback

def AddToGB(self, gb, globalsDict):
self.callback.__globals__.update(globalsDict)
def AddToGB(self, gb):
gb.flatmap(self.callback)

class GearsRemoteFilterStep():
def __init__(self, callback):
self.callback = callback

def AddToGB(self, gb, globalsDict):
self.callback.__globals__.update(globalsDict)
def AddToGB(self, gb):
gb.filter(self.callback)

class GearsRemoteCountByStep():
def __init__(self, callback):
self.callback = callback

def AddToGB(self, gb, globalsDict):
self.callback.__globals__.update(globalsDict)
def AddToGB(self, gb):
gb.countby(self.callback)

class GearsRemoteAvgByStep():
def __init__(self, callback):
self.callback = callback

def AddToGB(self, gb, globalsDict):
self.callback.__globals__.update(globalsDict)
def AddToGB(self, gb):
gb.avg(self.callback)

class GearsRemoteCountStep():
def __init__(self):
self.callback = callback

def AddToGB(self, gb, globalsDict):
def AddToGB(self, gb):
gb.count()

class GearsRemoteDistinctStep():
def __init__(self):
self.callback = callback

def AddToGB(self, gb, globalsDict):
def AddToGB(self, gb):
gb.distinct()

class GearsRemoteAggregateStep():
Expand All @@ -70,9 +86,7 @@ def __init__(self, zero, seqOp, combOp):
self.seqOp = seqOp
self.combOp = combOp

def AddToGB(self, gb, globalsDict):
self.seqOp.__globals__.update(globalsDict)
self.combOp.__globals__.update(globalsDict)
def AddToGB(self, gb):
gb.aggregate(self.zero, self.seqOp, self.combOp)

class GearsRemoteAggregateByStep():
Expand All @@ -82,60 +96,63 @@ def __init__(self, extractor, zero, seqOp, combOp):
self.seqOp = seqOp
self.combOp = combOp

def AddToGB(self, gb, globalsDict):
self.seqOp.__globals__.update(globalsDict)
self.combOp.__globals__.update(globalsDict)
def AddToGB(self, gb):
gb.aggregate(self.extractor, self.zero, self.seqOp, self.combOp)

class GearsRemoteSortStep():
def __init__(self, reverse):
self.reverse = reverse

def AddToGB(self, gb, globalsDict):
def AddToGB(self, gb):
gb.sort(self.reverse)

class GearsRemoteLimitStep():
def __init__(self, count, offset):
self.count = count
self.offset = offset

def AddToGB(self, gb, globalsDict):
def AddToGB(self, gb):
gb.limit(self.count, self.offset)

class GearsRemoteRunStep():
def __init__(self, arg, convertToStr, collect):
def __init__(self, arg, convertToStr, collect, kargs):
self.arg = arg
self.convertToStr = convertToStr
self.collect = collect
self.kargs = kargs

def AddToGB(self, gb, globalsDict):
gb.run(self.arg, self.convertToStr, self.collect)
def AddToGB(self, gb):
gb.run(self.arg, self.convertToStr, self.collect, **self.kargs)

class GearsRemoteRegisterStep():
def __init__(self, regex, mode, batch, duration,
eventTypes, keyTypes, onRegistered, onFailedPolicy,
onFailedRetryInterval):
self.regex = regex
self.mode = mode
self.batch = batch
self.duration = duration
self.eventTypes = eventTypes
self.keyTypes = keyTypes
self.onRegistered = onRegistered
self.onFailedPolicy = onFailedPolicy
self.onFailedRetryInterval = onFailedRetryInterval

def AddToGB(self, gb, globalsDict):
gb.register(self.regex, self.mode, self.batch, self.duration,
self.eventTypes, self.keyTypes, self.onRegistered, self.onFailedPolicy,
self.onFailedRetryInterval)
def __init__(self, prefix, convertToStr, collect, kargs):
self.prefix = prefix
self.convertToStr = convertToStr
self.collect = collect
self.kargs = kargs

def AddToGB(self, gb):
gb.register(self.prefix, self.convertToStr, self.collect, **self.kargs)


class GearsPipe():
def __init__(self, reader='KeysReader', defaultArg='*'):
self.reader = reader
self.defaultArg = defaultArg
self.steps = []

def localgroupby(self, extractor, reducer):
self.steps.append(GearsRemoteLocalGroupByStep(extractor, reducer))
return self

def accumulate(self, accumulator):
self.steps.append(GearsRemoteAccumulateStep(accumulator))
return self

def repartition(self, extractor):
self.steps.append(GearsRemoteRepartitionStep(extractor))
return self

def map(self, callback):
self.steps.append(GearsRemoteMapStep(callback))
return self
Expand Down Expand Up @@ -184,15 +201,17 @@ def limit(self, count, offset):
self.steps.append(GearsRemoteLimitStep(count, offset))
return self

def run(self, arg, convertToStr, collect):
self.steps.append(GearsRemoteRunStep(arg, convertToStr, collect))
def run(self, arg, convertToStr, collect, **kargs):
self.steps.append(GearsRemoteRunStep(arg, convertToStr, collect, kargs))

def register(self, prefix, convertToStr, collect, **kargs):
self.steps.append(GearsRemoteRegisterStep(prefix, convertToStr, collect, kargs))

def createAndRun(self, GB):
gb = GB(self.reader)
for s in self.steps:
s.AddToGB(gb)

def register(self, regex, mode, batch, duration,
eventTypes, keyTypes, onRegistered, onFailedPolicy,
onFailedRetryInterval):
self.steps.append(GearsRemoteRegisterStep(regex, mode, batch, duration,
eventTypes, keyTypes, onRegistered, onFailedPolicy,
onFailedRetryInterval))


class GearsRemoteBuilder():
Expand All @@ -202,6 +221,18 @@ def __init__(self, reader='KeysReader', defaultArg='*', r=None):
self.r = r
self.pipe = GearsPipe(reader, defaultArg)

def localgroupby(self, extractor, reducer):
self.pipe.localgroupby(extractor, reducer)
return self

def accumulate(self, accumulator):
self.pipe.accumulate(accumulator)
return self

def repartition(self, extractor):
self.pipe.repartition(extractor)
return self

def map(self, callback):
self.pipe.map(callback)
return self
Expand Down Expand Up @@ -250,20 +281,59 @@ def limit(self, count, offset=0):
self.pipe.limit(count, offset)
return self

def run(self, arg=None, convertToStr=False, collect=True):
self.pipe.run(arg, convertToStr, collect)
def run(self, arg=None, collect=True, **kargs):
self.map(lambda x: cloudpickle.dumps(x))
self.pipe.run(arg, False, collect)
selfBytes = cloudpickle.dumps(self.pipe)
results = self.r.execute_command('RG.PYEXECUTEREMOTE', selfBytes)
serverCode = '''
import cloudpickle
p = cloudpickle.loads(%s)
p.createAndRun(GB)
''' % selfBytes
results = self.r.execute_command('RG.PYEXECUTE', serverCode)
res, errs = results
res = [cloudpickle.loads(record) for record in res]
return res, errs

def register(self, regex='*', mode='async', batch=1, duration=0,
eventTypes=None, keyTypes=None, onRegistered=None, onFailedPolicy="continue",
onFailedRetryInterval=1):
self.pipe.register(regex, mode, batch, duration,
eventTypes, keyTypes, onRegistered, onFailedPolicy,
onFailedRetryInterval)
def register(self, prefix='*', convertToStr=True, collect=True, **kargs):
self.pipe.register(prefix, convertToStr, collect, **kargs)
selfBytes = cloudpickle.dumps(self.pipe)
res = self.r.execute_command('RG.PYEXECUTEREMOTE', selfBytes)
serverCode = '''
import cloudpickle
p = cloudpickle.loads(%s)
p.createAndRun(GB)
''' % selfBytes
res = self.r.execute_command('RG.PYEXECUTE', serverCode)
return res

def log(msg, level='notice'):
from redisgears import log as redisLog
redisLog(msg, level=level)

def gearsConfigGet(key, default=None):
from redisgears import config_get as redisConfigGet
val = redisConfigGet(key)
return val if val is not None else default

def execute(*args):
from redisgears import executeCommand as redisExecute
return redisExecute(*args)

def hashtag():
from redisgears import getMyHashTag as redisHashtag
return redisHashtag()

class atomic:
def __init__(self):
from redisgears import atomicCtx as redisAtomic
self.atomic = redisAtomic()
pass

def __enter__(self):
self.atomic.__enter__()
return self

def __exit__(self, type, value, traceback):
self.atomic.__exit__()