def get_max_record_version(self, table_name):
    """
    Look up the highest `_version` currently stored for a table in ClickHouse.

    Runs ``SELECT MAX(_version)`` against ``self.database`` through
    ``self.client``. Any exception raised while building or running the
    query is logged and reported as ``None`` instead of being propagated.

    Args:
        table_name: Name of the table inside ``self.database`` to inspect.

    Returns:
        The first column of the first result row, or None when the query
        returns no rows, yields a NULL maximum, or fails.
    """
    try:
        sql = f"SELECT MAX(_version) FROM `{self.database}`.`{table_name}`"
        rows = self.client.query(sql).result_rows
        # MAX() over an empty table comes back as a single NULL cell.
        newest = rows[0][0] if rows else None
        if newest is None:
            logger.warning(f"No records with _version found in table {table_name}")
            return None
        return newest
    except Exception as e:
        logger.error(f"Error querying max _version for table {table_name}: {e}")
        return None
def consolidate_worker_record_versions(self, table_name):
    """
    Raise the stored record version of a table to match ClickHouse.

    Asks ClickHouse for the table's maximum record version. When that value
    is positive and greater than the version tracked in the replicator state
    (treated as 0 when the table has no entry), it is written into the state
    and the state is saved. Otherwise only a log line is emitted.
    """
    logger.info(f"Getting maximum record version from ClickHouse for table {table_name}")

    ch_version = self.replicator.clickhouse_api.get_max_record_version(table_name)

    # Guard clause: nothing usable came back from ClickHouse.
    if ch_version is None or not ch_version > 0:
        logger.warning(f"No record version found in ClickHouse for table {table_name}")
        return

    known_version = self.replicator.state.tables_last_record_version.get(table_name, 0)

    # The tracked version is never lowered.
    if not ch_version > known_version:
        logger.info(f"Current version {known_version} is already up-to-date with ClickHouse version {ch_version}")
        return

    logger.info(f"Updating record version for table {table_name} from {known_version} to {ch_version}")
    state = self.replicator.state
    state.tables_last_record_version[table_name] = ch_version
    state.save()
def test_parallel_initial_replication_record_versions():
    """
    Check record versions after parallel initial replication.

    Replicates 1000 rows with several initial-replication workers, then
    starts realtime replication and inserts one more row. The `_version`
    assigned to that row must continue from the maximum `_version` already
    present in ClickHouse.

    Every runner that was started is stopped in a ``finally`` block, so a
    failing assertion does not leave replicator processes running for the
    tests that follow.
    """
    # Only meaningful with the parallel configuration
    cfg_file = 'tests_config_parallel.yaml'
    cfg = config.Settings()
    cfg.load(cfg_file)

    # Ensure we have parallel replication configured
    assert cfg.initial_replication_threads > 1, "This test requires initial_replication_threads > 1"

    mysql = mysql_api.MySQLApi(
        database=None,
        mysql_settings=cfg.mysql,
    )

    ch = clickhouse_api.ClickhouseApi(
        database=TEST_DB_NAME,
        clickhouse_settings=cfg.clickhouse,
    )

    prepare_env(cfg, mysql, ch)

    # Create a table with sufficient records for parallel processing
    mysql.execute(f'''
CREATE TABLE `{TEST_TABLE_NAME}` (
    id int NOT NULL AUTO_INCREMENT,
    name varchar(255),
    age int,
    version int NOT NULL DEFAULT 1,
    PRIMARY KEY (id)
);
    ''')

    # Insert enough records that the work is split between workers
    for i in range(1, 1001):
        mysql.execute(
            f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, version) VALUES ('User{i}', {20+i%50}, {i});",
            commit=(i % 100 == 0)  # Commit every 100 records
        )

    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=cfg_file)
    # Created later; kept as None so the cleanup knows what was started.
    binlog_replicator_runner = None
    realtime_db_replicator = None

    try:
        # Run initial replication with parallel workers
        db_replicator_runner.run()

        assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), max_wait_time=10.0)

        ch.execute_command(f'USE `{TEST_DB_NAME}`')

        assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), max_wait_time=10.0)
        assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1000, max_wait_time=10.0)

        db_replicator_runner.stop()

        # Verify database and table were created
        assert TEST_DB_NAME in ch.get_databases()
        ch.execute_command(f'USE `{TEST_DB_NAME}`')
        assert TEST_TABLE_NAME in ch.get_tables()

        # Verify all records were replicated
        records = ch.select(TEST_TABLE_NAME)
        assert len(records) == 1000

        # Instead of reading the state file directly, check the max _version
        # stored in the ClickHouse table
        versions_query = ch.query(f"SELECT MAX(_version) FROM `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}`")
        max_version_in_ch = versions_query.result_rows[0][0]
        assert max_version_in_ch >= 200, f"Expected max _version to be at least 200, got {max_version_in_ch}"

        # Now test realtime replication to verify versions continue correctly
        binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=cfg_file)
        binlog_replicator_runner.run()

        time.sleep(3.0)

        # Start DB replicator in realtime mode
        realtime_db_replicator = DbReplicatorRunner(TEST_DB_NAME, cfg_file=cfg_file)
        realtime_db_replicator.run()

        # Insert a new record with version 1001
        mysql.execute(
            f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, version) VALUES ('UserRealtime', 99, 1001);",
            commit=True
        )

        # Wait for the record to be replicated
        assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1001)

        # Verify the new record was replicated correctly
        realtime_record = ch.select(TEST_TABLE_NAME, where="name='UserRealtime'")[0]
        assert realtime_record['age'] == 99
        assert realtime_record['version'] == 1001

        # The _version of the realtime row is not expected to exceed 1000:
        # each parallel worker keeps its own version counter, so the maximum
        # after initial replication is roughly the per-worker record count.
        versions_query = ch.query(f"SELECT _version FROM `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` WHERE name='UserRealtime'")
        ch_version = versions_query.result_rows[0][0]

        assert ch_version > 0, f"ClickHouse _version should be > 0, but got {ch_version}"

        # Expected value is roughly (total_records / num_workers) + 1,
        # e.g. around 251 for 1000 records and 4 workers.
        expected_version_approx = 1000 // cfg.initial_replication_threads + 1
        # Allow some flexibility in the exact expected value
        assert abs(ch_version - expected_version_approx) < 50, (
            f"ClickHouse _version should be close to {expected_version_approx}, but got {ch_version}"
        )
    finally:
        # Clean up in the original order, also when an assertion failed.
        if binlog_replicator_runner is not None:
            binlog_replicator_runner.stop()
        if realtime_db_replicator is not None:
            realtime_db_replicator.stop()
        db_replicator_runner.stop()