From 71c48538fb4468171baa901c1d6b9bec809d61db Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Sat, 29 Mar 2025 14:58:21 +0400 Subject: [PATCH 1/3] Correct test name --- test_mysql_ch_replicator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 9136ac0..443be40 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -1357,7 +1357,7 @@ def get_last_insert_from_binlog(cfg: config.Settings, db_name: str): @pytest.mark.optional -def test_performance_dbreplicator(): +def test_performance_binlog_replicator(): config_file = 'tests_config_perf.yaml' num_records = 100000 From 8d603abd452aa127afb0ff54cbfb1880e8539a30 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Sat, 29 Mar 2025 15:15:49 +0400 Subject: [PATCH 2/3] Binlog and DB replicator perftests --- test_mysql_ch_replicator.py | 40 +++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 443be40..ed5bb75 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -1357,7 +1357,7 @@ def get_last_insert_from_binlog(cfg: config.Settings, db_name: str): @pytest.mark.optional -def test_performance_binlog_replicator(): +def test_performance_realtime_replication(): config_file = 'tests_config_perf.yaml' num_records = 100000 @@ -1387,6 +1387,8 @@ def test_performance_binlog_replicator(): binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file) binlog_replicator_runner.run() + db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file) + db_replicator_runner.run() time.sleep(1) @@ -1399,8 +1401,15 @@ def _get_last_insert_name(): return record[1].decode('utf-8') assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_1', retry_interval=0.5) + + # Wait for the database and table to be created in ClickHouse + assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5) + ch.execute_command(f'USE `{TEST_DB_NAME}`') + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1, retry_interval=0.5) binlog_replicator_runner.stop() + db_replicator_runner.stop() time.sleep(1) @@ -1418,7 +1427,7 @@ def _get_last_insert_name(): mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True) - print("running db_replicator") + print("running binlog_replicator") t1 = time.time() binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file) binlog_replicator_runner.run() @@ -1433,6 +1442,33 @@ def _get_last_insert_name(): print('\n\n') print("*****************************") + print("Binlog Replicator Performance:") + print("records per second:", int(rps)) + print("total time (seconds):", round(time_delta, 2)) + print("*****************************") + print('\n\n') + + # Now test db_replicator performance + print("running db_replicator") + t1 = time.time() + db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file) + db_replicator_runner.run() + + # Make sure the database and table exist before querying + assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5) + ch.execute_command(f'USE `{TEST_DB_NAME}`') + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == num_records + 2, retry_interval=0.5, max_wait_time=1000) + t2 = time.time() + + db_replicator_runner.stop() + + time_delta = t2 - t1 + rps = num_records / time_delta + + print('\n\n') + print("*****************************") + print("DB Replicator Performance:") print("records per second:", int(rps)) print("total time (seconds):", round(time_delta, 2)) print("*****************************") From 9afe62ee58b65cdd2fa2edca42a6f54ddd639b96 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Sat, 29 Mar 2025 15:40:38 +0400 Subject: [PATCH 3/3] Initial replication perftest --- test_mysql_ch_replicator.py | 79 +++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index ed5bb75..7c87cae 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -1997,3 +1997,82 @@ def test_year_type(): run_all_runner.stop() assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME)) assert('Traceback' not in read_logs(TEST_DB_NAME)) + +@pytest.mark.optional +def test_performance_initial_only_replication(): + config_file = 'tests_config_perf.yaml' + num_records = 1000000 + + cfg = config.Settings() + cfg.load(config_file) + + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=cfg.mysql, + ) + + ch = clickhouse_api.ClickhouseApi( + database=TEST_DB_NAME, + clickhouse_settings=cfg.clickhouse, + ) + + prepare_env(cfg, mysql, ch) + + mysql.execute(f''' + CREATE TABLE `{TEST_TABLE_NAME}` ( + id int NOT NULL AUTO_INCREMENT, + name varchar(2048), + age int, + PRIMARY KEY (id) + ); + ''') + + print("populating mysql data") + + base_value = 'a' * 2000 + + for i in range(num_records): + if i % 2000 == 0: + print(f'populated {i} elements') + mysql.execute( + f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) " + f"VALUES ('TEST_VALUE_{i}_{base_value}', {i});", commit=i % 20 == 0, + ) + + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True) + print(f"finished populating {num_records} records") + + # Now test db_replicator performance in initial_only mode + print("running db_replicator in initial_only mode") + t1 = time.time() + + db_replicator_runner = DbReplicatorRunner( + TEST_DB_NAME, + additional_arguments='--initial_only=True', + cfg_file=config_file + ) + db_replicator_runner.run() + db_replicator_runner.wait_complete() # Wait for the process to complete + + # Make sure the database and table exist + assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5) + ch.execute_command(f'USE `{TEST_DB_NAME}`') + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5) + + # Check that all records were replicated + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == num_records + 1, retry_interval=0.5, max_wait_time=300) + + t2 = time.time() + + time_delta = t2 - t1 + rps = num_records / time_delta + + print('\n\n') + print("*****************************") + print("DB Replicator Initial Only Mode Performance:") + print("records per second:", int(rps)) + print("total time (seconds):", round(time_delta, 2)) + print("*****************************") + print('\n\n') + + db_replicator_runner.stop()