From fbd3d106684bd49032574d473c44b67134b28db2 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Mon, 9 Dec 2024 18:16:34 +0400 Subject: [PATCH] Fixed erase multicolumn primary key --- mysql_ch_replicator/clickhouse_api.py | 2 +- test_mysql_ch_replicator.py | 63 +++++++++++++++++++++++++++ tests_config_mariadb.yaml | 2 + 3 files changed, 66 insertions(+), 1 deletion(-) diff --git a/mysql_ch_replicator/clickhouse_api.py b/mysql_ch_replicator/clickhouse_api.py index 6dd3e6c..9f7c475 100644 --- a/mysql_ch_replicator/clickhouse_api.py +++ b/mysql_ch_replicator/clickhouse_api.py @@ -174,7 +174,7 @@ def insert(self, table_name, records, table_structure: TableStructure = None): def erase(self, table_name, field_name, field_values): field_name = ','.join(field_name) - field_values = ', '.join(list(map(str, field_values))) + field_values = ', '.join(f'({v})' for v in field_values) query = DELETE_QUERY.format(**{ 'db_name': self.database, 'table_name': table_name, diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 8e56071..4461ea5 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -401,6 +401,69 @@ def test_runner(): run_all_runner.stop() +def read_logs(db_name): + return open(os.path.join('binlog', db_name, 'db_replicator.log')).read() + + +def test_multi_column_erase(): + config_file = CONFIG_FILE + + cfg = config.Settings() + cfg.load(config_file) + + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=cfg.mysql, + ) + + ch = clickhouse_api.ClickhouseApi( + database=TEST_DB_NAME, + clickhouse_settings=cfg.clickhouse, + ) + + mysql.drop_database(TEST_DB_NAME_2) + ch.drop_database(TEST_DB_NAME_2) + + prepare_env(cfg, mysql, ch) + + mysql.execute(f''' +CREATE TABLE {TEST_TABLE_NAME} ( + departments int(11) NOT NULL, + termine int(11) NOT NULL, + PRIMARY KEY (departments,termine) +) +''') + + + mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (10, 20);", commit=True) + mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (30, 40);", commit=True) + mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (50, 60);", commit=True) + mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (20, 10);", commit=True) + mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (40, 30);", commit=True) + mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (60, 50);", commit=True) + + run_all_runner = RunAllRunner(cfg_file=config_file) + run_all_runner.run() + + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) + + ch.execute_command(f'USE {TEST_DB_NAME}') + + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 6) + + mysql.execute(f"DELETE FROM {TEST_TABLE_NAME} WHERE departments=10;", commit=True) + mysql.execute(f"DELETE FROM {TEST_TABLE_NAME} WHERE departments=30;", commit=True) + mysql.execute(f"DELETE FROM {TEST_TABLE_NAME} WHERE departments=50;", commit=True) + + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) + + run_all_runner.stop() + + assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME)) + assert('Traceback' not in read_logs(TEST_DB_NAME)) + + def test_initial_only(): cfg = config.Settings() cfg.load(CONFIG_FILE) diff --git a/tests_config_mariadb.yaml b/tests_config_mariadb.yaml index 7907ed8..c03bc7d 100644 --- a/tests_config_mariadb.yaml +++ b/tests_config_mariadb.yaml @@ -17,3 +17,5 @@ binlog_replicator: databases: '*test*' log_level: 'debug' +optimize_interval: 3 +check_db_updated_interval: 3