From 4742ecc4a0a39547b1d144337cab86b53bbae584 Mon Sep 17 00:00:00 2001 From: "Meir Shpilraien (Spielrein)" Date: Tue, 26 Apr 2022 12:03:41 +0300 Subject: [PATCH 1/2] added db2 support (#58) Co-authored-by: Chayim --- .github/workflows/tox-tests.yml | 4 +- build/reqpacks/system-setup.py | 2 +- examples/db2/README.md | 26 +++++++++++ examples/db2/example.py | 21 +++++++++ examples/db2/requirements.txt | 1 + pyproject.toml | 5 +- rgsync/Connectors/__init__.py | 23 +++++++++- rgsync/Connectors/sql_connectors.py | 24 ++++++++++ sbin/db2wait.sh | 11 +++++ tests/test_sqlwritebehind.py | 71 ++++++++++++++++++++++------- tox.ini | 42 ++++++++++++++++- 11 files changed, 207 insertions(+), 23 deletions(-) create mode 100644 examples/db2/README.md create mode 100644 examples/db2/example.py create mode 100644 examples/db2/requirements.txt create mode 100644 sbin/db2wait.sh diff --git a/.github/workflows/tox-tests.yml b/.github/workflows/tox-tests.yml index cb26a6b..eeb9436 100644 --- a/.github/workflows/tox-tests.yml +++ b/.github/workflows/tox-tests.yml @@ -6,9 +6,9 @@ jobs: name: Run tests runs-on: ubuntu-latest strategy: - max-parallel: 5 + max-parallel: 15 matrix: - toxenv: [ mongo, mysql, postgres ] + toxenv: [ mongo, mysql, postgres] python-versions: ['3.6', '3.7', '3.8', '3.9', '3.10'] env: ACTIONS_ALLOW_UNSECURE_COMMANDS: true diff --git a/build/reqpacks/system-setup.py b/build/reqpacks/system-setup.py index 33d8684..f0b119a 100755 --- a/build/reqpacks/system-setup.py +++ b/build/reqpacks/system-setup.py @@ -21,7 +21,7 @@ def common_first(self): self.pip_install("wheel virtualenv") self.pip_install("setuptools --upgrade") - self.install("git zip unzip") + self.install("git zip unzip libxml2") self.run("%s/bin/enable-utf8" % READIES) def debian_compat(self): diff --git a/examples/db2/README.md b/examples/db2/README.md new file mode 100644 index 0000000..f3d3a5b --- /dev/null +++ b/examples/db2/README.md @@ -0,0 +1,26 @@ +# Setup DB2 + +# Requirements + +Note, running the 
db2 recipe requires some local
0000000..8e78173 --- /dev/null +++ b/examples/db2/requirements.txt @@ -0,0 +1 @@ +rgsync diff --git a/pyproject.toml b/pyproject.toml index d8b111b..d7b50d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,15 +33,17 @@ bandit = "^1.7.0" vulture = "^2.3" tox = "^3.24.0" tox-poetry = "^0.4.0" +tox-docker = {git = "https://github.com/chayim/tox-docker/", branch="ck-31-privs"} pytest = "^6.2.0" flake8-docstrings = "^1.6.0" mock = "^4.0.3" black = "^21.7b0" -tox-docker = "^3.0.0" tox-pyenv = "^1.1.0" PyMySQL = "^1.0.2" cryptography = "^3.4.7" psycopg2-binary = "^2.9.1" +ibm-db = "^3.1.1" +ibm-db-sa = "^0.3.7" [tool.pytest.ini_options] markers = [ @@ -49,6 +51,7 @@ markers = [ "postgres: postgres backend tests", "sqlite: sqlite backend tests", "mongo: mongo backend tests", + "db2: db2 backend tests", ] [build-system] diff --git a/rgsync/Connectors/__init__.py b/rgsync/Connectors/__init__.py index 7f9571f..df94762 100644 --- a/rgsync/Connectors/__init__.py +++ b/rgsync/Connectors/__init__.py @@ -1,5 +1,22 @@ from .simple_hash_connector import SimpleHashConnector -from .sql_connectors import MsSqlConnector, MySqlConnector, SQLiteConnection, OracleSqlConnector, SnowflakeSqlConnector, MsSqlConnection, MySqlConnection, OracleSqlConnection, PostgresConnection, PostgresConnector, SnowflakeSqlConnection, SQLiteConnector +from .sql_connectors import ( + BaseSqlConnection, + BaseSqlConnector, + DB2Connection, + DB2Connector, + MsSqlConnection, + MsSqlConnector, + MySqlConnection, + MySqlConnector, + OracleSqlConnection, + OracleSqlConnector, + PostgresConnection, + PostgresConnector, + SQLiteConnection, + SQLiteConnector, + SnowflakeSqlConnection, + SnowflakeSqlConnector, +) from .cql_connector import CqlConnector, CqlConnection from .mongo_connector import MongoConnector, MongoConnection from .redis_connector import RedisConnector, RedisConnection, RedisClusterConnection @@ -7,6 +24,10 @@ __all__ = [ 'SimpleHashConnector', + 'BaseSqlConnection', + 'BaseSqlConnector', + 
'DB2Connector', + 'DB2Connection', 'MsSqlConnection', 'MsSqlConnector', 'MySqlConnection', diff --git a/rgsync/Connectors/sql_connectors.py b/rgsync/Connectors/sql_connectors.py index 58da4df..013942c 100644 --- a/rgsync/Connectors/sql_connectors.py +++ b/rgsync/Connectors/sql_connectors.py @@ -99,6 +99,12 @@ def _getConnectionStr(self): account=self.account, db=self.db) +class DB2Connection(BaseSqlConnection): + def __init__(self, user, passwd, db): + BaseSqlConnection.__init__(self, user, passwd, db) + def _getConnectionStr(self): + return 'db2://{user}:{password}@{db}'.format(user=self.user, password=self.passwd, db=self.db) + class BaseSqlConnector(): def __init__(self, connection, tableName, pk, exactlyOnceTableName=None): @@ -307,3 +313,21 @@ def GetUpdateQuery(tableName, mappings, pk): class SnowflakeSqlConnector(OracleSqlConnector): def __init__(self, connection, tableName, pk, exactlyOnceTableName=None): OracleSqlConnector.__init__(self, connection, tableName, pk, exactlyOnceTableName) + +class DB2Connector(BaseSqlConnector): + def __init__(self, connection, tableName, pk, exactlyOnceTableName=None): + BaseSqlConnector.__init__(self, connection, tableName, pk, exactlyOnceTableName) + + def PrepereQueries(self, mappings): + values = [val for kk, val in mappings.items() if not kk.startswith('_')] + values_with_pkey = [self.pk] + values + def GetUpdateQuery(table, pkey, values_with_pkey, values): + merge_into = "MERGE INTO %s d USING (VALUES 1) ON (d.%s = :%s)" % (table, pkey, pkey) + not_matched = "WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s)" % (','.join(values_with_pkey), ','.join([':%s' % a for a in values_with_pkey])) + matched = "WHEN MATCHED THEN UPDATE SET %s" % (','.join(['%s=:%s' % (a,a) for a in values])) + query = "%s %s %s" % (merge_into, not_matched, matched) + return query + self.addQuery = GetUpdateQuery(self.tableName, self.pk, values_with_pkey, values) + self.delQuery = 'delete from %s where %s=:%s' % (self.tableName, self.pk, self.pk) + if 
self.exactlyOnceTableName is not None: + self.exactlyOnceQuery = GetUpdateQuery(self.exactlyOnceTableName, 'id', ['id', 'val'], ['val']) diff --git a/sbin/db2wait.sh b/sbin/db2wait.sh new file mode 100644 index 0000000..1a7d4ea --- /dev/null +++ b/sbin/db2wait.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +for i in `seq 1 10`; do + docker logs db2|grep "Setup has completed" + if [ $? -eq 0 ]; then + exit 0 + fi + sleep 60 +done +docker logs db2 +exit 1 diff --git a/tests/test_sqlwritebehind.py b/tests/test_sqlwritebehind.py index 8fcfe2d..b80ba16 100644 --- a/tests/test_sqlwritebehind.py +++ b/tests/test_sqlwritebehind.py @@ -28,27 +28,26 @@ def setup_class(cls): e = create_engine(cls.connection(cls, **creds)).execution_options(autocommit=True) cls.dbconn = e.connect() - cls.dbconn.execute(text("DROP TABLE IF EXISTS persons;")) + cls.dbconn.execute(text("DROP TABLE IF EXISTS persons")) cls.dbconn.execute(text(table_create)) @classmethod def teardown_class(cls): - cls.dbconn.execute(text("DROP TABLE IF EXISTS persons;")) + cls.dbconn.execute(text("DROP TABLE IF EXISTS persons")) cls.env.flushall() - + def testSimpleWriteBehind(self): self.env.execute_command('flushall') self.env.execute_command('hset', 'person:1', 'first_name', 'foo', 'last_name', 'bar', 'age', '22') result = self.dbconn.execute(text('select * from persons')) count = 0 - while result.rowcount == 0: - time.sleep(0.1) + while result.rowcount in [0, -1]: + time.sleep(0.3) result = self.dbconn.execute(text('select * from persons')) count += 1 if count == 10: - self.env.assertTrue(False, message='That failed') break - res = result.next() + res = result.first() assert res == ('1', 'foo', 'bar', 22) self.env.execute_command('del', 'person:1') @@ -93,7 +92,7 @@ def testWriteBehindAck(self): assert res[0][1][0][1] == to_utf(['status', 'done']) assert self.env.execute_command('hgetall', 'person:1') == {} result = self.dbconn.execute(text('select * from persons')) - assert result.rowcount == 0 + assert result.rowcount in 
[0, -1] def testWriteBehindOperations(self): self.env.execute_command('flushall') @@ -108,7 +107,7 @@ def testWriteBehindOperations(self): # make sure data is not in the database result = self.dbconn.execute(text('select * from persons')) - assert result.rowcount == 0 + assert result.rowcount in [0, -1] # rewrite data with replicate self.env.execute_command('hset', 'person:1', 'first_name', 'foo', 'last_name', 'bar', 'age', '22', '#', '=2') @@ -172,7 +171,7 @@ def testSimpleWriteThrough(self): # make sure data is deleted from the database result = self.dbconn.execute(text('select * from persons')) - assert result.rowcount == 0 + assert result.rowcount in [0, -1] assert self.env.execute_command('hgetall', 'person:1') == {} @@ -201,7 +200,7 @@ def testSimpleWriteThroughPartialUpdate(self): # make sure data is deleted from the database result = self.dbconn.execute(text('select * from persons')) - assert result.rowcount == 0 + assert result.rowcount in [0, -1] assert self.env.execute_command('hgetall', 'person:1') == {} @@ -212,7 +211,7 @@ def testWriteThroughNoReplicate(self): # make sure data is deleted from the database result = self.dbconn.execute(text('select * from persons')) - assert result.rowcount == 0 + assert result.rowcount in [0, -1] assert self.env.execute_command('hgetall', 'person:1') == to_utf({'first_name':'foo', 'last_name': 'bar', 'age': '20'}) @@ -241,7 +240,7 @@ def testDelThroughNoReplicate(self): # make sure data was deleted from target as well result = self.dbconn.execute(text('select * from persons')) - assert result.rowcount == 0 + assert result.rowcount in [0, -1] def testWriteTroughAckStream(self): self.env.execute_command('flushall') @@ -266,7 +265,7 @@ def testWriteTroughAckStream(self): # make sure data is deleted from the database result = self.dbconn.execute(text('select * from persons')) - assert result.rowcount == 0 + assert result.rowcount in [0, -1] assert self.env.execute_command('hgetall', 'person:1') == {} @@ -280,7 +279,7 @@ 
def testWriteTroughAckStreamNoReplicate(self): # make sure data is not in the target result = self.dbconn.execute(text('select * from persons')) - assert result.rowcount == 0 + assert result.rowcount in [0, -1] # make sure data is in redis assert self.env.execute_command('hgetall', 'person:1') == to_utf({'first_name':'foo', 'last_name': 'bar', 'age': '20'}) @@ -295,7 +294,7 @@ def testWriteTroughAckStreamNoReplicate(self): @pytest.mark.postgres class TestPostgresql(BaseSQLTest): - + def credentials(self): r = tox.config.parseconfig(open("tox.ini").read()) docker = r._docker_container_configs["postgres"]["environment"] @@ -337,7 +336,7 @@ def run_install_script(self, pkg, **kwargs): @pytest.mark.mysql class TestMysql(BaseSQLTest): - + def credentials(self): r = tox.config.parseconfig(open("tox.ini").read()) docker = r._docker_container_configs["mysql"]["environment"] @@ -383,3 +382,41 @@ def run_install_script(self, pkg, **kwargs): RGWriteThrough(GB, keysPrefix='__', mappings=personsMappings, connector=personsConnector, name='PersonsWriteThrough', version='99.99.99') """ % (kwargs['dbuser'], kwargs['dbpasswd'], kwargs['db']) self.env.execute_command('RG.PYEXECUTE', script, 'REQUIREMENTS', pkg, 'pymysql[rsa]') + +@pytest.mark.db2 +class TestDB2(BaseSQLTest): + + def credentials(self): + r = tox.config.parseconfig(open("tox.ini").read()) + docker = r._docker_container_configs["db2"]["environment"] + dbuser = docker["DB2INSTANCE"] + dbpasswd = docker["DB2INST1_PASSWORD"] + db = docker["DBNAME"] + + return {"dbuser": dbuser, + "dbpasswd": dbpasswd, + "db": db} + + def connection(self, **kwargs): + + con = f"db2://{kwargs['dbuser']}:{kwargs['dbpasswd']}@172.17.0.1:50000/{kwargs['db']}" + return con + + def run_install_script(self, pkg, **kwargs): + script = """ +from rgsync import RGWriteBehind, RGWriteThrough +from rgsync.Connectors import DB2Connector, DB2Connection + +connection = DB2Connection('%s', '%s', '172.17.0.1:50000/%s') +personsConnector = 
DB2Connector(connection, 'persons', 'person_id') + +personsMappings = { + 'first_name':'first', + 'last_name':'last', + 'age':'age' +} + +RGWriteBehind(GB, keysPrefix='person', mappings=personsMappings, connector=personsConnector, name='PersonsWriteBehind', version='99.99.99') +RGWriteThrough(GB, keysPrefix='__', mappings=personsMappings, connector=personsConnector, name='PersonsWriteThrough', version='99.99.99') +""" % (kwargs['dbuser'], kwargs['dbpasswd'], kwargs['db']) + self.env.execute_command('RG.PYEXECUTE', script, 'REQUIREMENTS', pkg, 'ibm-db-sa') \ No newline at end of file diff --git a/tox.ini b/tox.ini index 26d762b..f481af5 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,15 @@ +[pytest] +addopts = -s +markers = + mysql: mysql backend tests + postgres: postgres backend tests + sqlite: sqlite backend tests + mongo: mongo backend tests + db2: db2 backend tests + [tox] minversion=3.2.4 -envlist=mysql, postgres, mongo +envlist=mysql, postgres, mongo, db2, linters # for variable reuse [main] @@ -8,6 +17,19 @@ dbuser = admin dbpasswd = adminpass db = rgsync +[docker:db2] +image = ibmcom/db2 +container_name = db2 +privileged = true +ports = + 50000:50000/tcp + 55000:55000/tcp +environment = + LICENSE=accept + DB2INSTANCE={[main]dbuser} + DB2INST1_PASSWORD={[main]dbpasswd} + DBNAME={[main]db} + [docker:mysql] image = mysql:8 healthcheck_cmd = sleep 10 @@ -80,6 +102,13 @@ commands = [testenv:mongo] +[testenv:db2] +commands = + docker exec redisgears apt-get update --fix-missing + docker exec redisgears apt-get install -y gcc vim procps libxml2 + sh {toxinidir}/sbin/db2wait.sh + pytest -m {envname} -s + [testenv:sqlite] allowlist_externals = rm @@ -94,3 +123,14 @@ commands_pre = commands = apt update -y apt install -y sqlite3 + +[testenv:linters] +deps_files = dev_requirements.txt +docker = +commands = + black --target-version py36 --check --diff rgsync tests + isort --check-only --diff rgsync tests + vulture rgsync --min-confidence 80 + flynt --fail-on-change 
--dry-run . +skipsdist = true +skip_install = true From 7861d519daa40ce27fc580a06bd0a78b02624622 Mon Sep 17 00:00:00 2001 From: Chayim Date: Tue, 26 Apr 2022 13:01:03 +0300 Subject: [PATCH 2/2] Updating dependencies to latest (#111) --- pyproject.toml | 8 ++++---- tests/test_sqlwritebehind.py | 20 ++++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d7b50d2..8bff992 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,9 +23,9 @@ classifiers = [ [tool.poetry.dependencies] python = "^3.6.2" -redis = "^4.1.4" -SQLAlchemy = "1.3.24" -pymongo = "4.0.1" # located here, because it achieves the same goal as sqlalchemy +redis = "^4.2.0" +SQLAlchemy = "1.4.35" +pymongo = "4.1.1" # located here, because it achieves the same goal as sqlalchemy [tool.poetry.dev-dependencies] flake8 = "^3.9.2" @@ -41,7 +41,7 @@ black = "^21.7b0" tox-pyenv = "^1.1.0" PyMySQL = "^1.0.2" cryptography = "^3.4.7" -psycopg2-binary = "^2.9.1" +psycopg2-binary = "^2.9.3" ibm-db = "^3.1.1" ibm-db-sa = "^0.3.7" diff --git a/tests/test_sqlwritebehind.py b/tests/test_sqlwritebehind.py index b80ba16..5e624e6 100644 --- a/tests/test_sqlwritebehind.py +++ b/tests/test_sqlwritebehind.py @@ -47,7 +47,7 @@ def testSimpleWriteBehind(self): count += 1 if count == 10: break - res = result.first() + res = result.fetchone() assert res == ('1', 'foo', 'bar', 22) self.env.execute_command('del', 'person:1') @@ -75,7 +75,7 @@ def testWriteBehindAck(self): assert res[0][1][0][1] == to_utf(['status', 'done']) result = self.dbconn.execute(text('select * from persons')) - res = result.next() + res = result.fetchone() assert res == ('1', 'foo', 'bar', 22) @@ -122,7 +122,7 @@ def testWriteBehindOperations(self): assert res[0][1][0][1] == to_utf(['status', 'done']) result = self.dbconn.execute(text('select * from persons')) - res = result.next() + res = result.fetchone() assert res == ('1', 'foo', 'bar', 22) # delete data without replicate @@ -135,7 +135,7 
@@ def testWriteBehindOperations(self): # make sure data is still in the dabase result = self.dbconn.execute(text('select * from persons')) - res = result.next() + res = result.fetchone() assert res == ('1', 'foo', 'bar', 22) # rewrite a hash and not replicate @@ -162,7 +162,7 @@ def testSimpleWriteThrough(self): # make sure data is in the dabase result = self.dbconn.execute(text('select * from persons')) - res = result.next() + res = result.fetchone() assert res == ('1', 'foo', 'bar', 20) assert self.env.execute_command('hgetall', 'person:1') == to_utf({'first_name':'foo', 'last_name': 'bar', 'age': '20'}) @@ -182,7 +182,7 @@ def testSimpleWriteThroughPartialUpdate(self): # make sure data is in the dabase result = self.dbconn.execute(text('select * from persons')) - res = result.next() + res = result.fetchone() assert res == ('1', 'foo', 'bar', 20) assert self.env.execute_command('hgetall', 'person:1') == to_utf({'first_name':'foo', 'last_name': 'bar', 'age': '20'}) @@ -191,7 +191,7 @@ def testSimpleWriteThroughPartialUpdate(self): # make sure data is in the dabase result = self.dbconn.execute(text('select * from persons')) - res = result.next() + res = result.fetchone() assert res == ('1', 'foo1', 'bar', 20) assert self.env.execute_command('hgetall', 'person:1') == to_utf({'first_name':'foo1', 'last_name': 'bar', 'age': '20'}) @@ -222,7 +222,7 @@ def testDelThroughNoReplicate(self): # make sure data is in the dabase result = self.dbconn.execute(text('select * from persons')) - res = result.next() + res = result.fetchone() assert res == ('1', 'foo', 'bar', 20) assert self.env.execute_command('hgetall', 'person:1') == to_utf({'first_name':'foo', 'last_name': 'bar', 'age': '20'}) @@ -232,7 +232,7 @@ def testDelThroughNoReplicate(self): # make sure data was deleted from redis but not from the target assert self.env.execute_command('hgetall', 'person:1') == {} result = self.dbconn.execute(text('select * from persons')) - res = result.next() + res = result.fetchone() 
assert res == ('1', 'foo', 'bar', 20) @@ -252,7 +252,7 @@ def testWriteTroughAckStream(self): # make sure data is in the dabase result = self.dbconn.execute(text('select * from persons')) - res = result.next() + res = result.fetchone() assert res == ('1', 'foo', 'bar', 20) # make sure data is in redis