diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py
index 7ba5381..58aa7d3 100644
--- a/mysql_ch_replicator/converter.py
+++ b/mysql_ch_replicator/converter.py
@@ -531,11 +531,14 @@ def get_db_and_table_name(self, token, db_name):
         db_name = strip_sql_name(db_name)
         table_name = strip_sql_name(table_name)
         if self.db_replicator:
-            if db_name == self.db_replicator.database:
-                db_name = self.db_replicator.target_database
+            # Check if database and table match config BEFORE applying mapping
             matches_config = (
                 self.db_replicator.config.is_database_matches(db_name)
                 and self.db_replicator.config.is_table_matches(table_name))
+
+            # Apply database mapping AFTER checking matches_config
+            if db_name == self.db_replicator.database:
+                db_name = self.db_replicator.target_database
         else:
             matches_config = True
 
diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py
index c1fee2f..8db5bfe 100644
--- a/test_mysql_ch_replicator.py
+++ b/test_mysql_ch_replicator.py
@@ -2257,3 +2257,93 @@ def test_performance_initial_only_replication():
 
     # Clean up the temporary config file
     os.remove(parallel_config_file)
+
+
+def test_schema_evolution_with_db_mapping():
+    """Check that schema evolution is replicated when a database mapping is used.
+
+    tests_config_db_mapping.yaml maps `replication-test_db` to `mapped_target_db`.
+    The test runs ADD / MODIFY / DROP COLUMN statements with fully qualified
+    table names and verifies that rows inserted before, between and after
+    them show up in the mapped ClickHouse database.
+    """
+    config_file = "tests_config_db_mapping.yaml"
+
+    cfg = config.Settings()
+    cfg.load(config_file)
+
+    # No default database on the MySQL side: every statement below uses a
+    # fully qualified `db`.`table` name.
+    mysql = mysql_api.MySQLApi(
+        database=None,
+        mysql_settings=cfg.mysql,
+    )
+
+    ch = clickhouse_api.ClickhouseApi(
+        database="mapped_target_db",
+        clickhouse_settings=cfg.clickhouse,
+    )
+
+    prepare_env(cfg, mysql, ch, db_name=TEST_DB_NAME)
+
+    # Create a test table with some columns using fully qualified name
+    mysql.execute(f'''
+CREATE TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (
+    `id` int NOT NULL,
+    `name` varchar(255) NOT NULL,
+    PRIMARY KEY (`id`));
+    ''')
+
+    mysql.execute(
+        f"INSERT INTO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (id, name) VALUES (1, 'Original')",
+        commit=True,
+    )
+
+    # Start the replication
+    binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
+    binlog_replicator_runner.run()
+    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
+    db_replicator_runner.run()
+
+    # Stop both runners even when an assertion fails, so a failing test does
+    # not leave them running.
+    try:
+        # Make sure initial replication works with the database mapping
+        assert_wait(lambda: "mapped_target_db" in ch.get_databases())
+        ch.execute_command('USE `mapped_target_db`')
+        assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
+        assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)
+
+        # Follow the reported sequence of operations with fully qualified names (the RENAME step is skipped)
+        # 1. Add new column
+        mysql.execute(f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` ADD COLUMN added_new_column varchar(5)", commit=True)
+
+        # 2. Modify column type
+        mysql.execute(f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` MODIFY added_new_column varchar(10)", commit=True)
+
+        # 3. Insert data using the modified schema
+        mysql.execute(
+            f"INSERT INTO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (id, name, added_new_column) VALUES (2, 'Second', 'ABCDE')",
+            commit=True,
+        )
+
+        # 4. Drop the column - this is where the error was reported
+        mysql.execute(f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` DROP COLUMN added_new_column", commit=True)
+
+        # 5. Add more inserts after schema changes to verify ongoing replication
+        mysql.execute(
+            f"INSERT INTO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (id, name) VALUES (3, 'Third record after drop column')",
+            commit=True,
+        )
+
+        # assert_wait waits for the condition (as in the checks above), so no fixed sleep is needed
+        assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
+
+        # Access by field name 'id' instead of by position
+        record_ids = [record['id'] for record in ch.select(TEST_TABLE_NAME)]
+        assert 1 in record_ids, "Original record (id=1) not found"
+        assert 2 in record_ids, "Record (id=2) inserted before DROP COLUMN not found"
+        assert 3 in record_ids, "New record (id=3) after schema changes not found"
+    finally:
+        db_replicator_runner.stop()
+        binlog_replicator_runner.stop()
diff --git a/tests_config_db_mapping.yaml b/tests_config_db_mapping.yaml
new file mode 100644
index 0000000..5876324
--- /dev/null
+++ b/tests_config_db_mapping.yaml
@@ -0,0 +1,28 @@
+mysql:
+  host: 'localhost'
+  port: 9306
+  user: 'root'
+  password: 'admin'
+
+clickhouse:
+  host: 'localhost'
+  port: 9123
+  user: 'default'
+  password: 'admin'
+
+binlog_replicator:
+  data_dir: '/app/binlog/'
+  records_per_file: 100000
+  binlog_retention_period: 43200 # 12 hours in seconds
+
+databases: '*test*'
+log_level: 'debug'
+optimize_interval: 3
+check_db_updated_interval: 3
+
+# This mapping is the key part that causes issues with schema evolution
+target_databases:
+  replication-test_db: mapped_target_db
+
+http_host: 'localhost'
+http_port: 9128