Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 22 additions & 66 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,83 +3,39 @@
# Check https://circleci.com/docs/2.0/language-python/ for more details
#
version: 2.1
commands:
early_return_for_forked_pull_requests:
description: >-
If this build is from a fork, stop executing the current job and return success.
This is useful to avoid steps that will fail due to missing credentials.
steps:
- run:
name: Early return if this build is from a forked PR
command: |
if [ -n "$CIRCLE_PR_NUMBER" ]; then
echo "Nothing to do for forked PRs, so marking this step successful"
circleci step halt
fi
jobs:
build:
build_and_test:
docker:
- image: circleci/python:3.6.1
- image: redislabs/redisgears:edge

working_directory: ~/repo

- image: 'ubuntu:bionic'
steps:
- run:
name: installations
command: apt-get -qq update; apt-get install -y ca-certificates wget build-essential git python-pip
- run:
name: Redis_5_upgrade
command: cd ..; git clone https://github.com/antirez/redis.git; cd ./redis; git fetch; git checkout 6.0.1; make; make install
- run:
name: download_RedisGears
command: cd ..; wget http://redismodules.s3.amazonaws.com/redisgears/snapshots/redisgears.linux-bionic-x64.master.zip; apt-get install -y unzip; unzip redisgears.linux-bionic-x64.master.zip
- run:
name: download_RedisGears_deps
command: cd ..; wget http://redismodules.s3.amazonaws.com/redisgears/snapshots/redisgears-dependencies.linux-bionic-x64.master.tgz; mkdir -p /var/opt/redislabs/modules/rg/; cd /var/opt/redislabs/modules/rg/; tar -xvf /root/redisgears-dependencies.linux-bionic-x64.master.tgz
- checkout

# Download and cache dependencies
- restore_cache:
keys:
- v1-dependencies-{{ checksum "requirements.txt" }}
# fallback to using the latest cache if no exact match is found
- v1-dependencies-

- run:
name: install dependencies
command: |
python -m venv venv
. venv/bin/activate
pip install -r requirements.txt
pip install codecov

- save_cache:
paths:
- ./venv
key: v1-dependencies-{{ checksum "requirements.txt" }}

name: install_redisgears_py_to_RedisGears_virtual_env
command: /var/opt/redislabs/modules/rg/python3_99.99.99/bin/python3 setup.py install
- run:
name: test dist
command: python setup.py sdist

name: install RLTest
command: /var/opt/redislabs/modules/rg/python3_99.99.99/bin/python3 -m pip install git+https://github.com/Grokzen/redis-py-cluster.git@master git+https://github.com/RedisLabsModules/RLTest.git
- run:
name: test example
command: |
. venv/bin/activate
python example.py

# - run:
# name: run tests
# command: |
# . venv/bin/activate
# coverage run test.py

- early_return_for_forked_pull_requests

# - run:
# name: codecove
# command: |
# . venv/bin/activate
# codecov
name: run_tests
command: /var/opt/redislabs/modules/rg/python3_99.99.99/bin/python3 -m RLTest --module ../redisgears.so

# - store_artifacts:
# path: test-reports
# destination: test-reports

workflows:
version: 2
commit:
jobs:
- build
- build_and_test
nightly:
triggers:
- schedule:
Expand All @@ -89,4 +45,4 @@ workflows:
only:
- master
jobs:
- build
- build_and_test
19 changes: 12 additions & 7 deletions gearsclient/redisgears_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def __init__(self, callback):
def AddToGB(self, gb):
gb.countby(self.callback)

class GearsRemoteAvgByStep():
class GearsRemoteAvgStep():
def __init__(self, callback):
self.callback = callback

Expand All @@ -68,14 +68,14 @@ def AddToGB(self, gb):

class GearsRemoteCountStep():
def __init__(self):
self.callback = callback
pass

def AddToGB(self, gb):
gb.count()

class GearsRemoteDistinctStep():
def __init__(self):
self.callback = callback
pass

def AddToGB(self, gb):
gb.distinct()
Expand All @@ -97,7 +97,7 @@ def __init__(self, extractor, zero, seqOp, combOp):
self.combOp = combOp

def AddToGB(self, gb):
gb.aggregate(self.extractor, self.zero, self.seqOp, self.combOp)
gb.aggregateby(self.extractor, self.zero, self.seqOp, self.combOp)

class GearsRemoteSortStep():
def __init__(self, reverse):
Expand Down Expand Up @@ -215,11 +215,16 @@ def createAndRun(self, GB):


class GearsRemoteBuilder():
def __init__(self, reader='KeysReader', defaultArg='*', r=None):
def __init__(self, reader='KeysReader', defaultArg='*', r=None, requirements=[], addClientToRequirements=True):
if r is None:
r = redis.Redis()
self.r = r
self.pipe = GearsPipe(reader, defaultArg)
self.requirements = requirements
if addClientToRequirements:
self.requirements += ['git+https://github.com/RedisGears/redisgears-py.git']
if len(self.requirements) > 0:
self.requirements = ['REQUIREMENTS'] + self.requirements

