10 changes: 10 additions & 0 deletions README.md
@@ -134,11 +134,20 @@ binlog_replicator:
databases: 'database_name_pattern_*'
tables: '*'


# OPTIONAL SETTINGS

exclude_databases: ['database_10', 'database_*_42'] # optional
exclude_tables: ['meta_table_*'] # optional

log_level: 'info' # optional
optimize_interval: 86400 # optional

indexes: # optional
- databases: '*'
tables: ['test_table']
index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1'

```

#### Required settings
@@ -154,6 +163,7 @@ optimize_interval: 86400 # optional
- `exclude_tables` - tables to __exclude__, string or list. If the same table matches both `tables` and `exclude_tables`, the exclusion takes priority.
- `log_level` - log level, default is `info`; set it to `debug` for maximum detail (allowed values are `debug`, `info`, `warning`, `error`, `critical`)
- `optimize_interval` - interval (seconds) between automatic `OPTIMIZE table FINAL` calls. Default 86400 (1 day). This guarantees that all merges are eventually performed, which prevents storage usage from growing and performance from degrading.
- `indexes` - additional indexes to speed up queries, e.g. an ngram index for full-text search (a further example follows this diff). To apply new indexes you need to start replication from scratch.

A few more tables / databases examples:

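The `indexes` entries accept the same wildcard patterns as `databases` and `tables` (both default to `'*'`), and every entry whose patterns match a table contributes its index clause to that table's DDL. A further illustrative example (the second entry below is hypothetical, not part of this PR):

```yaml
indexes:
  # One specific table across all replicated databases:
  - databases: '*'
    tables: ['test_table']
    index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1'
  # Hypothetical second entry: a minmax index on every table of one database:
  - databases: ['analytics_db']
    tables: '*'
    index: 'INDEX created_idx created_at TYPE minmax GRANULARITY 1'
```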
11 changes: 10 additions & 1 deletion mysql_ch_replicator/clickhouse_api.py
@@ -82,7 +82,7 @@ def get_last_used_version(self, table_name):
def set_last_used_version(self, table_name, last_used_version):
self.tables_last_record_version[table_name] = last_used_version

def create_table(self, structure: TableStructure):
def create_table(self, structure: TableStructure, additional_indexes: list | None = None):
if not structure.primary_keys:
raise Exception(f'missing primary key for {structure.table_name}')

@@ -103,6 +103,8 @@ def create_table(self, structure: TableStructure):
indexes.append(
f'INDEX idx_id {structure.primary_keys[0]} TYPE bloom_filter GRANULARITY 1',
)
if additional_indexes is not None:
indexes += additional_indexes

indexes = ',\n'.join(indexes)
primary_key = ','.join(structure.primary_keys)
@@ -117,6 +119,7 @@ def create_table(self, structure: TableStructure):
'partition_by': partition_by,
'indexes': indexes,
})
print(" === query:", query)
self.execute_command(query)

def insert(self, table_name, records, table_structure: TableStructure = None):
@@ -196,6 +199,12 @@ def select(self, table_name, where=None, final=None):
results.append(dict(zip(columns, row)))
return results

def query(self, query: str):
return self.client.query(query)

def show_create_table(self, table_name):
return self.client.query(f'SHOW CREATE TABLE {table_name}').result_rows[0][0]

def get_system_setting(self, name):
results = self.select('system.settings', f"name = '{name}'")
if not results:
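To make the new `additional_indexes` path concrete, here is a self-contained sketch of how the index clauses end up in the generated DDL. The template, engine and column list below are illustrative stand-ins, not the project's actual `CREATE_TABLE_QUERY`; only the index-assembly logic mirrors the diff above:

```python
# Illustrative stand-in for clickhouse_api.create_table(); the real template
# and engine settings in the project differ.
CREATE_TABLE_TEMPLATE = (
    'CREATE TABLE {db_name}.{table_name}\n'
    '(\n{fields},\n{indexes}\n)\n'
    'ENGINE = ReplacingMergeTree(_version)\n'  # assumed engine
    'ORDER BY ({primary_key})'
)

def render_create_table(db_name, table_name, fields, primary_keys,
                        additional_indexes=None):
    # Mirrors the diff: a bloom filter on the first primary key is always
    # added, then any configured index clauses are appended verbatim.
    indexes = [f'INDEX idx_id {primary_keys[0]} TYPE bloom_filter GRANULARITY 1']
    if additional_indexes is not None:
        indexes += additional_indexes
    return CREATE_TABLE_TEMPLATE.format(
        db_name=db_name,
        table_name=table_name,
        fields=',\n'.join(f'    `{name}` {type_}' for name, type_ in fields),
        indexes=',\n'.join(f'    {idx}' for idx in indexes),
        primary_key=','.join(primary_keys),
    )

print(render_create_table(
    'test_db', 'test_table_with_index',
    fields=[('id', 'Int32'), ('name', 'String'), ('_version', 'UInt64')],
    primary_keys=['id'],
    additional_indexes=[
        'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1',
    ],
))
```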
23 changes: 23 additions & 0 deletions mysql_ch_replicator/config.py
@@ -29,6 +29,13 @@ def validate(self):
raise ValueError(f'mysql password should be a string and not {type(self.password)}')


@dataclass
class Index:
databases: str | list = '*'
tables: str | list = '*'
index: str = ''


@dataclass
class ClickhouseSettings:
host: str = 'localhost'
@@ -98,6 +105,7 @@ def __init__(self):
self.debug_log_level = False
self.optimize_interval = 0
self.check_db_updated_interval = 0
self.indexes: list[Index] = []

def load(self, settings_file):
data = open(settings_file, 'r').read()
@@ -115,6 +123,11 @@ def load(self, settings_file):
self.check_db_updated_interval = data.pop(
'check_db_updated_interval', Settings.DEFAULT_CHECK_DB_UPDATED_INTERVAL,
)
indexes = data.pop('indexes', [])
for index in indexes:
self.indexes.append(
Index(**index)
)
assert isinstance(self.databases, str) or isinstance(self.databases, list)
assert isinstance(self.tables, str) or isinstance(self.tables, list)
self.binlog_replicator = BinlogReplicatorSettings(**data.pop('binlog_replicator'))
@@ -151,6 +164,16 @@ def validate_log_level(self):
if self.log_level == 'debug':
self.debug_log_level = True

def get_indexes(self, db_name, table_name):
results = []
for index in self.indexes:
if not self.is_pattern_matches(db_name, index.databases):
continue
if not self.is_pattern_matches(table_name, index.tables):
continue
results.append(index.index)
return results

def validate(self):
self.mysql.validate()
self.clickhouse.validate()
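A runnable sketch of the matching behaviour `get_indexes` relies on, assuming `is_pattern_matches` implements shell-style wildcards (`fnmatch`-like), which is consistent with the `'*'` defaults above; the project's actual helper may differ in details:

```python
from dataclasses import dataclass
from fnmatch import fnmatch

@dataclass
class Index:
    databases: str | list = '*'
    tables: str | list = '*'
    index: str = ''

def is_pattern_matches(value: str, patterns) -> bool:
    # Assumed semantics: a single pattern or a list of shell-style patterns.
    if isinstance(patterns, str):
        patterns = [patterns]
    return any(fnmatch(value, p) for p in patterns)

def get_indexes(indexes: list[Index], db_name: str, table_name: str) -> list[str]:
    # Same shape as the method above: collect the clause of every entry
    # whose database and table patterns both match.
    return [
        idx.index for idx in indexes
        if is_pattern_matches(db_name, idx.databases)
        and is_pattern_matches(table_name, idx.tables)
    ]

cfg = [Index(tables=['test_table_with_index'],
             index='INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1')]
assert get_indexes(cfg, 'some_test_db', 'test_table_with_index') == [cfg[0].index]
assert get_indexes(cfg, 'some_test_db', 'other_table') == []
```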
2 changes: 1 addition & 1 deletion mysql_ch_replicator/converter.py
@@ -472,7 +472,7 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens):
query = f'ALTER TABLE {db_name}.{table_name} RENAME COLUMN {column_name} TO {new_column_name}'
self.db_replicator.clickhouse_api.execute_command(query)

def parse_create_table_query(self, mysql_query) -> tuple:
def parse_create_table_query(self, mysql_query) -> tuple[TableStructure, TableStructure]:
mysql_table_structure = self.parse_mysql_table_structure(mysql_query)
ch_table_structure = self.convert_table_structure(mysql_table_structure)
return mysql_table_structure, ch_table_structure
6 changes: 4 additions & 2 deletions mysql_ch_replicator/db_replicator.py
@@ -214,7 +214,8 @@ def create_initial_structure_table(self, table_name):
self.validate_mysql_structure(mysql_structure)
clickhouse_structure = self.converter.convert_table_structure(mysql_structure)
self.state.tables_structure[table_name] = (mysql_structure, clickhouse_structure)
self.clickhouse_api.create_table(clickhouse_structure)
indexes = self.config.get_indexes(self.database, table_name)
self.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes)

def prevent_binlog_removal(self):
if time.time() - self.last_touch_time < self.BINLOG_TOUCH_INTERVAL:
@@ -480,7 +481,8 @@ def handle_create_table_query(self, query, db_name):
if not self.config.is_table_matches(mysql_structure.table_name):
return
self.state.tables_structure[mysql_structure.table_name] = (mysql_structure, ch_structure)
self.clickhouse_api.create_table(ch_structure)
indexes = self.config.get_indexes(self.database, ch_structure.table_name)
self.clickhouse_api.create_table(ch_structure, additional_indexes=indexes)

def handle_drop_table_query(self, query, db_name):
tokens = query.split()
17 changes: 16 additions & 1 deletion test_mysql_ch_replicator.py
@@ -366,7 +366,22 @@ def test_runner():
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, final=False)) == 4)

mysql.create_database(TEST_DB_NAME_2)
assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases(), max_wait_time=5)
assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases())

mysql.execute(f'''
CREATE TABLE test_table_with_index (
id int NOT NULL AUTO_INCREMENT,
name varchar(255) NOT NULL,
age int,
rate decimal(10,4),
PRIMARY KEY (id)
);
''')

assert_wait(lambda: 'test_table_with_index' in ch.get_tables())

create_query = ch.show_create_table('test_table_with_index')
assert 'INDEX name_idx name TYPE ngrambf_v1' in create_query

run_all_runner.stop()

5 changes: 5 additions & 0 deletions tests_config.yaml
@@ -19,3 +19,8 @@ databases: '*test*'
log_level: 'debug'
optimize_interval: 3
check_db_updated_interval: 3

indexes:
- databases: '*'
tables: ['test_table_with_index']
index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1'