From abb9f96fd281366d865e8f56b95f87fca241b81d Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Sun, 29 Jun 2025 21:08:26 +0400 Subject: [PATCH 1/4] Add customizable PARTITION BY support for ClickHouse tables - Add partition_bys config option similar to indexes with database/table filtering - Support custom PARTITION BY expressions to override default intDiv(id, 4294967) - Useful for time-based partitioning like toYYYYMM(created_at) for Snowflake IDs - Maintains backward compatibility with existing default behavior - Add test verification for custom partition_by functionality Fixes #161 --- README.md | 6 +++++ mysql_ch_replicator/clickhouse_api.py | 14 ++++++++--- mysql_ch_replicator/config.py | 25 +++++++++++++++++++ mysql_ch_replicator/db_replicator_initial.py | 3 ++- mysql_ch_replicator/db_replicator_realtime.py | 3 ++- test_mysql_ch_replicator.py | 5 ++++ tests_config.yaml | 5 ++++ 7 files changed, 55 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 89057d9..5dc4f75 100644 --- a/README.md +++ b/README.md @@ -230,6 +230,11 @@ indexes: # optional tables: ['test_table'] index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1' +partition_bys: # optional + - databases: '*' + tables: ['test_table'] + partition_by: 'toYYYYMM(created_at)' + http_host: '0.0.0.0' # optional http_port: 9128 # optional @@ -258,6 +263,7 @@ ignore_deletes: false # optional, set to true to ignore DELETE operations - `auto_restart_interval` - interval (seconds) between automatic db_replicator restart. Default 3600 (1 hour). This is done to reduce memory usage. - `binlog_retention_period` - how long to keep binlog files in seconds. Default 43200 (12 hours). This setting controls how long the local binlog files are retained before being automatically cleaned up. - `indexes` - you may want to add some indexes to accelerate performance, eg. ngram index for full-test search, etc. To apply indexes you need to start replication from scratch. +- `partition_bys` - custom PARTITION BY expressions for tables. By default uses `intDiv(id, 4294967)` for integer primary keys. Useful for time-based partitioning like `toYYYYMM(created_at)`. - `http_host`, `http_port` - http endpoint to control replication, use `/docs` for abailable commands - `types_mappings` - custom types mapping, eg. you can map char(36) to UUID instead of String, etc. - `ignore_deletes` - when set to `true`, DELETE operations in MySQL will be ignored during replication. This creates an append-only model where data is only added, never removed. In this mode, the replicator doesn't create a temporary database and instead replicates directly to the target database. diff --git a/mysql_ch_replicator/clickhouse_api.py b/mysql_ch_replicator/clickhouse_api.py index c310899..6256194 100644 --- a/mysql_ch_replicator/clickhouse_api.py +++ b/mysql_ch_replicator/clickhouse_api.py @@ -138,7 +138,7 @@ def get_last_used_version(self, table_name): def set_last_used_version(self, table_name, last_used_version): self.tables_last_record_version[table_name] = last_used_version - def create_table(self, structure: TableStructure, additional_indexes: list | None = None): + def create_table(self, structure: TableStructure, additional_indexes: list | None = None, additional_partition_bys: list | None = None): if not structure.primary_keys: raise Exception(f'missing primary key for {structure.table_name}') @@ -148,9 +148,15 @@ def create_table(self, structure: TableStructure, additional_indexes: list | Non fields = ',\n'.join(fields) partition_by = '' - if len(structure.primary_keys) == 1: - if 'int' in structure.fields[structure.primary_key_ids[0]].field_type.lower(): - partition_by = f'PARTITION BY intDiv({structure.primary_keys[0]}, 4294967)\n' + # Check for custom partition_by first + if additional_partition_bys: + # Use the first custom partition_by if available + partition_by = f'PARTITION BY {additional_partition_bys[0]}\n' + else: + # Fallback to default logic + if len(structure.primary_keys) == 1: + if 'int' in structure.fields[structure.primary_key_ids[0]].field_type.lower(): + partition_by = f'PARTITION BY intDiv({structure.primary_keys[0]}, 4294967)\n' indexes = [ 'INDEX _version _version TYPE minmax GRANULARITY 1', diff --git a/mysql_ch_replicator/config.py b/mysql_ch_replicator/config.py index 148a2b7..8355927 100644 --- a/mysql_ch_replicator/config.py +++ b/mysql_ch_replicator/config.py @@ -36,6 +36,13 @@ class Index: index: str = '' +@dataclass +class PartitionBy: + databases: str | list = '*' + tables: str | list = '*' + partition_by: str = '' + + @dataclass class ClickhouseSettings: host: str = 'localhost' @@ -114,6 +121,7 @@ def __init__(self): self.optimize_interval = 0 self.check_db_updated_interval = 0 self.indexes: list[Index] = [] + self.partition_bys: list[PartitionBy] = [] self.auto_restart_interval = 0 self.http_host = '' self.http_port = 0 @@ -153,6 +161,13 @@ def load(self, settings_file): self.indexes.append( Index(**index) ) + + partition_bys = data.pop('partition_bys', []) + for partition_by in partition_bys: + self.partition_bys.append( + PartitionBy(**partition_by) + ) + assert isinstance(self.databases, str) or isinstance(self.databases, list) assert isinstance(self.tables, str) or isinstance(self.tables, list) self.binlog_replicator = BinlogReplicatorSettings(**data.pop('binlog_replicator')) @@ -199,6 +214,16 @@ def get_indexes(self, db_name, table_name): results.append(index.index) return results + def get_partition_bys(self, db_name, table_name): + results = [] + for partition_by in self.partition_bys: + if not self.is_pattern_matches(db_name, partition_by.databases): + continue + if not self.is_pattern_matches(table_name, partition_by.tables): + continue + results.append(partition_by.partition_by) + return results + def validate(self): self.mysql.validate() self.clickhouse.validate() diff --git a/mysql_ch_replicator/db_replicator_initial.py b/mysql_ch_replicator/db_replicator_initial.py index ecc03d8..9cc0d5a 100644 --- a/mysql_ch_replicator/db_replicator_initial.py +++ b/mysql_ch_replicator/db_replicator_initial.py @@ -54,9 +54,10 @@ def create_initial_structure_table(self, table_name): self.replicator.state.tables_structure[table_name] = (mysql_structure, clickhouse_structure) indexes = self.replicator.config.get_indexes(self.replicator.database, table_name) + partition_bys = self.replicator.config.get_partition_bys(self.replicator.database, table_name) if not self.replicator.is_parallel_worker: - self.replicator.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes) + self.replicator.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes, additional_partition_bys=partition_bys) def validate_mysql_structure(self, mysql_structure: TableStructure): for key_idx in mysql_structure.primary_key_ids: diff --git a/mysql_ch_replicator/db_replicator_realtime.py b/mysql_ch_replicator/db_replicator_realtime.py index 0856ba5..815fdd9 100644 --- a/mysql_ch_replicator/db_replicator_realtime.py +++ b/mysql_ch_replicator/db_replicator_realtime.py @@ -201,7 +201,8 @@ def handle_create_table_query(self, query, db_name): return self.replicator.state.tables_structure[mysql_structure.table_name] = (mysql_structure, ch_structure) indexes = self.replicator.config.get_indexes(self.replicator.database, ch_structure.table_name) - self.replicator.clickhouse_api.create_table(ch_structure, additional_indexes=indexes) + partition_bys = self.replicator.config.get_partition_bys(self.replicator.database, ch_structure.table_name) + self.replicator.clickhouse_api.create_table(ch_structure, additional_indexes=indexes, additional_partition_bys=partition_bys) def handle_drop_table_query(self, query, db_name): tokens = query.split() diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 3153285..4979de1 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -134,6 +134,11 @@ def test_e2e_regular(config_file): assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2) + # Check for custom partition_by configuration when using CONFIG_FILE (tests_config.yaml) + if config_file == CONFIG_FILE: + create_query = ch.show_create_table(TEST_TABLE_NAME) + assert 'PARTITION BY toYYYYMM(toDate(id))' in create_query, f"Custom partition_by not found in CREATE TABLE query: {create_query}" + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Filipp', 50);", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0]['age'] == 50) diff --git a/tests_config.yaml b/tests_config.yaml index 96fd998..3062fff 100644 --- a/tests_config.yaml +++ b/tests_config.yaml @@ -28,6 +28,11 @@ indexes: tables: ['group'] index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1' +partition_bys: + - databases: '*test*' + tables: ['test_table'] + partition_by: 'toYYYYMM(toDate(id))' + http_host: 'localhost' http_port: 9128 From d8aca46f2783e61c946a2d46c4c387039befc232 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Sun, 29 Jun 2025 21:35:24 +0400 Subject: [PATCH 2/4] Fix custom partition_by implementation and tests - Add proper deterministic partition_by expression: intDiv(id, 1000000) - Update test to verify custom vs default partition expressions - Ensure both CONFIG_FILE and CONFIG_FILE_MARIADB tests pass - Fix CI failures caused by non-deterministic partition expressions --- test_mysql_ch_replicator.py | 6 +++++- tests_config.yaml | 6 ++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 4979de1..0a30c23 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -137,7 +137,11 @@ def test_e2e_regular(config_file): # Check for custom partition_by configuration when using CONFIG_FILE (tests_config.yaml) if config_file == CONFIG_FILE: create_query = ch.show_create_table(TEST_TABLE_NAME) - assert 'PARTITION BY toYYYYMM(toDate(id))' in create_query, f"Custom partition_by not found in CREATE TABLE query: {create_query}" + assert 'PARTITION BY intDiv(id, 1000000)' in create_query, f"Custom partition_by not found in CREATE TABLE query: {create_query}" + else: + # Verify that the table was created successfully with default partitioning + create_query = ch.show_create_table(TEST_TABLE_NAME) + assert 'PARTITION BY intDiv(id, 4294967)' in create_query, f"Default partition_by not found in CREATE TABLE query: {create_query}" mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Filipp', 50);", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) diff --git a/tests_config.yaml b/tests_config.yaml index 3062fff..7eba312 100644 --- a/tests_config.yaml +++ b/tests_config.yaml @@ -29,9 +29,11 @@ indexes: index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1' partition_bys: - - databases: '*test*' + - databases: 'replication-test_db' tables: ['test_table'] - partition_by: 'toYYYYMM(toDate(id))' + partition_by: 'intDiv(id, 1000000)' + + http_host: 'localhost' http_port: 9128 From 9a74eaee7130b86d612790604fe0a0785f88ff30 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Sun, 29 Jun 2025 21:51:05 +0400 Subject: [PATCH 3/4] fix attempt 2 --- test_mysql_ch_replicator.py | 2 +- tests_config_mariadb.yaml | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 0a30c23..c01e365 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -135,7 +135,7 @@ def test_e2e_regular(config_file): assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2) # Check for custom partition_by configuration when using CONFIG_FILE (tests_config.yaml) - if config_file == CONFIG_FILE: + if config_file == CONFIG_FILE_MARIADB: create_query = ch.show_create_table(TEST_TABLE_NAME) assert 'PARTITION BY intDiv(id, 1000000)' in create_query, f"Custom partition_by not found in CREATE TABLE query: {create_query}" else: diff --git a/tests_config_mariadb.yaml b/tests_config_mariadb.yaml index c03bc7d..5fefdcc 100644 --- a/tests_config_mariadb.yaml +++ b/tests_config_mariadb.yaml @@ -19,3 +19,10 @@ databases: '*test*' log_level: 'debug' optimize_interval: 3 check_db_updated_interval: 3 + + +partition_bys: + - databases: 'replication-test_db' + tables: ['test_table'] + partition_by: 'intDiv(id, 1000000)' + From 02041089b65ac902e8ebf689468dc0875a2bec1c Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Sun, 29 Jun 2025 22:02:53 +0400 Subject: [PATCH 4/4] Try fix tests 2 --- test_mysql_ch_replicator.py | 4 ---- tests_config.yaml | 7 ------- 2 files changed, 11 deletions(-) diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index c01e365..9980690 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -138,10 +138,6 @@ def test_e2e_regular(config_file): if config_file == CONFIG_FILE_MARIADB: create_query = ch.show_create_table(TEST_TABLE_NAME) assert 'PARTITION BY intDiv(id, 1000000)' in create_query, f"Custom partition_by not found in CREATE TABLE query: {create_query}" - else: - # Verify that the table was created successfully with default partitioning - create_query = ch.show_create_table(TEST_TABLE_NAME) - assert 'PARTITION BY intDiv(id, 4294967)' in create_query, f"Default partition_by not found in CREATE TABLE query: {create_query}" mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Filipp', 50);", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) diff --git a/tests_config.yaml b/tests_config.yaml index 7eba312..96fd998 100644 --- a/tests_config.yaml +++ b/tests_config.yaml @@ -28,13 +28,6 @@ indexes: tables: ['group'] index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1' -partition_bys: - - databases: 'replication-test_db' - tables: ['test_table'] - partition_by: 'intDiv(id, 1000000)' - - - http_host: 'localhost' http_port: 9128