7 changes: 5 additions & 2 deletions mysql_ch_replicator/converter.py
@@ -531,11 +531,14 @@ def get_db_and_table_name(self, token, db_name):
         db_name = strip_sql_name(db_name)
         table_name = strip_sql_name(table_name)
         if self.db_replicator:
-            if db_name == self.db_replicator.database:
-                db_name = self.db_replicator.target_database
+            # Check if database and table match config BEFORE applying mapping
             matches_config = (
                 self.db_replicator.config.is_database_matches(db_name)
                 and self.db_replicator.config.is_table_matches(table_name))
+
+            # Apply database mapping AFTER checking matches_config
+            if db_name == self.db_replicator.database:
+                db_name = self.db_replicator.target_database
         else:
             matches_config = True

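Why the ordering matters: the `databases` filter in the config is written against source (MySQL) database names, while `target_databases` rewrites the name to the ClickHouse target. Below is a minimal, self-contained sketch of the failure mode; the glob-style matching, the pattern, and the helper names are illustrative assumptions, not the project's actual implementation.

import fnmatch

# Illustrative values only: a source-name pattern that matches the MySQL
# database but not the mapped ClickHouse database.
DATABASES_PATTERN = '*test*'
TARGET_DATABASES = {'replication-test_db': 'mapped_target_db'}

def is_database_matches(name):
    # Stand-in for config.is_database_matches(); glob matching is an assumption.
    return fnmatch.fnmatch(name, DATABASES_PATTERN)

source_db = 'replication-test_db'

# Old order: map first, then check. The mapped name fails the filter,
# so ALTER TABLE events for this database get skipped.
mapped = TARGET_DATABASES.get(source_db, source_db)
print(is_database_matches(mapped))      # False

# New order: check the source name, then map. DDL is applied to the
# mapped ClickHouse database as intended.
print(is_database_matches(source_db))   # True

With the fix, DDL for `replication-test_db` passes the filter before the name is rewritten to `mapped_target_db`, so schema evolution keeps working under the mapping.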
92 changes: 92 additions & 0 deletions test_mysql_ch_replicator.py
@@ -2257,3 +2257,95 @@ def test_performance_initial_only_replication():

    # Clean up the temporary config file
    os.remove(parallel_config_file)


def test_schema_evolution_with_db_mapping():
"""Test case to reproduce issue where schema evolution doesn't work with database mapping."""
# Use the predefined config file with database mapping
config_file = "tests_config_db_mapping.yaml"

cfg = config.Settings()
cfg.load(config_file)

# Note: Not setting a specific database in MySQL API
mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database="mapped_target_db",
clickhouse_settings=cfg.clickhouse,
)

prepare_env(cfg, mysql, ch, db_name=TEST_DB_NAME)

# Create a test table with some columns using fully qualified name
mysql.execute(f'''
CREATE TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (
`id` int NOT NULL,
`name` varchar(255) NOT NULL,
PRIMARY KEY (`id`));
''')

mysql.execute(
f"INSERT INTO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (id, name) VALUES (1, 'Original')",
commit=True,
)

# Start the replication
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
binlog_replicator_runner.run()
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
db_replicator_runner.run()

# Make sure initial replication works with the database mapping
assert_wait(lambda: "mapped_target_db" in ch.get_databases())
ch.execute_command(f'USE `mapped_target_db`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

# Now follow user's sequence of operations with fully qualified names (excluding RENAME operation)
# 1. Add new column
mysql.execute(f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` ADD COLUMN added_new_column varchar(5)", commit=True)

# 2. Modify column type (skipping the rename step)
mysql.execute(f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` MODIFY added_new_column varchar(10)", commit=True)

# 3. Insert data using the modified schema
mysql.execute(
f"INSERT INTO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (id, name, added_new_column) VALUES (2, 'Second', 'ABCDE')",
commit=True,
)

# 4. Drop the column - this is where the error was reported
mysql.execute(f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` DROP COLUMN added_new_column", commit=True)

# 5. Add more inserts after schema changes to verify ongoing replication
mysql.execute(
f"INSERT INTO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (id, name) VALUES (3, 'Third record after drop column')",
commit=True,
)

# Check if all changes were replicated correctly
time.sleep(5) # Allow time for processing the changes
result = ch.select(TEST_TABLE_NAME)
print(f"ClickHouse table contents: {result}")

# Verify all records are present
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)

# Verify specific records exist
records = ch.select(TEST_TABLE_NAME)
print(f"Record type: {type(records[0])}") # Debug the record type

# Access by field name 'id' instead of by position
record_ids = [record['id'] for record in records]
assert 1 in record_ids, "Original record (id=1) not found"
assert 3 in record_ids, "New record (id=3) after schema changes not found"

# Note: This test confirms our fix for schema evolution with database mapping

# Clean up
db_replicator_runner.stop()
binlog_replicator_runner.stop()
28 changes: 28 additions & 0 deletions tests_config_db_mapping.yaml
@@ -0,0 +1,28 @@
mysql:
  host: 'localhost'
  port: 9306
  user: 'root'
  password: 'admin'

clickhouse:
  host: 'localhost'
  port: 9123
  user: 'default'
  password: 'admin'

binlog_replicator:
  data_dir: '/app/binlog/'
  records_per_file: 100000
  binlog_retention_period: 43200  # 12 hours in seconds

databases: '*test*'
log_level: 'debug'
optimize_interval: 3
check_db_updated_interval: 3

# This mapping is the key part that causes issues with schema evolution
target_databases:
  replication-test_db: mapped_target_db

http_host: 'localhost'
http_port: 9128
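For reference, a hedged sketch of how a `target_databases` entry like the one above can be thought of when resolving the ClickHouse database name; the helper and its fallback behaviour are assumptions for illustration, not the project's actual API.

# Hypothetical helper mirroring the mapping declared in this config.
TARGET_DATABASES = {'replication-test_db': 'mapped_target_db'}

def resolve_target_database(source_db):
    # Assumption: databases without an explicit mapping keep their source name.
    return TARGET_DATABASES.get(source_db, source_db)

assert resolve_target_database('replication-test_db') == 'mapped_target_db'
assert resolve_target_database('other_db') == 'other_db'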