From 79c97335994eafda13beb79faa9d6d3f2723ce72 Mon Sep 17 00:00:00 2001
From: Filipp Ozinov
Date: Sun, 24 Nov 2024 21:03:06 +0400
Subject: [PATCH] Support for indexes

---
 README.md                             | 10 ++++++++++
 mysql_ch_replicator/clickhouse_api.py | 10 +++++++++-
 mysql_ch_replicator/config.py         | 23 +++++++++++++++++++++++
 mysql_ch_replicator/converter.py      |  2 +-
 mysql_ch_replicator/db_replicator.py  |  6 ++++--
 test_mysql_ch_replicator.py           | 17 ++++++++++++++++-
 tests_config.yaml                     |  5 +++++
 7 files changed, 68 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index 4ec6436..c7c40bd 100644
--- a/README.md
+++ b/README.md
@@ -134,11 +134,20 @@ binlog_replicator:
 
 databases: 'database_name_pattern_*'
 tables: '*'
+
+# OPTIONAL SETTINGS
+
 exclude_databases: ['database_10', 'database_*_42'] # optional
 exclude_tables: ['meta_table_*'] # optional
 log_level: 'info' # optional
 optimize_interval: 86400 # optional
+
+indexes: # optional
+  - databases: '*'
+    tables: ['test_table']
+    index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1'
+
 ```
 
 #### Required settings
 
@@ -154,6 +163,7 @@ optimize_interval: 86400 # optional
 - `exclude_tables` - databases to __exclude__, string or list. If same table matches `tables` and `exclude_tables`, exclude has higher priority.
 - `log_level` - log level, default is `info`, you can set to `debug` to get maximum information (allowed values are `debug`, `info`, `warning`, `error`, `critical`)
 - `optimize_interval` - interval (seconds) between automatic `OPTIMIZE table FINAL` calls. Default 86400 (1 day). This is required to perform all merges guaranteed and avoid increasing of used storage and decreasing performance.
+- `indexes` - additional ClickHouse indexes to speed up queries, e.g. an ngram index for full-text search. Indexes are applied only when a table is created, so to add them to already replicated tables you need to start replication from scratch.
 
 Few more tables / dbs examples:
 
diff --git a/mysql_ch_replicator/clickhouse_api.py b/mysql_ch_replicator/clickhouse_api.py
index 3a6916c..011bc4a 100644
--- a/mysql_ch_replicator/clickhouse_api.py
+++ b/mysql_ch_replicator/clickhouse_api.py
@@ -82,7 +82,7 @@ def get_last_used_version(self, table_name):
     def set_last_used_version(self, table_name, last_used_version):
         self.tables_last_record_version[table_name] = last_used_version
 
-    def create_table(self, structure: TableStructure):
+    def create_table(self, structure: TableStructure, additional_indexes: list | None = None):
         if not structure.primary_keys:
             raise Exception(f'missing primary key for {structure.table_name}')
 
@@ -103,6 +103,8 @@ def create_table(self, structure: TableStructure):
         indexes.append(
             f'INDEX idx_id {structure.primary_keys[0]} TYPE bloom_filter GRANULARITY 1',
         )
+        if additional_indexes is not None:
+            indexes += additional_indexes
 
         indexes = ',\n'.join(indexes)
         primary_key = ','.join(structure.primary_keys)
@@ -196,6 +198,12 @@ def select(self, table_name, where=None, final=None):
             results.append(dict(zip(columns, row)))
         return results
 
+    def query(self, query: str):
+        return self.client.query(query)
+
+    def show_create_table(self, table_name):
+        return self.client.query(f'SHOW CREATE TABLE {table_name}').result_rows[0][0]
+
     def get_system_setting(self, name):
         results = self.select('system.settings', f"name = '{name}'")
         if not results:
diff --git a/mysql_ch_replicator/config.py b/mysql_ch_replicator/config.py
index 2c31946..5eb4ca5 100644
--- a/mysql_ch_replicator/config.py
+++ b/mysql_ch_replicator/config.py
@@ -29,6 +29,13 @@ def validate(self):
             raise ValueError(f'mysql password should be string and not {stype(self.password)}')
 
 
+@dataclass
+class Index:
+    databases: str | list = '*'
+    tables: str | list = '*'
+    index: str = ''
+
+
 @dataclass
 class ClickhouseSettings:
     host: str = 'localhost'
@@ -98,6 +105,7 @@ def __init__(self):
         self.debug_log_level = False
         self.optimize_interval = 0
         self.check_db_updated_interval = 0
+        self.indexes: list[Index] = []
 
     def load(self, settings_file):
         data = open(settings_file, 'r').read()
@@ -115,6 +123,11 @@ def load(self, settings_file):
         self.check_db_updated_interval = data.pop(
             'check_db_updated_interval', Settings.DEFAULT_CHECK_DB_UPDATED_INTERVAL,
         )
+        indexes = data.pop('indexes', [])
+        for index in indexes:
+            self.indexes.append(
+                Index(**index)
+            )
         assert isinstance(self.databases, str) or isinstance(self.databases, list)
         assert isinstance(self.tables, str) or isinstance(self.tables, list)
         self.binlog_replicator = BinlogReplicatorSettings(**data.pop('binlog_replicator'))
@@ -151,6 +164,16 @@ def validate_log_level(self):
         if self.log_level == 'debug':
             self.debug_log_level = True
 
+    def get_indexes(self, db_name, table_name):
+        results = []
+        for index in self.indexes:
+            if not self.is_pattern_matches(db_name, index.databases):
+                continue
+            if not self.is_pattern_matches(table_name, index.tables):
+                continue
+            results.append(index.index)
+        return results
+
     def validate(self):
         self.mysql.validate()
         self.clickhouse.validate()
diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py
index 0f9381d..b762777 100644
--- a/mysql_ch_replicator/converter.py
+++ b/mysql_ch_replicator/converter.py
@@ -472,7 +472,7 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens):
         query = f'ALTER TABLE {db_name}.{table_name} RENAME COLUMN {column_name} TO {new_column_name}'
         self.db_replicator.clickhouse_api.execute_command(query)
 
-    def parse_create_table_query(self, mysql_query) -> tuple:
+    def parse_create_table_query(self, mysql_query) -> tuple[TableStructure, TableStructure]:
         mysql_table_structure = self.parse_mysql_table_structure(mysql_query)
         ch_table_structure = self.convert_table_structure(mysql_table_structure)
         return mysql_table_structure, ch_table_structure
diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py
index 94178ed..70728c2 100644
--- a/mysql_ch_replicator/db_replicator.py
+++ b/mysql_ch_replicator/db_replicator.py
@@ -214,7 +214,8 @@ def create_initial_structure_table(self, table_name):
         self.validate_mysql_structure(mysql_structure)
         clickhouse_structure = self.converter.convert_table_structure(mysql_structure)
         self.state.tables_structure[table_name] = (mysql_structure, clickhouse_structure)
-        self.clickhouse_api.create_table(clickhouse_structure)
+        indexes = self.config.get_indexes(self.database, table_name)
+        self.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes)
 
     def prevent_binlog_removal(self):
         if time.time() - self.last_touch_time < self.BINLOG_TOUCH_INTERVAL:
@@ -480,7 +481,8 @@ def handle_create_table_query(self, query, db_name):
         if not self.config.is_table_matches(mysql_structure.table_name):
             return
         self.state.tables_structure[mysql_structure.table_name] = (mysql_structure, ch_structure)
-        self.clickhouse_api.create_table(ch_structure)
+        indexes = self.config.get_indexes(self.database, ch_structure.table_name)
+        self.clickhouse_api.create_table(ch_structure, additional_indexes=indexes)
 
     def handle_drop_table_query(self, query, db_name):
         tokens = query.split()
diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py
index 0d5c3ff..3b9999c 100644
--- a/test_mysql_ch_replicator.py
+++ b/test_mysql_ch_replicator.py
@@ -366,7 +366,22 @@ def test_runner():
     assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, final=False)) == 4)
 
     mysql.create_database(TEST_DB_NAME_2)
-    assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases(), max_wait_time=5)
+    assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases())
+
+    mysql.execute(f'''
+    CREATE TABLE test_table_with_index (
+        id int NOT NULL AUTO_INCREMENT,
+        name varchar(255) NOT NULL,
+        age int,
+        rate decimal(10,4),
+        PRIMARY KEY (id)
+    );
+    ''')
+
+    assert_wait(lambda: 'test_table_with_index' in ch.get_tables())
+
+    create_query = ch.show_create_table('test_table_with_index')
+    assert 'INDEX name_idx name TYPE ngrambf_v1' in create_query
 
     run_all_runner.stop()
 
diff --git a/tests_config.yaml b/tests_config.yaml
index 196bf79..cb99a7d 100644
--- a/tests_config.yaml
+++ b/tests_config.yaml
@@ -19,3 +19,8 @@ databases: '*test*'
 log_level: 'debug'
 optimize_interval: 3
 check_db_updated_interval: 3
+
+indexes:
+  - databases: '*'
+    tables: ['test_table_with_index']
+    index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1'
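
For reviewers, a minimal usage sketch (not part of the patch) of how the new `indexes` setting flows through the code above: `Settings.load()` parses the entries into `Index` objects, `get_indexes()` returns the matching index strings, and `db_replicator` passes them to `create_table()`. The database name below is made up for illustration.

```python
# Illustrative only -- assumes the Settings class as changed in this patch.
from mysql_ch_replicator.config import Settings

settings = Settings()
settings.load('tests_config.yaml')  # config shown at the end of this patch

# The sample entry matches databases '*' and the table 'test_table_with_index';
# the database name here is hypothetical.
extra = settings.get_indexes('replication_test_db', 'test_table_with_index')
print(extra)
# expected: ["INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1"]

# db_replicator then forwards these strings into table creation:
#   clickhouse_api.create_table(ch_structure, additional_indexes=extra)
```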