Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions mysql_ch_replicator/clickhouse_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,25 @@ def get_system_setting(self, name):
if not results:
return None
return results[0].get('value', None)

def get_max_record_version(self, table_name):
    """
    Fetch the largest ``_version`` value stored for a table directly from ClickHouse.

    Args:
        table_name: The name of the table to query

    Returns:
        The maximum ``_version`` value as an integer, or ``None`` when the table
        does not exist, holds no versioned records, or the query fails.
    """
    try:
        # Identifiers are backtick-quoted; both database and table names come
        # from replicator config, not from untrusted input.
        response = self.client.query(
            f"SELECT MAX(_version) FROM `{self.database}`.`{table_name}`"
        )
        rows = response.result_rows
        if rows and rows[0][0] is not None:
            return rows[0][0]
        logger.warning(f"No records with _version found in table {table_name}")
        return None
    except Exception as e:
        # Best-effort lookup: callers treat None as "version unknown",
        # so log the failure instead of propagating it.
        logger.error(f"Error querying max _version for table {table_name}: {e}")
        return None
27 changes: 27 additions & 0 deletions mysql_ch_replicator/db_replicator_initial.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
import sys
import subprocess
import pickle
from logging import getLogger
from enum import Enum

Expand Down Expand Up @@ -213,6 +214,7 @@ def perform_initial_replication_table(self, table_name):
f'replicated {stats_number_of_records} records, '
f'primary key: {max_primary_key}',
)
self.save_state_if_required(force=True)

