From 71c48538fb4468171baa901c1d6b9bec809d61db Mon Sep 17 00:00:00 2001
From: Filipp Ozinov
Date: Sat, 29 Mar 2025 14:58:21 +0400
Subject: [PATCH 1/6] Correct test name

---
 test_mysql_ch_replicator.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py
index 9136ac0..443be40 100644
--- a/test_mysql_ch_replicator.py
+++ b/test_mysql_ch_replicator.py
@@ -1357,7 +1357,7 @@ def get_last_insert_from_binlog(cfg: config.Settings, db_name: str):
 
 
 @pytest.mark.optional
-def test_performance_dbreplicator():
+def test_performance_binlog_replicator():
     config_file = 'tests_config_perf.yaml'
     num_records = 100000
 

From 8d603abd452aa127afb0ff54cbfb1880e8539a30 Mon Sep 17 00:00:00 2001
From: Filipp Ozinov
Date: Sat, 29 Mar 2025 15:15:49 +0400
Subject: [PATCH 2/6] Binlog and DB replicator perftests

---
 test_mysql_ch_replicator.py | 40 +++++++++++++++++++++++++++++++++++--
 1 file changed, 38 insertions(+), 2 deletions(-)

diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py
index 443be40..ed5bb75 100644
--- a/test_mysql_ch_replicator.py
+++ b/test_mysql_ch_replicator.py
@@ -1357,7 +1357,7 @@ def get_last_insert_from_binlog(cfg: config.Settings, db_name: str):
 
 
 @pytest.mark.optional
-def test_performance_binlog_replicator():
+def test_performance_realtime_replication():
     config_file = 'tests_config_perf.yaml'
     num_records = 100000
 
@@ -1387,6 +1387,8 @@
 
     binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
     binlog_replicator_runner.run()
+    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
+    db_replicator_runner.run()
 
     time.sleep(1)
 
@@ -1399,8 +1401,15 @@ def _get_last_insert_name():
         return record[1].decode('utf-8')
 
     assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_1', retry_interval=0.5)
+
+    # Wait for the database and table to be created in ClickHouse
+    assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5)
+    ch.execute_command(f'USE `{TEST_DB_NAME}`')
+    assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5)
+    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1, retry_interval=0.5)
 
     binlog_replicator_runner.stop()
+    db_replicator_runner.stop()
 
     time.sleep(1)
 
@@ -1418,7 +1427,7 @@ def _get_last_insert_name():
 
     mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True)
 
-    print("running db_replicator")
+    print("running binlog_replicator")
     t1 = time.time()
     binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
     binlog_replicator_runner.run()
@@ -1433,6 +1442,33 @@ def _get_last_insert_name():
 
     print('\n\n')
     print("*****************************")
+    print("Binlog Replicator Performance:")
+    print("records per second:", int(rps))
+    print("total time (seconds):", round(time_delta, 2))
+    print("*****************************")
+    print('\n\n')
+
+    # Now test db_replicator performance
+    print("running db_replicator")
+    t1 = time.time()
+    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
+    db_replicator_runner.run()
+
+    # Make sure the database and table exist before querying
+    assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5)
+    ch.execute_command(f'USE `{TEST_DB_NAME}`')
+    assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5)
+    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == num_records + 2, retry_interval=0.5, max_wait_time=1000)
+    t2 = time.time()
+
+    db_replicator_runner.stop()
+
+    time_delta = t2 - t1
+    rps = num_records / time_delta
+
+    print('\n\n')
+    print("*****************************")
+    print("DB Replicator Performance:")
     print("records per second:", int(rps))
     print("total time (seconds):", round(time_delta, 2))
     print("*****************************")
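Note on the timing loops in this patch: both measurements rely on the test
suite's assert_wait polling helper. Its implementation is not part of this
series; judging from the call sites (a zero-argument condition callback plus
retry_interval and max_wait_time keyword arguments), a minimal sketch of the
assumed behaviour is:

    import time

    def assert_wait(condition, max_wait_time=20.0, retry_interval=3.0):
        # Poll until the condition holds or the timeout expires; the default
        # values here are placeholders, overridden at most call sites above.
        deadline = time.time() + max_wait_time
        while time.time() < deadline:
            if condition():
                return
            time.sleep(retry_interval)
        assert condition()

Because each measured interval runs until the row count matches, the reported
records-per-second figures include up to one retry_interval of polling slack.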
From 9afe62ee58b65cdd2fa2edca42a6f54ddd639b96 Mon Sep 17 00:00:00 2001
From: Filipp Ozinov
Date: Sat, 29 Mar 2025 15:40:38 +0400
Subject: [PATCH 3/6] Initial replication perftest

---
 test_mysql_ch_replicator.py | 79 +++++++++++++++++++++++++++++++++++++
 1 file changed, 79 insertions(+)

diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py
index ed5bb75..7c87cae 100644
--- a/test_mysql_ch_replicator.py
+++ b/test_mysql_ch_replicator.py
@@ -1997,3 +1997,82 @@ def test_year_type():
     run_all_runner.stop()
     assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
     assert('Traceback' not in read_logs(TEST_DB_NAME))
+
+@pytest.mark.optional
+def test_performance_initial_only_replication():
+    config_file = 'tests_config_perf.yaml'
+    num_records = 1000000
+
+    cfg = config.Settings()
+    cfg.load(config_file)
+
+    mysql = mysql_api.MySQLApi(
+        database=None,
+        mysql_settings=cfg.mysql,
+    )
+
+    ch = clickhouse_api.ClickhouseApi(
+        database=TEST_DB_NAME,
+        clickhouse_settings=cfg.clickhouse,
+    )
+
+    prepare_env(cfg, mysql, ch)
+
+    mysql.execute(f'''
+    CREATE TABLE `{TEST_TABLE_NAME}` (
+        id int NOT NULL AUTO_INCREMENT,
+        name varchar(2048),
+        age int,
+        PRIMARY KEY (id)
+    );
+    ''')
+
+    print("populating mysql data")
+
+    base_value = 'a' * 2000
+
+    for i in range(num_records):
+        if i % 2000 == 0:
+            print(f'populated {i} elements')
+        mysql.execute(
+            f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) "
+            f"VALUES ('TEST_VALUE_{i}_{base_value}', {i});", commit=i % 20 == 0,
+        )
+
+    mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True)
+    print(f"finished populating {num_records} records")
+
+    # Now test db_replicator performance in initial_only mode
+    print("running db_replicator in initial_only mode")
+    t1 = time.time()
+
+    db_replicator_runner = DbReplicatorRunner(
+        TEST_DB_NAME,
+        additional_arguments='--initial_only=True',
+        cfg_file=config_file
+    )
+    db_replicator_runner.run()
+    db_replicator_runner.wait_complete()  # Wait for the process to complete
+
+    # Make sure the database and table exist
+    assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5)
+    ch.execute_command(f'USE `{TEST_DB_NAME}`')
+    assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5)
+
+    # Check that all records were replicated
+    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == num_records + 1, retry_interval=0.5, max_wait_time=300)
+
+    t2 = time.time()
+
+    time_delta = t2 - t1
+    rps = num_records / time_delta
+
+    print('\n\n')
+    print("*****************************")
+    print("DB Replicator Initial Only Mode Performance:")
+    print("records per second:", int(rps))
+    print("total time (seconds):", round(time_delta, 2))
+    print("*****************************")
+    print('\n\n')
+
+    db_replicator_runner.stop()
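The runner in this test shells out to the replicator CLI. The equivalent
manual invocation, assuming the mysql_ch_replicator.main entry point that
PATCH 4 below also uses (database name illustrative), is roughly:

    python -m mysql_ch_replicator.main db_replicator \
        --config tests_config_perf.yaml \
        --db test_db \
        --initial_only=True

One caveat worth knowing: --initial_only is declared with type=bool, and
argparse applies bool() to the raw string, so any non-empty value (including
--initial_only=False) parses as true; the flag must be omitted entirely to
keep realtime replication enabled.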
From 2952e643a2acf0e2bbaddc6e5967cb46d924f6f9 Mon Sep 17 00:00:00 2001
From: Filipp Ozinov
Date: Sun, 30 Mar 2025 00:24:00 +0400
Subject: [PATCH 4/6] Parallel initial replication

---
 mysql_ch_replicator/config.py        |   6 ++
 mysql_ch_replicator/db_replicator.py | 150 +++++++++++++++++++++----
 mysql_ch_replicator/main.py          |  29 +++++-
 mysql_ch_replicator/mysql_api.py     |  21 +++-
 mysql_ch_replicator/runner.py        |  15 ++-
 test_mysql_ch_replicator.py          |  67 +++++++++++-
 tests_config_parallel.yaml           |  37 +++++++
 7 files changed, 293 insertions(+), 32 deletions(-)
 create mode 100644 tests_config_parallel.yaml

diff --git a/mysql_ch_replicator/config.py b/mysql_ch_replicator/config.py
index d428fe9..57c7b3c 100644
--- a/mysql_ch_replicator/config.py
+++ b/mysql_ch_replicator/config.py
@@ -119,6 +119,7 @@ def __init__(self):
         self.http_port = 0
         self.types_mapping = {}
         self.target_databases = {}
+        self.initial_replication_threads = 0
 
     def load(self, settings_file):
         data = open(settings_file, 'r').read()
@@ -143,6 +144,7 @@ def load(self, settings_file):
         self.http_host = data.pop('http_host', '')
         self.http_port = data.pop('http_port', 0)
         self.target_databases = data.pop('target_databases', {})
+        self.initial_replication_threads = data.pop('initial_replication_threads', 0)
 
         indexes = data.pop('indexes', [])
         for index in indexes:
@@ -202,3 +204,7 @@ def validate(self):
         self.validate_log_level()
         if not isinstance(self.target_databases, dict):
             raise ValueError(f'wrong target databases {self.target_databases}')
+        if not isinstance(self.initial_replication_threads, int):
+            raise ValueError(f'initial_replication_threads should be an integer, not {type(self.initial_replication_threads)}')
+        if self.initial_replication_threads < 0:
+            raise ValueError(f'initial_replication_threads should be non-negative')
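The new setting is read from the same YAML settings file as the rest of the
configuration. A usage sketch (file name and value illustrative; note that 0,
the default, and 1 both keep initial replication single-process, since the
db_replicator changes below only fork workers when the value exceeds one):

    from mysql_ch_replicator.config import Settings

    cfg = Settings()
    cfg.load('config.yaml')   # YAML containing: initial_replication_threads: 4
    cfg.validate()            # raises ValueError for non-int or negative values
    print(cfg.initial_replication_threads)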
diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py
index 87fd94d..6d9742e 100644
--- a/mysql_ch_replicator/db_replicator.py
+++ b/mysql_ch_replicator/db_replicator.py
@@ -1,11 +1,15 @@
 import json
 import os.path
+import random
 import time
 import pickle
 from logging import getLogger
 from enum import Enum
 from dataclasses import dataclass
 from collections import defaultdict
+import sys
+import subprocess
+import select
 
 from .config import Settings, MysqlSettings, ClickhouseSettings
 from .mysql_api import MySQLApi
@@ -106,10 +110,15 @@ class DbReplicator:
 
     READ_LOG_INTERVAL = 0.3
 
-    def __init__(self, config: Settings, database: str, target_database: str = None, initial_only: bool = False):
+    def __init__(self, config: Settings, database: str, target_database: str = None, initial_only: bool = False,
+                 worker_id: int = None, total_workers: int = None, table: str = None):
         self.config = config
         self.database = database
-
+        self.worker_id = worker_id
+        self.total_workers = total_workers
+        self.settings_file = config.settings_file
+        self.single_table = table  # Store the single table to process
+
         # use same as source database by default
         self.target_database = database
@@ -122,9 +131,29 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
         if target_database:
             self.target_database = target_database
 
-        self.target_database_tmp = self.target_database + '_tmp'
         self.initial_only = initial_only
 
+        # Handle state file differently for parallel workers
+        if self.worker_id is not None and self.total_workers is not None:
+            # For worker processes in parallel mode, use a different state file
+            self.is_parallel_worker = True
+            self.state_path = os.path.join(
+                self.config.binlog_replicator.data_dir,
+                self.database,
+                f'state_worker_{self.worker_id}_{random.randint(0,9999999999)}.pckl'
+            )
+            logger.info(f"Worker {self.worker_id}/{self.total_workers} using state file: {self.state_path}")
+
+            if self.single_table:
+                logger.info(f"Worker {self.worker_id} focusing only on table: {self.single_table}")
+        else:
+            self.state_path = os.path.join(self.config.binlog_replicator.data_dir, self.database, 'state.pckl')
+            self.is_parallel_worker = False
+
+        self.target_database_tmp = self.target_database + '_tmp'
+        if self.is_parallel_worker:
+            self.target_database_tmp = self.target_database
+
         self.mysql_api = MySQLApi(
             database=self.database,
             mysql_settings=config.mysql,
@@ -148,7 +177,7 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
         self.start_time = time.time()
 
     def create_state(self):
-        return State(os.path.join(self.config.binlog_replicator.data_dir, self.database, 'state.pckl'))
+        return State(self.state_path)
 
     def validate_database_settings(self):
         if not self.initial_only:
@@ -196,7 +225,9 @@ def run(self):
 
         logger.info('recreating database')
         self.clickhouse_api.database = self.target_database_tmp
-        self.clickhouse_api.recreate_database()
+        if not self.is_parallel_worker:
+            self.clickhouse_api.recreate_database()
+
         self.state.tables = self.mysql_api.get_tables()
         self.state.tables = [
             table for table in self.state.tables if self.config.is_table_matches(table)
@@ -220,6 +251,10 @@ def create_initial_structure(self):
     def create_initial_structure_table(self, table_name):
         if not self.config.is_table_matches(table_name):
             return
+
+        if self.single_table and self.single_table != table_name:
+            return
+
         mysql_create_statement = self.mysql_api.get_table_create_statement(table_name)
         mysql_structure = self.converter.parse_mysql_table_structure(
             mysql_create_statement, required_table_name=table_name,
@@ -232,7 +267,9 @@ def create_initial_structure_table(self, table_name):
         self.state.tables_structure[table_name] = (mysql_structure, clickhouse_structure)
 
         indexes = self.config.get_indexes(self.database, table_name)
-        self.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes)
+
+        if not self.is_parallel_worker:
+            self.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes)
 
     def prevent_binlog_removal(self):
         if time.time() - self.last_touch_time < self.BINLOG_TOUCH_INTERVAL:
@@ -253,22 +290,26 @@ def perform_initial_replication(self):
         for table in self.state.tables:
             if start_table and table != start_table:
                 continue
+            if self.single_table and self.single_table != table:
+                continue
             self.perform_initial_replication_table(table)
             start_table = None
-        logger.info(f'initial replication - swapping database')
-        if self.target_database in self.clickhouse_api.get_databases():
-            self.clickhouse_api.execute_command(
-                f'RENAME DATABASE `{self.target_database}` TO `{self.target_database}_old`',
-            )
-            self.clickhouse_api.execute_command(
-                f'RENAME DATABASE `{self.target_database_tmp}` TO `{self.target_database}`',
-            )
-            self.clickhouse_api.drop_database(f'{self.target_database}_old')
-        else:
-            self.clickhouse_api.execute_command(
-                f'RENAME DATABASE `{self.target_database_tmp}` TO `{self.target_database}`',
-            )
-        self.clickhouse_api.database = self.target_database
+
+        if not self.is_parallel_worker:
+            logger.info(f'initial replication - swapping database')
+            if self.target_database in self.clickhouse_api.get_databases():
+                self.clickhouse_api.execute_command(
+                    f'RENAME DATABASE `{self.target_database}` TO `{self.target_database}_old`',
+                )
+                self.clickhouse_api.execute_command(
+                    f'RENAME DATABASE `{self.target_database_tmp}` TO `{self.target_database}`',
+                )
+                self.clickhouse_api.drop_database(f'{self.target_database}_old')
+            else:
+                self.clickhouse_api.execute_command(
+                    f'RENAME DATABASE `{self.target_database_tmp}` TO `{self.target_database}`',
+                )
+            self.clickhouse_api.database = self.target_database
         logger.info(f'initial replication - done')
 
     def perform_initial_replication_table(self, table_name):
@@ -278,6 +319,13 @@ def perform_initial_replication_table(self, table_name):
             logger.info(f'skip table {table_name} - not matching any allowed table')
             return
 
+        if not self.is_parallel_worker and self.config.initial_replication_threads > 1:
+            self.state.initial_replication_table = table_name
+            self.state.initial_replication_max_primary_key = None
+            self.state.save()
+            self.perform_initial_replication_table_parallel(table_name)
+            return
+
         max_primary_key = None
         if self.state.initial_replication_table == table_name:
             # continue replication from saved position
@@ -322,6 +370,8 @@ def perform_initial_replication_table(self, table_name):
                 order_by=primary_keys,
                 limit=DbReplicator.INITIAL_REPLICATION_BATCH_SIZE,
                 start_value=query_start_values,
+                worker_id=self.worker_id,
+                total_workers=self.total_workers,
             )
             logger.debug(f'extracted {len(records)} records from mysql')
 
@@ -360,6 +410,66 @@ def perform_initial_replication_table(self, table_name):
                 f'primary key: {max_primary_key}',
             )
 
+    def perform_initial_replication_table_parallel(self, table_name):
+        """
+        Execute initial replication for a table using multiple parallel worker processes.
+        Each worker will handle a portion of the table based on its worker_id and total_workers.
+        """
+        logger.info(f"Starting parallel replication for table {table_name} with {self.config.initial_replication_threads} workers")
+
+        # Create and launch worker processes
+        processes = []
+        for worker_id in range(self.config.initial_replication_threads):
+            # Prepare command to launch a worker process
+            cmd = [
+                sys.executable, "-m", "mysql_ch_replicator.main",
+                "db_replicator",  # Required positional mode argument
+                "--config", self.settings_file,
+                "--db", self.database,
+                "--worker_id", str(worker_id),
+                "--total_workers", str(self.config.initial_replication_threads),
+                "--table", table_name,
+                "--target_db", self.target_database_tmp,
+                "--initial_only=True",
+            ]
+
+            logger.info(f"Launching worker {worker_id}: {' '.join(cmd)}")
+            process = subprocess.Popen(cmd)
+            processes.append(process)
+
+        # Wait for all worker processes to complete
+        logger.info(f"Waiting for {len(processes)} workers to complete replication of {table_name}")
+
+        try:
+            while processes:
+                for i, process in enumerate(processes[:]):
+                    # Check if process is still running
+                    if process.poll() is not None:
+                        exit_code = process.returncode
+                        if exit_code == 0:
+                            logger.info(f"Worker process {i} completed successfully")
+                        else:
+                            logger.error(f"Worker process {i} failed with exit code {exit_code}")
+                            # Optional: can raise an exception here to abort the entire operation
+                            raise Exception(f"Worker process failed with exit code {exit_code}")
+
+                        processes.remove(process)
+
+                if processes:
+                    # Wait a bit before checking again
+                    time.sleep(0.1)
+
+                    # Every 30 seconds, log progress
+                    if int(time.time()) % 30 == 0:
+                        logger.info(f"Still waiting for {len(processes)} workers to complete")
+        except KeyboardInterrupt:
+            logger.warning("Received interrupt, terminating worker processes")
+            for process in processes:
+                process.terminate()
+            raise
+
+        logger.info(f"All workers completed replication of table {table_name}")
+
     def run_realtime_replication(self):
         if self.initial_only:
             logger.info('skip running realtime replication, only initial replication was requested')
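Each worker spawned by perform_initial_replication_table_parallel is an
ordinary db_replicator process pinned to one table and one shard of the key
space, writing straight into the shared tmp database (workers skip the
recreate, create_table, and final rename steps, which stay with the
coordinating process). For a hypothetical 4-thread run on table user of
database test_db, worker 2 is launched as:

    python -m mysql_ch_replicator.main db_replicator \
        --config config.yaml \
        --db test_db \
        --worker_id 2 \
        --total_workers 4 \
        --table user \
        --target_db test_db_tmp \
        --initial_only=True

Polling the children with Popen.poll() instead of a blocking wait keeps the
coordinator responsive to KeyboardInterrupt, so it can terminate the whole
worker group on Ctrl-C.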
diff --git a/mysql_ch_replicator/main.py b/mysql_ch_replicator/main.py
index 27c9031..f04c23e 100755
--- a/mysql_ch_replicator/main.py
+++ b/mysql_ch_replicator/main.py
@@ -87,13 +87,28 @@ def run_db_replicator(args, config: Settings):
         'db_replicator.log',
     )
 
-    set_logging_config(f'dbrepl {args.db}', log_file=log_file, log_level_str=config.log_level)
+    # Set log tag according to whether this is a worker or main process
+    if args.worker_id is not None:
+        if args.table:
+            log_tag = f'dbrepl {db_name} worker_{args.worker_id} table_{args.table}'
+        else:
+            log_tag = f'dbrepl {db_name} worker_{args.worker_id}'
+    else:
+        log_tag = f'dbrepl {db_name}'
+
+    set_logging_config(log_tag, log_file=log_file, log_level_str=config.log_level)
+
+    if args.table:
+        logging.info(f"Processing specific table: {args.table}")
 
     db_replicator = DbReplicator(
         config=config,
         database=db_name,
         target_database=getattr(args, 'target_db', None),
         initial_only=args.initial_only,
+        worker_id=args.worker_id,
+        total_workers=args.total_workers,
+        table=args.table,
     )
     db_replicator.run()
 
@@ -142,6 +157,18 @@ def main():
         "--initial_only", type=bool, default=False,
         help="don't run realtime replication, run initial replication only",
     )
+    parser.add_argument(
+        "--worker_id", type=int, default=None,
+        help="Worker ID for parallel initial replication (0-based)",
+    )
+    parser.add_argument(
+        "--total_workers", type=int, default=None,
+        help="Total number of workers for parallel initial replication",
+    )
+    parser.add_argument(
+        "--table", type=str, default=None,
+        help="Specific table to process (used with --worker_id for parallel processing of a single table)",
+    )
 
     args = parser.parse_args()
     config = Settings()
diff --git a/mysql_ch_replicator/mysql_api.py b/mysql_ch_replicator/mysql_api.py
index b8b25c3..6669f95 100644
--- a/mysql_ch_replicator/mysql_api.py
+++ b/mysql_ch_replicator/mysql_api.py
@@ -93,14 +93,27 @@ def get_table_create_statement(self, table_name) -> str:
         create_statement = res[0][1].strip()
         return create_statement
 
-    def get_records(self, table_name, order_by, limit, start_value=None):
+    def get_records(self, table_name, order_by, limit, start_value=None, worker_id=None, total_workers=None):
         self.reconnect_if_required()
-        order_by = ','.join(order_by)
+        order_by_str = ','.join(order_by)
         where = ''
         if start_value is not None:
             start_value = ','.join(map(str, start_value))
-            where = f'WHERE ({order_by}) > ({start_value}) '
-        query = f'SELECT * FROM `{table_name}` {where}ORDER BY {order_by} LIMIT {limit}'
+            where = f'WHERE ({order_by_str}) > ({start_value}) '
+
+        # Add partitioning filter for parallel processing if needed
+        if worker_id is not None and total_workers is not None and total_workers > 1:
+            concat_keys = f"CONCAT_WS('|', {', '.join([f'COALESCE({key}, \"\")' for key in order_by])})"
+            hash_condition = f"CRC32({concat_keys}) % {total_workers} = {worker_id}"
+            if where:
+                where += f'AND {hash_condition} '
+            else:
+                where = f'WHERE {hash_condition} '
+
+        query = f'SELECT * FROM `{table_name}` {where}ORDER BY {order_by_str} LIMIT {limit}'
+        print("query:", query)
+
+        # Execute the actual query
         self.cursor.execute(query)
         res = self.cursor.fetchall()
         records = [x for x in res]
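For worker 2 of 4 scanning a table keyed by id, resuming after id 5000, the
method now emits a query of this shape (table name and batch size
illustrative):

    SELECT * FROM `user` WHERE (id) > (5000) AND CRC32(CONCAT_WS('|', COALESCE(id, ""))) % 4 = 2 ORDER BY id LIMIT 50000

Hashing the primary key means every row deterministically belongs to exactly
one worker without precomputing key ranges, at the cost of evaluating CRC32
for each scanned row. Note also that the escaped quotes inside the f-string
expression, [f'COALESCE({key}, \"\")'], are a syntax error on Python versions
before 3.12, where backslashes are not allowed in f-string expressions; PATCH
5 below reworks exactly this line.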
diff --git a/mysql_ch_replicator/runner.py b/mysql_ch_replicator/runner.py
index 8f131d7..2c3af0a 100644
--- a/mysql_ch_replicator/runner.py
+++ b/mysql_ch_replicator/runner.py
@@ -24,8 +24,19 @@ def __init__(self, config_file):
 
 
 class DbReplicatorRunner(ProcessRunner):
-    def __init__(self, db_name, config_file):
-        super().__init__(f'{sys.argv[0]} --config {config_file} --db {db_name} db_replicator')
+    def __init__(self, db_name, config_file, worker_id=None, total_workers=None, initial_only=False):
+        cmd = f'{sys.argv[0]} --config {config_file} --db {db_name} db_replicator'
+
+        if worker_id is not None:
+            cmd += f' --worker_id={worker_id}'
+
+        if total_workers is not None:
+            cmd += f' --total_workers={total_workers}'
+
+        if initial_only:
+            cmd += ' --initial_only=True'
+
+        super().__init__(cmd)
 
 
 class DbOptimizerRunner(ProcessRunner):
diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py
index 7c87cae..b0cb989 100644
--- a/test_mysql_ch_replicator.py
+++ b/test_mysql_ch_replicator.py
@@ -316,9 +316,10 @@ def get_db_replicator_pid(cfg: config.Settings, db_name: str):
     return state.pid
 
 
-def test_runner():
+@pytest.mark.parametrize('cfg_file', [CONFIG_FILE, 'tests_config_parallel.yaml'])
+def test_runner(cfg_file):
     cfg = config.Settings()
-    cfg.load(CONFIG_FILE)
+    cfg.load(cfg_file)
 
     mysql = mysql_api.MySQLApi(
         database=None,
@@ -367,7 +368,7 @@ def test_runner():
 
     mysql.execute(f"INSERT INTO `group` (name, age, rate) VALUES ('Peter', 33, 10.2);", commit=True)
 
-    run_all_runner = RunAllRunner()
+    run_all_runner = RunAllRunner(cfg_file=cfg_file)
     run_all_runner.run()
 
     assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
@@ -422,6 +423,8 @@ def test_runner(cfg_file):
         commit=True,
     )
 
+    assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
+
     assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 5)
 
     assert_wait(lambda: ch.select(TEST_TABLE_NAME, "age=1912")[0]['name'] == 'Hällo')
@@ -431,6 +434,8 @@ def test_runner(cfg_file):
     requests.get('http://localhost:9128/restart_replication')
     time.sleep(1.0)
 
+    assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
+
    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 5)
 
     assert_wait(lambda: ch.select(TEST_TABLE_NAME, "age=1912")[0]['name'] == 'Hällo')
@@ -2001,7 +2006,7 @@ def test_year_type():
 
 @pytest.mark.optional
 def test_performance_initial_only_replication():
     config_file = 'tests_config_perf.yaml'
-    num_records = 1000000
+    num_records = 300000
 
     cfg = config.Settings()
     cfg.load(config_file)
@@ -2074,5 +2079,57 @@ def test_performance_initial_only_replication():
     print("total time (seconds):", round(time_delta, 2))
     print("*****************************")
     print('\n\n')
-    
+
+    # Clean up
+    ch.drop_database(TEST_DB_NAME)
+
+    # Now test with parallel replication
+    # Set initial_replication_threads in the config
+    print("running db_replicator with parallel initial replication")
+
+    t1 = time.time()
+
+    # Create a custom config file for testing with parallel replication
+    parallel_config_file = 'tests_config_perf_parallel.yaml'
+    if os.path.exists(parallel_config_file):
+        os.remove(parallel_config_file)
+
+    with open(config_file, 'r') as src_file:
+        config_content = src_file.read()
+    config_content += f"\ninitial_replication_threads: 8\n"
+    with open(parallel_config_file, 'w') as dest_file:
+        dest_file.write(config_content)
+
+    # Use the DbReplicator directly to test the new parallel implementation
+    db_replicator_runner = DbReplicatorRunner(
+        TEST_DB_NAME,
+        cfg_file=parallel_config_file
+    )
+    db_replicator_runner.run()
+
+    # Make sure the database and table exist
+    assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5)
+    ch.execute_command(f'USE `{TEST_DB_NAME}`')
+    assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5)
+
+    # Check that all records were replicated
+    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == num_records + 1, retry_interval=0.5, max_wait_time=300)
+
+    t2 = time.time()
+
+    time_delta = t2 - t1
+    rps = num_records / time_delta
+
+    print('\n\n')
print("*****************************") + print("DB Replicator Parallel Mode Performance:") + print("workers:", cfg.initial_replication_threads) + print("records per second:", int(rps)) + print("total time (seconds):", round(time_delta, 2)) + print("*****************************") + print('\n\n') + db_replicator_runner.stop() + + # Clean up the temporary config file + os.remove(parallel_config_file) diff --git a/tests_config_parallel.yaml b/tests_config_parallel.yaml new file mode 100644 index 0000000..1f6803d --- /dev/null +++ b/tests_config_parallel.yaml @@ -0,0 +1,37 @@ +mysql: + host: 'localhost' + port: 9306 + user: 'root' + password: 'admin' + +clickhouse: + host: 'localhost' + port: 9123 + user: 'default' + password: 'admin' + +binlog_replicator: + data_dir: '/app/binlog/' + records_per_file: 100000 + binlog_retention_period: 43200 # 12 hours in seconds + +databases: '*test*' +log_level: 'debug' +optimize_interval: 3 +check_db_updated_interval: 3 + +target_databases: + replication-test_db_2: replication-destination + +indexes: + - databases: '*' + tables: ['group'] + index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1' + +http_host: 'localhost' +http_port: 9128 + +types_mapping: + 'char(36)': 'UUID' + +initial_replication_threads: 4 From c87fc6c9600fdbf2b2abe35c44f743e0fb47b9d2 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Tue, 1 Apr 2025 23:41:21 +0400 Subject: [PATCH 5/6] Fix key filtering for workers --- mysql_ch_replicator/mysql_api.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/mysql_ch_replicator/mysql_api.py b/mysql_ch_replicator/mysql_api.py index 6669f95..082eb78 100644 --- a/mysql_ch_replicator/mysql_api.py +++ b/mysql_ch_replicator/mysql_api.py @@ -103,7 +103,9 @@ def get_records(self, table_name, order_by, limit, start_value=None, worker_id=N # Add partitioning filter for parallel processing if needed if worker_id is not None and total_workers is not None and total_workers > 1: - concat_keys = f"CONCAT_WS('|', {', '.join([f'COALESCE({key}, \"\")' for key in order_by])})" + # Use a list comprehension to build the COALESCE expressions with proper quoting + coalesce_expressions = [f"COALESCE({key}, '')" for key in order_by] + concat_keys = f"CONCAT_WS('|', {', '.join(coalesce_expressions)})" hash_condition = f"CRC32({concat_keys}) % {total_workers} = {worker_id}" if where: where += f'AND {hash_condition} ' From ae280d6536da4dbeafd70a4cb94fb538678e5207 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Tue, 1 Apr 2025 23:54:48 +0400 Subject: [PATCH 6/6] Determenistic state file --- mysql_ch_replicator/db_replicator.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py index 6d9742e..56a66f9 100644 --- a/mysql_ch_replicator/db_replicator.py +++ b/mysql_ch_replicator/db_replicator.py @@ -3,6 +3,7 @@ import random import time import pickle +import hashlib from logging import getLogger from enum import Enum from dataclasses import dataclass @@ -135,13 +136,26 @@ def __init__(self, config: Settings, database: str, target_database: str = None, # Handle state file differently for parallel workers if self.worker_id is not None and self.total_workers is not None: - # For worker processes in parallel mode, use a different state file + # For worker processes in parallel mode, use a different state file with a deterministic name self.is_parallel_worker = True + + # Determine table name for the state file + table_identifier = 
+            # Determine table name for the state file
+            table_identifier = self.single_table if self.single_table else "all_tables"
+
+            # Create a hash of the table name to ensure it's filesystem-safe
+            if self.single_table:
+                # Use a hex digest of the table name to ensure it's filesystem-safe
+                table_identifier = hashlib.sha256(self.single_table.encode('utf-8')).hexdigest()[:16]
+            else:
+                table_identifier = "all_tables"
+
+            # Create a deterministic state file path that includes worker_id, total_workers, and table hash
             self.state_path = os.path.join(
                 self.config.binlog_replicator.data_dir,
                 self.database,
-                f'state_worker_{self.worker_id}_{random.randint(0,9999999999)}.pckl'
+                f'state_worker_{self.worker_id}_of_{self.total_workers}_{table_identifier}.pckl'
             )
+
             logger.info(f"Worker {self.worker_id}/{self.total_workers} using state file: {self.state_path}")
 
             if self.single_table:
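With the worker count and a table-name digest baked into the file name, a
restarted worker resumes from its own saved position instead of leaving
orphaned randomly-named state files behind. For illustration (directory and
table name hypothetical, matching the data_dir used in the test configs):

    import hashlib
    import os

    worker_id, total_workers = 2, 4
    table_identifier = hashlib.sha256('user'.encode('utf-8')).hexdigest()[:16]
    state_path = os.path.join(
        '/app/binlog/', 'test_db',
        f'state_worker_{worker_id}_of_{total_workers}_{table_identifier}.pckl',
    )
    print(state_path)  # /app/binlog/test_db/state_worker_2_of_4_<16 hex chars>.pckl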