def localgroupby(self, extractor, reducer):
self.pipe.localgroupby(extractor, reducer)
Expand Down Expand Up @@ -290,7 +295,7 @@ def run(self, arg=None, collect=True, **kargs):
p = cloudpickle.loads(%s)
p.createAndRun(GB)
''' % selfBytes
results = self.r.execute_command('RG.PYEXECUTE', serverCode)
results = self.r.execute_command('RG.PYEXECUTE', serverCode, *self.requirements)
res, errs = results
res = [cloudpickle.loads(record) for record in res]
return res, errs
Expand All @@ -303,7 +308,7 @@ def register(self, prefix='*', convertToStr=True, collect=True, **kargs):
p = cloudpickle.loads(%s)
p.createAndRun(GB)
''' % selfBytes
res = self.r.execute_command('RG.PYEXECUTE', serverCode)
res = self.r.execute_command('RG.PYEXECUTE', serverCode, *self.requirements)
return res

def log(msg, level='notice'):
Expand Down
111 changes: 111 additions & 0 deletions test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from gearsclient import GearsRemoteBuilder as GRB
from gearsclient import log, hashtag, execute, atomic

counter = 0

def getGB(env, reader='KeysReader'):
return GRB(reader, r=env.getConnection(), addClientToRequirements=False)

def test_map(env):
env.cmd('set', 'x', '1')
env.cmd('set', 'y', '2')
env.cmd('set', 'z', '3')
res = getGB(env).map(lambda x: x['value']).sort().run()
env.assertEqual(res, (['1', '2', '3'], []))

def test_filter(env):
env.cmd('set', 'x', '1')
env.cmd('set', 'y', '2')
env.cmd('set', 'z', '3')
res = getGB(env).map(lambda x: x['value']).filter(lambda x: x=='1').run()
env.assertEqual(res, (['1'], []))

def test_foreach(env):
env.cmd('set', 'x', '1')
env.cmd('set', 'y', '2')
env.cmd('set', 'z', '3')
def increase(x):
global counter
counter += 1

# important to notice, the counte will increased on the server size and not on client side!!
res = getGB(env).foreach(increase).map(lambda x: counter).run()
env.assertEqual(res, ([1, 2, 3], []))

def test_flatmap(env):
env.cmd('lpush', 'l', '1', '2', '3')
res = getGB(env, 'KeysOnlyReader').map(lambda x: execute('lrange', x, '0', '-1')).flatmap(lambda x: x).run()
env.assertEqual(res, (['1', '2', '3'], []))

def test_countby(env):
env.cmd('set', 'x', '1')
env.cmd('set', 'y', '1')
env.cmd('set', 'z', '2')
env.cmd('set', 't', '2')
res = getGB(env).map(lambda x: x['value']).countby().map(lambda x: (x['key'], x['value'])).sort().run()
env.assertEqual(res, ([('1', 2), ('2', 2)], []))

def test_avg(env):
env.cmd('set', 'x', '1')
env.cmd('set', 'y', '1')
env.cmd('set', 'z', '2')
env.cmd('set', 't', '2')
res = getGB(env).map(lambda x: x['value']).avg().run()
env.assertEqual(res, ([1.5], []))

def test_count(env):
env.cmd('set', 'x', '1')
env.cmd('set', 'y', '1')
env.cmd('set', 'z', '2')
env.cmd('set', 't', '2')
res = getGB(env).count().run()
env.assertEqual(res, ([4], []))

def test_distinct(env):
env.cmd('set', 'x', '1')
env.cmd('set', 'y', '1')
env.cmd('set', 'z', '2')
env.cmd('set', 't', '2')
res = getGB(env).map(lambda x: x['value']).distinct().count().run()
env.assertEqual(res, ([2], []))

def test_aggregate(env):
env.cmd('set', 'x', '1')
env.cmd('set', 'y', '1')
env.cmd('set', 'z', '2')
env.cmd('set', 't', '2')
res = getGB(env).map(lambda x: x['value']).aggregate(0, lambda a, r: a + int(r), lambda a, r: a + r).run()
env.assertEqual(res, ([6], []))

def test_aggregateby(env):
env.cmd('set', 'x', '1')
env.cmd('set', 'y', '1')
env.cmd('set', 'z', '2')
env.cmd('set', 't', '2')
res = getGB(env).map(lambda x: x['value']).aggregateby(lambda x: x, 0, lambda k, a, r: a + int(r), lambda k, a, r: a + r).map(lambda x: (x['key'], x['value'])).sort().run()
env.assertEqual(res, ([('1', 2), ('2', 4)], []))

def test_limit(env):
env.cmd('set', 'x', '1')
env.cmd('set', 'y', '1')
env.cmd('set', 'z', '2')
env.cmd('set', 't', '2')
res = getGB(env).map(lambda x: x['value']).sort().limit(1).run()
env.assertEqual(res, (['1'], []))

def test_sort(env):
env.cmd('set', 'x', '1')
env.cmd('set', 'y', '1')
env.cmd('set', 'z', '2')
env.cmd('set', 't', '2')
res = getGB(env).map(lambda x: x['key']).sort().run()
env.assertEqual(res, (['t', 'x', 'y', 'z'], []))

def test_hashtag(env):
res = getGB(env, 'ShardsIDReader').map(lambda x: hashtag()).run()
env.assertEqual(res, (['06S'], []))

def test_register(env):
res = getGB(env, 'CommandReader').register(trigger='test')
env.assertEqual(res, b'OK')
env.expect('RG.TRIGGER', 'test', 'this', 'is', 'a', 'test').equal([b"['test', 'this', 'is', 'a', 'test']"])