def perform_initial_replication_table_parallel(self, table_name):
"""
Expand Down Expand Up @@ -273,3 +275,28 @@ def perform_initial_replication_table_parallel(self, table_name):
raise

logger.info(f"All workers completed replication of table {table_name}")

# Consolidate record versions from all worker states
logger.info(f"Consolidating record versions from worker states for table {table_name}")
self.consolidate_worker_record_versions(table_name)

def consolidate_worker_record_versions(self, table_name):
    """
    Align the main replication state's record version for *table_name* with
    the maximum ``_version`` actually present in ClickHouse.

    Queries ClickHouse directly (rather than reading individual worker state
    files) and persists the state only when the stored version moves forward.
    """
    logger.info(f"Getting maximum record version from ClickHouse for table {table_name}")

    ch_version = self.replicator.clickhouse_api.get_max_record_version(table_name)

    # Treat "no result" and a non-positive version the same way: nothing to sync.
    if ch_version is None or ch_version <= 0:
        logger.warning(f"No record version found in ClickHouse for table {table_name}")
        return

    state = self.replicator.state
    known_version = state.tables_last_record_version.get(table_name, 0)
    if ch_version > known_version:
        logger.info(f"Updating record version for table {table_name} from {known_version} to {ch_version}")
        state.tables_last_record_version[table_name] = ch_version
        state.save()
    else:
        logger.info(f"Current version {known_version} is already up-to-date with ClickHouse version {ch_version}")
127 changes: 125 additions & 2 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,129 @@ def test_initial_only():
db_replicator_runner.stop()


def test_parallel_initial_replication_record_versions():
    """
    End-to-end check that record versions are properly consolidated after
    parallel initial replication.

    Flow: seed MySQL with 1000 rows, run an initial replication with multiple
    worker processes, verify the consolidated ``_version`` in ClickHouse, then
    switch to realtime (binlog) replication and confirm versions continue
    sensibly for newly inserted rows.
    """
    # This test only makes sense with a parallel-replication config.
    cfg_file = 'tests_config_parallel.yaml'
    cfg = config.Settings()
    cfg.load(cfg_file)

    # Guard: the config must actually enable parallel workers.
    assert cfg.initial_replication_threads > 1, "This test requires initial_replication_threads > 1"

    mysql = mysql_api.MySQLApi(
        database=None,
        mysql_settings=cfg.mysql,
    )

    ch = clickhouse_api.ClickhouseApi(
        database=TEST_DB_NAME,
        clickhouse_settings=cfg.clickhouse,
    )

    prepare_env(cfg, mysql, ch)

    # Table carries its own application-level `version` column, distinct from
    # the replicator-managed `_version` column in ClickHouse.
    mysql.execute(f'''
    CREATE TABLE `{TEST_TABLE_NAME}` (
        id int NOT NULL AUTO_INCREMENT,
        name varchar(255),
        age int,
        version int NOT NULL DEFAULT 1,
        PRIMARY KEY (id)
    );
    ''')

    # Insert enough records that the work is actually split across workers.
    for i in range(1, 1001):
        mysql.execute(
            f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, version) VALUES ('User{i}', {20+i%50}, {i});",
            commit=(i % 100 == 0)  # Commit every 100 records
        )

    # Run initial replication only, with parallel workers.
    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=cfg_file)
    db_replicator_runner.run()

    assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), max_wait_time=10.0)

    ch.execute_command(f'USE `{TEST_DB_NAME}`')

    assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), max_wait_time=10.0)
    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1000, max_wait_time=10.0)

    db_replicator_runner.stop()

    # Re-verify after stopping: database, table, and full row count survived.
    assert TEST_DB_NAME in ch.get_databases()
    ch.execute_command(f'USE `{TEST_DB_NAME}`')
    assert TEST_TABLE_NAME in ch.get_tables()

    records = ch.select(TEST_TABLE_NAME)
    assert len(records) == 1000

    # Verify version consolidation by inspecting the max _version in ClickHouse
    # directly, rather than reading the replicator's state file.
    versions_query = ch.query(f"SELECT MAX(_version) FROM `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}`")
    max_version_in_ch = versions_query.result_rows[0][0]
    # Each of N workers assigns versions independently, so with 1000 rows the
    # per-worker max is roughly 1000/N; 200 is a conservative lower bound.
    assert max_version_in_ch >= 200, f"Expected max _version to be at least 200, got {max_version_in_ch}"

    # Now test realtime replication to verify versions continue correctly.
    binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=cfg_file)
    binlog_replicator_runner.run()

    # Give the binlog replicator time to start before the DB replicator attaches.
    time.sleep(3.0)

    # Start DB replicator in realtime mode.
    realtime_db_replicator = DbReplicatorRunner(TEST_DB_NAME, cfg_file=cfg_file)
    realtime_db_replicator.run()

    # Insert a new record; its application `version` column is 1001.
    mysql.execute(
        f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, version) VALUES ('UserRealtime', 99, 1001);",
        commit=True
    )

    # Wait for the record to be replicated.
    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1001)

    # Verify the new record's payload columns came through intact.
    realtime_record = ch.select(TEST_TABLE_NAME, where="name='UserRealtime'")[0]
    assert realtime_record['age'] == 99
    assert realtime_record['version'] == 1001

    # Check the replicator-assigned _version for the realtime row. With
    # parallel workers each worker keeps an independent version counter, so
    # the continuation point is near (total_records / num_workers), not 1000.
    versions_query = ch.query(f"SELECT _version FROM `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` WHERE name='UserRealtime'")
    ch_version = versions_query.result_rows[0][0]

    assert ch_version > 0, f"ClickHouse _version should be > 0, but got {ch_version}"

    # Expected continuation: roughly (total_records / num_workers) + 1,
    # e.g. ~251 for 1000 rows across 4 workers; allow +/-50 slack.
    expected_version_approx = 1000 // cfg.initial_replication_threads + 1
    assert abs(ch_version - expected_version_approx) < 50, (
        f"ClickHouse _version should be close to {expected_version_approx}, but got {ch_version}"
    )

    # Clean up all runners.
    binlog_replicator_runner.stop()
    realtime_db_replicator.stop()
    # NOTE(review): db_replicator_runner was already stopped above after the
    # initial replication phase — presumably stop() is idempotent; confirm.
    db_replicator_runner.stop()


def test_database_tables_filtering():
cfg = config.Settings()
cfg.load('tests_config_databases_tables.yaml')
Expand Down Expand Up @@ -693,8 +816,8 @@ def test_datetime_exception():
name varchar(255),
modified_date DateTime(3) NOT NULL,
test_date date NOT NULL,
PRIMARY KEY (id)
);
PRIMARY KEY (id)
);
''')

mysql.execute(
Expand Down