From 520e34fb3cc592214ecba42ea03c7e3bf1d42a00 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Sat, 25 Jan 2025 22:01:53 +0400 Subject: [PATCH] Option to set another name for destination database --- README.md | 6 ++ mysql_ch_replicator/config.py | 4 ++ mysql_ch_replicator/db_replicator.py | 15 +++- test_mysql_ch_replicator.py | 104 ++++++++++++++++++++++++--- tests_config.yaml | 3 + 5 files changed, 122 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 9360f7a..967ab31 100644 --- a/README.md +++ b/README.md @@ -158,6 +158,11 @@ tables: '*' exclude_databases: ['database_10', 'database_*_42'] # optional exclude_tables: ['meta_table_*'] # optional +target_databases: # optional + source_db_in_mysql_1: destination_db_in_clickhouse_1 + source_db_in_mysql_2: destination_db_in_clickhouse_2 + ... + log_level: 'info' # optional optimize_interval: 86400 # optional auto_restart_interval: 3600 # optional @@ -183,6 +188,7 @@ http_port: 9128 # optional - `tables` - tables to filter, list is also supported - `exclude_databases` - databases to __exclude__, string or list, eg `'table1*'` or `['table2', 'table3*']`. If same database matches `databases` and `exclude_databases`, exclude has higher priority. - `exclude_tables` - databases to __exclude__, string or list. If same table matches `tables` and `exclude_tables`, exclude has higher priority. +- `target_databases` - if you want database in ClickHouse to have different name from MySQL database - `log_level` - log level, default is `info`, you can set to `debug` to get maximum information (allowed values are `debug`, `info`, `warning`, `error`, `critical`) - `optimize_interval` - interval (seconds) between automatic `OPTIMIZE table FINAL` calls. Default 86400 (1 day). This is required to perform all merges guaranteed and avoid increasing of used storage and decreasing performance. - `auto_restart_interval` - interval (seconds) between automatic db_replicator restart. Default 3600 (1 hour). This is done to reduce memory usage. diff --git a/mysql_ch_replicator/config.py b/mysql_ch_replicator/config.py index 1a6dfb1..b6d7602 100644 --- a/mysql_ch_replicator/config.py +++ b/mysql_ch_replicator/config.py @@ -110,6 +110,7 @@ def __init__(self): self.auto_restart_interval = 0 self.http_host = '' self.http_port = 0 + self.target_databases = {} def load(self, settings_file): data = open(settings_file, 'r').read() @@ -132,6 +133,7 @@ def load(self, settings_file): ) self.http_host = data.pop('http_host', '') self.http_port = data.pop('http_port', 0) + self.target_databases = data.pop('target_databases', {}) indexes = data.pop('indexes', []) for index in indexes: @@ -189,3 +191,5 @@ def validate(self): self.clickhouse.validate() self.binlog_replicator.validate() self.validate_log_level() + if not isinstance(self.target_databases, dict): + raise ValueError(f'wrong target databases {self.target_databases}') diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py index 6110413..9a1ac92 100644 --- a/mysql_ch_replicator/db_replicator.py +++ b/mysql_ch_replicator/db_replicator.py @@ -68,6 +68,7 @@ def save(self): 'tables_structure': self.tables_structure, 'tables': self.tables, 'pid': os.getpid(), + 'save_time': time.time(), }) with open(file_name + '.tmp', 'wb') as f: f.write(data) @@ -108,7 +109,19 @@ class DbReplicator: def __init__(self, config: Settings, database: str, target_database: str = None, initial_only: bool = False): self.config = config self.database = database - self.target_database = target_database or database + + # use same as source database by default + self.target_database = database + + # use target database from config file if exists + target_database_from_config = config.target_databases.get(database) + if target_database_from_config: + self.target_database = target_database_from_config + + # use command line argument if exists + if target_database: + self.target_database = target_database + self.target_database_tmp = self.target_database + '_tmp' self.initial_only = initial_only diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 307fd2a..32d6097 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -21,6 +21,8 @@ CONFIG_FILE_MARIADB = 'tests_config_mariadb.yaml' TEST_DB_NAME = 'replication-test_db' TEST_DB_NAME_2 = 'replication-test_db_2' +TEST_DB_NAME_2_DESTINATION = 'replication-destination' + TEST_TABLE_NAME = 'test_table' TEST_TABLE_NAME_2 = 'test_table_2' TEST_TABLE_NAME_3 = 'test_table_3' @@ -100,7 +102,7 @@ def test_e2e_regular(config_file): CREATE TABLE `{TEST_TABLE_NAME}` ( id int NOT NULL AUTO_INCREMENT, name varchar(255) COMMENT 'Dân tộc, ví dụ: Kinh', - `age x` int COMMENT 'CMND Cũ', + age int COMMENT 'CMND Cũ', field1 text, field2 blob, PRIMARY KEY (id) @@ -108,10 +110,10 @@ def test_e2e_regular(config_file): ''') mysql.execute( - f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`, field1, field2) VALUES ('Ivan', 42, 'test1', 'test2');", + f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, field1, field2) VALUES ('Ivan', 42, 'test1', 'test2');", commit=True, ) - mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`) VALUES ('Peter', 33);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Peter', 33);", commit=True) binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file) binlog_replicator_runner.run() @@ -125,13 +127,13 @@ def test_e2e_regular(config_file): assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2) - mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`) VALUES ('Filipp', 50);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Filipp', 50);", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) - assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0]['age x'] == 50) + assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0]['age'] == 50) mysql.execute(f"ALTER TABLE `{TEST_TABLE_NAME}` ADD `last_name` varchar(255); ") - mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`, last_name) VALUES ('Mary', 24, 'Smith');", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, last_name) VALUES ('Mary', 24, 'Smith');", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4) assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Mary'")[0]['last_name'] == 'Smith') @@ -146,7 +148,7 @@ def test_e2e_regular(config_file): ) mysql.execute( - f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`, last_name, country) " + f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, last_name, country) " f"VALUES ('John', 12, 'Doe', 'USA');", commit=True, ) @@ -314,6 +316,7 @@ def test_runner(): mysql.drop_database(TEST_DB_NAME_2) ch.drop_database(TEST_DB_NAME_2) + ch.drop_database(TEST_DB_NAME_2_DESTINATION) prepare_env(cfg, mysql, ch) @@ -416,7 +419,7 @@ def test_runner(): assert_wait(lambda: ch.select(TEST_TABLE_NAME, "age=1912")[0]['name'] == 'Hällo') mysql.create_database(TEST_DB_NAME_2) - assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases()) + assert_wait(lambda: TEST_DB_NAME_2_DESTINATION in ch.get_databases()) mysql.execute(f''' CREATE TABLE `group` ( @@ -457,7 +460,7 @@ def test_multi_column_erase(): ) mysql.drop_database(TEST_DB_NAME_2) - ch.drop_database(TEST_DB_NAME_2) + ch.drop_database(TEST_DB_NAME_2_DESTINATION) prepare_env(cfg, mysql, ch) @@ -709,6 +712,89 @@ def test_datetime_exception(): binlog_replicator_runner.stop() +def test_performance(): + config_file = 'tests_config_perf.yaml' + num_records = 100000 + + cfg = config.Settings() + cfg.load(config_file) + + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=cfg.mysql, + ) + + ch = clickhouse_api.ClickhouseApi( + database=TEST_DB_NAME, + clickhouse_settings=cfg.clickhouse, + ) + + prepare_env(cfg, mysql, ch) + + mysql.execute(f''' + CREATE TABLE `{TEST_TABLE_NAME}` ( + id int NOT NULL AUTO_INCREMENT, + name varchar(2048), + age int, + PRIMARY KEY (id) + ); + ''') + + binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file) + binlog_replicator_runner.run() + + time.sleep(1) + + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_1', 33);", commit=True) + + def _get_last_insert_name(): + record = get_last_insert_from_binlog(cfg=cfg, db_name=TEST_DB_NAME) + if record is None: + return None + return record[1].decode('utf-8') + + assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_1', retry_interval=0.5) + + binlog_replicator_runner.stop() + + time.sleep(1) + + print("populating mysql data") + + base_value = 'a' * 2000 + + for i in range(num_records): + if i % 2000 == 0: + print(f'populated {i} elements') + mysql.execute( + f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) " + f"VALUES ('TEST_VALUE_{i}_{base_value}', {i});", commit=i % 20 == 0, + ) + + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True) + + print("running db_replicator") + t1 = time.time() + binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file) + binlog_replicator_runner.run() + + assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_FINAL', retry_interval=0.5, max_wait_time=1000) + t2 = time.time() + + binlog_replicator_runner.stop() + + time_delta = t2 - t1 + rps = num_records / time_delta + + print('\n\n') + print("*****************************") + print("records per second:", int(rps)) + print("total time (seconds):", round(time_delta, 2)) + print("*****************************") + print('\n\n') + + + def test_different_types_1(): cfg = config.Settings() cfg.load(CONFIG_FILE) diff --git a/tests_config.yaml b/tests_config.yaml index fd28eff..45bac0c 100644 --- a/tests_config.yaml +++ b/tests_config.yaml @@ -20,6 +20,9 @@ log_level: 'debug' optimize_interval: 3 check_db_updated_interval: 3 +target_databases: + replication-test_db_2: replication-destination + indexes: - databases: '*' tables: ['group']