Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,24 @@ binlog_replicator:
databases: 'database_name_pattern_*'
tables: '*'

exclude_databases: ['database_10', 'database_*_42'] # optional
exclude_tables: ['meta_table_*'] # optional

log_level: 'info' # optional
```

#### Required settings

- `mysql` MySQL connection settings
- `clickhouse` ClickHouse connection settings
- `binlog_replicator.data_dir` Create a new empty directory, it will be used by script to store it's state
- `databases` Databases name pattern to replicate, e.g. `db_*` will match `db_1` `db_2` `db_test`, list is also supported
- `tables` (__optional__) - tables to filter, list is also supported
- `log_level` (__optional__) - log level, default is `info`, you can set to `debug` to get maximum information (allowed values are `debug`, `info`, `warning`, `error`, `critical`)

#### Optional settings
- `tables` - tables to filter, list is also supported
- `exclude_databases` - databases to __exclude__, string or list, eg `'table1*'` or `['table2', 'table3*']`. If same database matches `databases` and `exclude_databases`, exclude has higher priority.
- `exclude_tables` - databases to __exclude__, string or list. If same table matches `tables` and `exclude_tables`, exclude has higher priority.
- `log_level` - log level, default is `info`, you can set to `debug` to get maximum information (allowed values are `debug`, `info`, `warning`, `error`, `critical`)

Few more tables / dbs examples:

Expand Down
8 changes: 8 additions & 0 deletions mysql_ch_replicator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ def __init__(self):
self.binlog_replicator = BinlogReplicatorSettings()
self.databases = ''
self.tables = '*'
self.exclude_databases = ''
self.exclude_tables = ''
self.settings_file = ''
self.log_level = 'info'
self.debug_log_level = False
Expand All @@ -101,6 +103,8 @@ def load(self, settings_file):
self.clickhouse = ClickhouseSettings(**data['clickhouse'])
self.databases = data['databases']
self.tables = data.get('tables', '*')
self.exclude_databases = data.get('exclude_databases', '')
self.exclude_tables = data.get('exclude_tables', '')
self.log_level = data.get('log_level', 'info')
assert isinstance(self.databases, str) or isinstance(self.databases, list)
assert isinstance(self.tables, str) or isinstance(self.tables, list)
Expand All @@ -121,9 +125,13 @@ def is_pattern_matches(cls, substr, pattern):
raise ValueError()

def is_database_matches(self, db_name):
if self.exclude_databases and self.is_pattern_matches(db_name, self.exclude_databases):
return False
return self.is_pattern_matches(db_name, self.databases)

def is_table_matches(self, table_name):
if self.exclude_tables and self.is_pattern_matches(table_name, self.exclude_tables):
return False
return self.is_pattern_matches(table_name, self.tables)

def validate_log_level(self):
Expand Down
38 changes: 38 additions & 0 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,43 @@ def test_database_tables_filtering():
)

mysql.drop_database('test_db_3')
mysql.drop_database('test_db_12')

mysql.create_database('test_db_3')
mysql.create_database('test_db_12')

ch.drop_database('test_db_3')
ch.drop_database('test_db_12')

prepare_env(cfg, mysql, ch, db_name='test_db_2')

mysql.execute(f'''
CREATE TABLE test_table_15 (
id int NOT NULL AUTO_INCREMENT,
name varchar(255),
age int,
PRIMARY KEY (id)
);
''')

mysql.execute(f'''
CREATE TABLE test_table_142 (
id int NOT NULL AUTO_INCREMENT,
name varchar(255),
age int,
PRIMARY KEY (id)
);
''')

mysql.execute(f'''
CREATE TABLE test_table_143 (
id int NOT NULL AUTO_INCREMENT,
name varchar(255),
age int,
PRIMARY KEY (id)
);
''')

mysql.execute(f'''
CREATE TABLE test_table_3 (
id int NOT NULL AUTO_INCREMENT,
Expand All @@ -437,14 +469,20 @@ def test_database_tables_filtering():

assert_wait(lambda: 'test_db_2' in ch.get_databases())
assert 'test_db_3' not in ch.get_databases()
assert 'test_db_12' not in ch.get_databases()

ch.execute_command('USE test_db_2')

assert_wait(lambda: 'test_table_2' in ch.get_tables())
assert_wait(lambda: len(ch.select('test_table_2')) == 1)

assert_wait(lambda: 'test_table_143' in ch.get_tables())

assert 'test_table_3' not in ch.get_tables()

assert 'test_table_15' not in ch.get_tables()
assert 'test_table_142' not in ch.get_tables()


def test_datetime_exception():
cfg = config.Settings()
Expand Down
3 changes: 3 additions & 0 deletions tests_config_databases_tables.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ binlog_replicator:
databases: ['test_db_1*', 'test_db_2']
tables: ['test_table_1*', 'test_table_2']

exclude_databases: ['test_db_12']
exclude_tables: ['test_table_15', 'test_table_*42']

log_level: 'debug'
Loading