Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ jobs:
run: >
ls -la &&
docker compose -f docker-compose-tests.yaml up --force-recreate --no-deps --wait -d &&
sudo docker exec -w /app/ -i `docker ps | grep python | awk '{print $1;}'` python3 -m pytest -v -s test_mysql_ch_replicator.py
sudo docker exec -w /app/ -i `docker ps | grep python | awk '{print $1;}'` python3 -m pytest -x -v -s test_mysql_ch_replicator.py
33 changes: 33 additions & 0 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ def convert_type(self, mysql_type, parameters):
return 'String'
if 'binary' in mysql_type:
return 'String'
if 'set(' in mysql_type:
return 'String'
raise Exception(f'unknown mysql type "{mysql_type}"')

def convert_field_type(self, mysql_type, mysql_parameters):
Expand Down Expand Up @@ -323,6 +325,21 @@ def convert_record(
charset = mysql_structure.charset_python or 'utf-8'
clickhouse_field_value = clickhouse_field_value.decode(charset)

if 'set(' in mysql_field_type:
set_values = mysql_structure.fields[idx].additional_data
if isinstance(clickhouse_field_value, int):
bit_mask = clickhouse_field_value
clickhouse_field_value = [
val
for idx, val in enumerate(set_values)
if bit_mask & (1 << idx)
]
elif isinstance(clickhouse_field_value, set):
clickhouse_field_value = [
v for v in set_values if v in clickhouse_field_value
]
clickhouse_field_value = ','.join(clickhouse_field_value)

if 'point' in mysql_field_type:
clickhouse_field_value = parse_mysql_point(clickhouse_field_value)

Expand Down Expand Up @@ -651,10 +668,26 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None
if len(definition) > 2:
field_parameters = ' '.join(definition[2:])

additional_data = None
if 'set(' in field_type.lower():
vals = field_type[len('set('):]
close_pos = vals.find(')')
vals = vals[:close_pos]
vals = vals.split(',')
def vstrip(e):
    """Return one SET member literal without surrounding whitespace or quotes.

    Args:
        e: A single comma-separated piece taken from a MySQL
            ``set(...)`` column definition, e.g. ``"'abc'"``.

    Returns:
        The member text with one matching pair of enclosing single or
        double quotes removed. Empty input is returned unchanged; a value
        that is not wrapped in a matching quote pair is returned as-is
        (apart from whitespace trimming).
    """
    if not e:
        return e
    # Tolerate whitespace around a member, e.g. "set('a', 'b')".
    e = e.strip()
    # Only drop the quotes when they really form a pair. Blindly slicing
    # [1:-1] would eat the last character of an unterminated value and
    # turn a lone quote character into an empty string.
    if len(e) >= 2 and e[0] in '"\'' and e[-1] == e[0]:
        return e[1:-1]
    return e
vals = [vstrip(v) for v in vals]
additional_data = vals

structure.fields.append(TableField(
name=field_name,
field_type=field_type,
parameters=field_parameters,
additional_data=additional_data,
))
#print(' ---- params:', field_parameters)

Expand Down
2 changes: 1 addition & 1 deletion mysql_ch_replicator/pymysqlreplication/row_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def __read_values_name(
return None
return ret
self.__none_sources[column.name] = NONE_SOURCE.EMPTY_SET
return None
return bit_mask
elif column.type == FIELD_TYPE.BIT:
return self.__read_bit(column)
elif column.type == FIELD_TYPE.GEOMETRY:
Expand Down
11 changes: 11 additions & 0 deletions mysql_ch_replicator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ def __init__(self, db_name, config_file):


class Runner:

DB_REPLICATOR_RUN_DELAY = 5

def __init__(self, config: Settings, wait_initial_replication: bool, databases: str):
self.config = config
self.databases = databases or config.databases
Expand Down Expand Up @@ -149,8 +152,14 @@ def run(self):
server_thread = threading.Thread(target=self.run_server, daemon=True)
server_thread.start()

t1 = time.time()
while time.time() - t1 < self.DB_REPLICATOR_RUN_DELAY and not killer.kill_now:
time.sleep(0.3)

# First - continue replication for DBs that already finished initial replication
for db in databases:
if killer.kill_now:
break
if not self.is_initial_replication_finished(db_name=db):
continue
logger.info(f'running replication for {db} (initial replication finished)')
Expand All @@ -161,6 +170,8 @@ def run(self):
for db in databases:
if db in self.runners:
continue
if killer.kill_now:
break

logger.info(f'running replication for {db} (initial replication not finished - waiting)')
runner = self.runners[db] = DbReplicatorRunner(db_name=db, config_file=self.config.settings_file)
Expand Down
3 changes: 3 additions & 0 deletions mysql_ch_replicator/table_structure.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TableField:
    """Description of a single column in a parsed table structure."""

    # Column name.
    name: str = ''
    # Column type as written in the table definition (e.g. "set('1','2')").
    field_type: str = ''
    # Remaining tokens of the column definition after the type, joined by spaces.
    parameters: str = ''
    # Optional type-specific payload; the MySQL structure parser stores the
    # list of allowed member strings here for SET columns, otherwise None.
    additional_data: Any = None

@dataclass
class TableStructure:
Expand Down
19 changes: 12 additions & 7 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def kill_process(pid, force=False):
subprocess.run(command, shell=True)


def assert_wait(condition, max_wait_time=15.0, retry_interval=0.05):
def assert_wait(condition, max_wait_time=20.0, retry_interval=0.05):
max_time = time.time() + max_wait_time
while time.time() < max_time:
if condition():
Expand Down Expand Up @@ -344,9 +344,9 @@ def test_runner():
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)

mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES ('Filipp', 50, POINT(10.0, 20.0));", commit=True)
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES ('Xeishfru32', 50, POINT(10.0, 20.0));", commit=True)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0]['age'] == 50)
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Xeishfru32'")[0]['age'] == 50)

# Test for restarting dead processes
binlog_repl_pid = get_binlog_replicator_pid(cfg)
Expand Down Expand Up @@ -868,13 +868,14 @@ def test_different_types_2():
test1 bit(1),
test2 point,
test3 binary(16),
test4 set('1','2','3','4','5','6','7'),
PRIMARY KEY (id)
);
''')

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test3) VALUES "
f"(0, POINT(10.0, 20.0), 'azaza');",
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test3, test4) VALUES "
f"(0, POINT(10.0, 20.0), 'azaza', '1,3,5');",
commit=True,
)

Expand All @@ -891,17 +892,21 @@ def test_different_types_2():
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES "
f"(1, POINT(15.0, 14.0));",
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test4) VALUES "
f"(1, POINT(15.0, 14.0), '2,4,5');",
commit=True,
)

assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, 'test1=True')) == 1)

assert ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test2']['x'] == 15.0
assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test2']['y'] == 20.0
assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test3'] == 'azaza\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

assert ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test4'] == '2,4,5'
assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test4'] == '1,3,5'

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES "
f"(0, NULL);",
Expand Down
Loading