Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ indexes: # optional
tables: ['test_table']
index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1'

partition_bys: # optional
- databases: '*'
tables: ['test_table']
partition_by: 'toYYYYMM(created_at)'

http_host: '0.0.0.0' # optional
http_port: 9128 # optional

Expand Down Expand Up @@ -258,6 +263,7 @@ ignore_deletes: false # optional, set to true to ignore DELETE operations
- `auto_restart_interval` - interval (seconds) between automatic db_replicator restart. Default 3600 (1 hour). This is done to reduce memory usage.
- `binlog_retention_period` - how long to keep binlog files in seconds. Default 43200 (12 hours). This setting controls how long the local binlog files are retained before being automatically cleaned up.
- `indexes` - you may want to add some indexes to accelerate performance, e.g. an ngram index for full-text search. To apply indexes you need to start replication from scratch.
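- `partition_bys` - custom PARTITION BY expressions for tables. By default, a table with a single integer primary key is partitioned by `intDiv(<primary_key>, 4294967)`. Useful for time-based partitioning like `toYYYYMM(created_at)`. If several entries match the same table, the first matching entry is used.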
- `http_host`, `http_port` - HTTP endpoint to control replication; use `/docs` for available commands
- `types_mappings` - custom types mapping, eg. you can map char(36) to UUID instead of String, etc.
- `ignore_deletes` - when set to `true`, DELETE operations in MySQL will be ignored during replication. This creates an append-only model where data is only added, never removed. In this mode, the replicator doesn't create a temporary database and instead replicates directly to the target database.
Expand Down
14 changes: 10 additions & 4 deletions mysql_ch_replicator/clickhouse_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def get_last_used_version(self, table_name):
def set_last_used_version(self, table_name, last_used_version):
self.tables_last_record_version[table_name] = last_used_version

def create_table(self, structure: TableStructure, additional_indexes: list | None = None):
def create_table(self, structure: TableStructure, additional_indexes: list | None = None, additional_partition_bys: list | None = None):
if not structure.primary_keys:
raise Exception(f'missing primary key for {structure.table_name}')

Expand All @@ -148,9 +148,15 @@ def create_table(self, structure: TableStructure, additional_indexes: list | Non
fields = ',\n'.join(fields)
partition_by = ''

if len(structure.primary_keys) == 1:
if 'int' in structure.fields[structure.primary_key_ids[0]].field_type.lower():
partition_by = f'PARTITION BY intDiv({structure.primary_keys[0]}, 4294967)\n'
# Check for custom partition_by first
if additional_partition_bys:
# Use the first custom partition_by if available
partition_by = f'PARTITION BY {additional_partition_bys[0]}\n'
else:
# Fallback to default logic
if len(structure.primary_keys) == 1:
if 'int' in structure.fields[structure.primary_key_ids[0]].field_type.lower():
partition_by = f'PARTITION BY intDiv({structure.primary_keys[0]}, 4294967)\n'

indexes = [
'INDEX _version _version TYPE minmax GRANULARITY 1',
Expand Down
25 changes: 25 additions & 0 deletions mysql_ch_replicator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ class Index:
index: str = ''


@dataclass
class PartitionBy:
    """One custom PARTITION BY rule from the ``partition_bys`` config section.

    Each entry of ``partition_bys`` in the settings file is unpacked into an
    instance of this class. ``databases`` and ``tables`` select which tables
    the rule applies to; ``partition_by`` is the expression to use for them.
    """

    # Database name pattern (string) or list of them; defaults to '*'.
    databases: str | list = '*'
    # Table name pattern (string) or list of them; defaults to '*'.
    tables: str | list = '*'
    # Expression inserted after ``PARTITION BY`` when the table is created,
    # e.g. 'toYYYYMM(created_at)'.
    partition_by: str = ''


@dataclass
class ClickhouseSettings:
host: str = 'localhost'
Expand Down Expand Up @@ -114,6 +121,7 @@ def __init__(self):
self.optimize_interval = 0
self.check_db_updated_interval = 0
self.indexes: list[Index] = []
self.partition_bys: list[PartitionBy] = []
self.auto_restart_interval = 0
self.http_host = ''
self.http_port = 0
Expand Down Expand Up @@ -153,6 +161,13 @@ def load(self, settings_file):
self.indexes.append(
Index(**index)
)

partition_bys = data.pop('partition_bys', [])
for partition_by in partition_bys:
self.partition_bys.append(
PartitionBy(**partition_by)
)

assert isinstance(self.databases, str) or isinstance(self.databases, list)
assert isinstance(self.tables, str) or isinstance(self.tables, list)
self.binlog_replicator = BinlogReplicatorSettings(**data.pop('binlog_replicator'))
Expand Down Expand Up @@ -199,6 +214,16 @@ def get_indexes(self, db_name, table_name):
results.append(index.index)
return results

def get_partition_bys(self, db_name, table_name):
    """Return the custom PARTITION BY expressions configured for a table.

    Walks ``self.partition_bys`` in configuration order and keeps the
    ``partition_by`` string of every entry whose ``databases`` pattern
    matches ``db_name`` and whose ``tables`` pattern matches ``table_name``,
    as decided by ``self.is_pattern_matches``.

    Args:
        db_name: name of the database the table belongs to.
        table_name: name of the table being created.

    Returns:
        A list of matching expressions, possibly empty, in config order.
    """
    # The database pattern is tested first; the table pattern is only
    # evaluated for entries whose database pattern already matched.
    return [
        rule.partition_by
        for rule in self.partition_bys
        if self.is_pattern_matches(db_name, rule.databases)
        and self.is_pattern_matches(table_name, rule.tables)
    ]

def validate(self):
self.mysql.validate()
self.clickhouse.validate()
Expand Down
3 changes: 2 additions & 1 deletion mysql_ch_replicator/db_replicator_initial.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ def create_initial_structure_table(self, table_name):

self.replicator.state.tables_structure[table_name] = (mysql_structure, clickhouse_structure)
indexes = self.replicator.config.get_indexes(self.replicator.database, table_name)
partition_bys = self.replicator.config.get_partition_bys(self.replicator.database, table_name)

if not self.replicator.is_parallel_worker:
self.replicator.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes)
self.replicator.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes, additional_partition_bys=partition_bys)

def validate_mysql_structure(self, mysql_structure: TableStructure):
for key_idx in mysql_structure.primary_key_ids:
Expand Down
3 changes: 2 additions & 1 deletion mysql_ch_replicator/db_replicator_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ def handle_create_table_query(self, query, db_name):
return
self.replicator.state.tables_structure[mysql_structure.table_name] = (mysql_structure, ch_structure)
indexes = self.replicator.config.get_indexes(self.replicator.database, ch_structure.table_name)
self.replicator.clickhouse_api.create_table(ch_structure, additional_indexes=indexes)
partition_bys = self.replicator.config.get_partition_bys(self.replicator.database, ch_structure.table_name)
self.replicator.clickhouse_api.create_table(ch_structure, additional_indexes=indexes, additional_partition_bys=partition_bys)

def handle_drop_table_query(self, query, db_name):
tokens = query.split()
Expand Down
5 changes: 5 additions & 0 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ def test_e2e_regular(config_file):
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)

# Check for custom partition_by configuration when using CONFIG_FILE_MARIADB (tests_config_mariadb.yaml)
if config_file == CONFIG_FILE_MARIADB:
create_query = ch.show_create_table(TEST_TABLE_NAME)
assert 'PARTITION BY intDiv(id, 1000000)' in create_query, f"Custom partition_by not found in CREATE TABLE query: {create_query}"

mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Filipp', 50);", commit=True)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0]['age'] == 50)
Expand Down
7 changes: 7 additions & 0 deletions tests_config_mariadb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,10 @@ databases: '*test*'
log_level: 'debug'
optimize_interval: 3
check_db_updated_interval: 3


partition_bys:
- databases: 'replication-test_db'
tables: ['test_table']
partition_by: 'intDiv(id, 1000000)'