From cd25b56b9ba736ba10e4df058251d7dd28fe9a8f Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Thu, 2 Jan 2025 01:09:44 +0400 Subject: [PATCH 1/5] Support for mysql set type --- mysql_ch_replicator/converter.py | 33 +++++++++++++++++++ .../pymysqlreplication/row_event.py | 2 +- mysql_ch_replicator/table_structure.py | 3 ++ test_mysql_ch_replicator.py | 15 ++++++--- 4 files changed, 47 insertions(+), 6 deletions(-) diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index c7ff0d1..076d8c8 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -244,6 +244,8 @@ def convert_type(self, mysql_type, parameters): return 'String' if 'binary' in mysql_type: return 'String' + if 'set(' in mysql_type: + return 'String' raise Exception(f'unknown mysql type "{mysql_type}"') def convert_field_type(self, mysql_type, mysql_parameters): @@ -323,6 +325,21 @@ def convert_record( charset = mysql_structure.charset_python or 'utf-8' clickhouse_field_value = clickhouse_field_value.decode(charset) + if 'set(' in mysql_field_type: + set_values = mysql_structure.fields[idx].additional_data + if isinstance(clickhouse_field_value, int): + bit_mask = clickhouse_field_value + clickhouse_field_value = [ + val + for idx, val in enumerate(set_values) + if bit_mask & (1 << idx) + ] + elif isinstance(clickhouse_field_value, set): + clickhouse_field_value = [ + v for v in set_values if v in clickhouse_field_value + ] + clickhouse_field_value = ','.join(clickhouse_field_value) + if 'point' in mysql_field_type: clickhouse_field_value = parse_mysql_point(clickhouse_field_value) @@ -651,10 +668,26 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None if len(definition) > 2: field_parameters = ' '.join(definition[2:]) + additional_data = None + if 'set(' in field_type.lower(): + vals = field_type[len('set('):] + close_pos = vals.find(')') + vals = vals[:close_pos] + vals = vals.split(',') + def vstrip(e): + if not e: + return e + if e[0] in '"\'': + return e[1:-1] + return e + vals = [vstrip(v) for v in vals] + additional_data = vals + structure.fields.append(TableField( name=field_name, field_type=field_type, parameters=field_parameters, + additional_data=additional_data, )) #print(' ---- params:', field_parameters) diff --git a/mysql_ch_replicator/pymysqlreplication/row_event.py b/mysql_ch_replicator/pymysqlreplication/row_event.py index a4dc452..0351116 100644 --- a/mysql_ch_replicator/pymysqlreplication/row_event.py +++ b/mysql_ch_replicator/pymysqlreplication/row_event.py @@ -275,7 +275,7 @@ def __read_values_name( return None return ret self.__none_sources[column.name] = NONE_SOURCE.EMPTY_SET - return None + return bit_mask elif column.type == FIELD_TYPE.BIT: return self.__read_bit(column) elif column.type == FIELD_TYPE.GEOMETRY: diff --git a/mysql_ch_replicator/table_structure.py b/mysql_ch_replicator/table_structure.py index d2e9cf4..3ffdce9 100644 --- a/mysql_ch_replicator/table_structure.py +++ b/mysql_ch_replicator/table_structure.py @@ -1,10 +1,13 @@ from dataclasses import dataclass, field +from typing import Any + @dataclass class TableField: name: str = '' field_type: str = '' parameters: str = '' + additional_data: Any = None @dataclass class TableStructure: diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 5de9b23..2a47fc0 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -50,7 +50,7 @@ def kill_process(pid, force=False): subprocess.run(command, shell=True) -def assert_wait(condition, max_wait_time=15.0, retry_interval=0.05): +def assert_wait(condition, max_wait_time=20.0, retry_interval=0.05): max_time = time.time() + max_wait_time while time.time() < max_time: if condition(): @@ -868,13 +868,14 @@ def test_different_types_2(): test1 bit(1), test2 point, test3 binary(16), + test4 set('1','2','3','4','5','6','7'), PRIMARY KEY (id) ); ''') mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test3) VALUES " - f"(0, POINT(10.0, 20.0), 'azaza');", + f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test3, test4) VALUES " + f"(0, POINT(10.0, 20.0), 'azaza', '1,3,5');", commit=True, ) @@ -891,10 +892,11 @@ def test_different_types_2(): assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES " - f"(1, POINT(15.0, 14.0));", + f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test4) VALUES " + f"(1, POINT(15.0, 14.0), '2,4,5');", commit=True, ) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, 'test1=True')) == 1) @@ -902,6 +904,9 @@ def test_different_types_2(): assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test2']['y'] == 20.0 assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test3'] == 'azaza\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + assert ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test4'] == '2,4,5' + assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test4'] == '1,3,5' + mysql.execute( f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES " f"(0, NULL);", From aa3296f547c172732dc0032e14eed8d0a42b20d6 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Thu, 2 Jan 2025 01:24:23 +0400 Subject: [PATCH 2/5] Interrupt on first failed test --- .github/workflows/tests.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index fb2d30c..9fb7c22 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -17,4 +17,4 @@ jobs: run: > ls -la && docker compose -f docker-compose-tests.yaml up --force-recreate --no-deps --wait -d && - sudo docker exec -w /app/ -i `docker ps | grep python | awk '{print $1;}'` python3 -m pytest -v -s test_mysql_ch_replicator.py + sudo docker exec -w /app/ -i `docker ps | grep python | awk '{print $1;}'` python3 -m pytest -x -v -s test_mysql_ch_replicator.py From dcec097b03bc15f99656fde472d6fae8f07862df Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Thu, 2 Jan 2025 01:38:16 +0400 Subject: [PATCH 3/5] Updated value to insert --- test_mysql_ch_replicator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 2a47fc0..ac64f99 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -344,9 +344,9 @@ def test_runner(): assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES ('Filipp', 50, POINT(10.0, 20.0));", commit=True) + mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES ('Xeishfru32', 50, POINT(10.0, 20.0));", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) - assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0]['age'] == 50) + assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Xeishfru32'")[0]['age'] == 50) # Test for restarting dead processes binlog_repl_pid = get_binlog_replicator_pid(cfg) From 6d117c0de844a2c8e5edf3c1d6ea43d602cc6ef9 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Thu, 2 Jan 2025 01:50:02 +0400 Subject: [PATCH 4/5] Investigating tests fix --- test_mysql_ch_replicator.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index ac64f99..5ecb1ac 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -344,6 +344,8 @@ def test_runner(): assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2) + time.sleep(5) + mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES ('Xeishfru32', 50, POINT(10.0, 20.0));", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Xeishfru32'")[0]['age'] == 50) From a040a59083ee23a6a7802041ab82aefa27716a32 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Thu, 2 Jan 2025 02:04:06 +0400 Subject: [PATCH 5/5] fix tests more correctly --- mysql_ch_replicator/runner.py | 11 +++++++++++ test_mysql_ch_replicator.py | 2 -- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/mysql_ch_replicator/runner.py b/mysql_ch_replicator/runner.py index 19f0e72..553db03 100644 --- a/mysql_ch_replicator/runner.py +++ b/mysql_ch_replicator/runner.py @@ -43,6 +43,9 @@ def __init__(self, db_name, config_file): class Runner: + + DB_REPLICATOR_RUN_DELAY = 5 + def __init__(self, config: Settings, wait_initial_replication: bool, databases: str): self.config = config self.databases = databases or config.databases @@ -149,8 +152,14 @@ def run(self): server_thread = threading.Thread(target=self.run_server, daemon=True) server_thread.start() + t1 = time.time() + while time.time() - t1 < self.DB_REPLICATOR_RUN_DELAY and not killer.kill_now: + time.sleep(0.3) + # First - continue replication for DBs that already finished initial replication for db in databases: + if killer.kill_now: + break if not self.is_initial_replication_finished(db_name=db): continue logger.info(f'running replication for {db} (initial replication finished)') @@ -161,6 +170,8 @@ def run(self): for db in databases: if db in self.runners: continue + if killer.kill_now: + break logger.info(f'running replication for {db} (initial replication not finished - waiting)') runner = self.runners[db] = DbReplicatorRunner(db_name=db, config_file=self.config.settings_file) diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 5ecb1ac..ac64f99 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -344,8 +344,6 @@ def test_runner(): assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2) - time.sleep(5) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES ('Xeishfru32', 50, POINT(10.0, 20.0));", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Xeishfru32'")[0]['age'] == 50)