Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ tables: '*'
exclude_databases: ['database_10', 'database_*_42'] # optional
exclude_tables: ['meta_table_*'] # optional

target_databases: # optional
source_db_in_mysql_1: destination_db_in_clickhouse_1
source_db_in_mysql_2: destination_db_in_clickhouse_2
...

log_level: 'info' # optional
optimize_interval: 86400 # optional
auto_restart_interval: 3600 # optional
Expand All @@ -183,6 +188,7 @@ http_port: 9128 # optional
- `tables` - tables to filter, list is also supported
- `exclude_databases` - databases to __exclude__, string or list, eg `'table1*'` or `['table2', 'table3*']`. If same database matches `databases` and `exclude_databases`, exclude has higher priority.
- `exclude_tables` - tables to __exclude__, string or list. If same table matches `tables` and `exclude_tables`, exclude has higher priority.
- `target_databases` - use this if you want a database in ClickHouse to have a different name from the source MySQL database
- `log_level` - log level, default is `info`, you can set to `debug` to get maximum information (allowed values are `debug`, `info`, `warning`, `error`, `critical`)
- `optimize_interval` - interval (seconds) between automatic `OPTIMIZE table FINAL` calls. Default 86400 (1 day). This is required to perform all merges guaranteed and avoid increasing of used storage and decreasing performance.
- `auto_restart_interval` - interval (seconds) between automatic db_replicator restart. Default 3600 (1 hour). This is done to reduce memory usage.
Expand Down
4 changes: 4 additions & 0 deletions mysql_ch_replicator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def __init__(self):
self.auto_restart_interval = 0
self.http_host = ''
self.http_port = 0
self.target_databases = {}

def load(self, settings_file):
data = open(settings_file, 'r').read()
Expand All @@ -132,6 +133,7 @@ def load(self, settings_file):
)
self.http_host = data.pop('http_host', '')
self.http_port = data.pop('http_port', 0)
self.target_databases = data.pop('target_databases', {})

indexes = data.pop('indexes', [])
for index in indexes:
Expand Down Expand Up @@ -189,3 +191,5 @@ def validate(self):
self.clickhouse.validate()
self.binlog_replicator.validate()
self.validate_log_level()
if not isinstance(self.target_databases, dict):
raise ValueError(f'wrong target databases {self.target_databases}')
15 changes: 14 additions & 1 deletion mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def save(self):
'tables_structure': self.tables_structure,
'tables': self.tables,
'pid': os.getpid(),
'save_time': time.time(),
})
with open(file_name + '.tmp', 'wb') as f:
f.write(data)
Expand Down Expand Up @@ -108,7 +109,19 @@ class DbReplicator:
def __init__(self, config: Settings, database: str, target_database: str = None, initial_only: bool = False):
self.config = config
self.database = database
self.target_database = target_database or database

# use same as source database by default
self.target_database = database

# use target database from config file if exists
target_database_from_config = config.target_databases.get(database)
if target_database_from_config:
self.target_database = target_database_from_config

# use command line argument if exists
if target_database:
self.target_database = target_database

self.target_database_tmp = self.target_database + '_tmp'
self.initial_only = initial_only

Expand Down
104 changes: 95 additions & 9 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
CONFIG_FILE_MARIADB = 'tests_config_mariadb.yaml'
TEST_DB_NAME = 'replication-test_db'
TEST_DB_NAME_2 = 'replication-test_db_2'
TEST_DB_NAME_2_DESTINATION = 'replication-destination'

TEST_TABLE_NAME = 'test_table'
TEST_TABLE_NAME_2 = 'test_table_2'
TEST_TABLE_NAME_3 = 'test_table_3'
Expand Down Expand Up @@ -100,18 +102,18 @@ def test_e2e_regular(config_file):
CREATE TABLE `{TEST_TABLE_NAME}` (
id int NOT NULL AUTO_INCREMENT,
name varchar(255) COMMENT 'Dân tộc, ví dụ: Kinh',
`age x` int COMMENT 'CMND Cũ',
age int COMMENT 'CMND Cũ',
field1 text,
field2 blob,
PRIMARY KEY (id)
);
''')

mysql.execute(
f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`, field1, field2) VALUES ('Ivan', 42, 'test1', 'test2');",
f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, field1, field2) VALUES ('Ivan', 42, 'test1', 'test2');",
commit=True,
)
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`) VALUES ('Peter', 33);", commit=True)
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Peter', 33);", commit=True)

binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
binlog_replicator_runner.run()
Expand All @@ -125,13 +127,13 @@ def test_e2e_regular(config_file):
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)

mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`) VALUES ('Filipp', 50);", commit=True)
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Filipp', 50);", commit=True)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0]['age x'] == 50)
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0]['age'] == 50)


mysql.execute(f"ALTER TABLE `{TEST_TABLE_NAME}` ADD `last_name` varchar(255); ")
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`, last_name) VALUES ('Mary', 24, 'Smith');", commit=True)
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, last_name) VALUES ('Mary', 24, 'Smith');", commit=True)

assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4)
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Mary'")[0]['last_name'] == 'Smith')
Expand All @@ -146,7 +148,7 @@ def test_e2e_regular(config_file):
)

mysql.execute(
f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`, last_name, country) "
f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, last_name, country) "
f"VALUES ('John', 12, 'Doe', 'USA');", commit=True,
)

Expand Down Expand Up @@ -314,6 +316,7 @@ def test_runner():

mysql.drop_database(TEST_DB_NAME_2)
ch.drop_database(TEST_DB_NAME_2)
ch.drop_database(TEST_DB_NAME_2_DESTINATION)

prepare_env(cfg, mysql, ch)

Expand Down Expand Up @@ -416,7 +419,7 @@ def test_runner():
assert_wait(lambda: ch.select(TEST_TABLE_NAME, "age=1912")[0]['name'] == 'Hällo')

mysql.create_database(TEST_DB_NAME_2)
assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases())
assert_wait(lambda: TEST_DB_NAME_2_DESTINATION in ch.get_databases())

mysql.execute(f'''
CREATE TABLE `group` (
Expand Down Expand Up @@ -457,7 +460,7 @@ def test_multi_column_erase():
)

mysql.drop_database(TEST_DB_NAME_2)
ch.drop_database(TEST_DB_NAME_2)
ch.drop_database(TEST_DB_NAME_2_DESTINATION)

prepare_env(cfg, mysql, ch)

Expand Down Expand Up @@ -709,6 +712,89 @@ def test_datetime_exception():
binlog_replicator_runner.stop()


def test_performance():
    """Benchmark binlog replicator throughput.

    Populates MySQL with ``num_records`` rows while the binlog replicator is
    stopped, then restarts it and measures how long it takes for a final
    marker row ('TEST_VALUE_FINAL') to show up in the replicator's binlog
    output, printing a records-per-second figure.  This is a manual
    benchmark: the only functional checks are the two ``assert_wait``
    synchronisation points.
    """
    config_file = 'tests_config_perf.yaml'
    num_records = 100000

    cfg = config.Settings()
    cfg.load(config_file)

    # Connect without selecting a database; the test database/table are
    # (re)created below — presumably by prepare_env; verify against its impl.
    mysql = mysql_api.MySQLApi(
        database=None,
        mysql_settings=cfg.mysql,
    )

    ch = clickhouse_api.ClickhouseApi(
        database=TEST_DB_NAME,
        clickhouse_settings=cfg.clickhouse,
    )

    prepare_env(cfg, mysql, ch)

    # name is varchar(2048) so each row carries ~2KB of payload (see
    # base_value below), making the benchmark move a non-trivial volume.
    mysql.execute(f'''
CREATE TABLE `{TEST_TABLE_NAME}` (
    id int NOT NULL AUTO_INCREMENT,
    name varchar(2048),
    age int,
    PRIMARY KEY (id)
);
''')

    binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
    binlog_replicator_runner.run()

    # Give the runner a moment to start up before producing events —
    # NOTE(review): assumes run() is asynchronous (spawns a process); confirm.
    time.sleep(1)

    mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_1', 33);", commit=True)

    def _get_last_insert_name():
        # Name column of the last INSERT the binlog replicator has written,
        # or None while no event has been recorded yet.
        record = get_last_insert_from_binlog(cfg=cfg, db_name=TEST_DB_NAME)
        if record is None:
            return None
        return record[1].decode('utf-8')

    # Sync point: replicator has caught up with the warm-up insert.
    assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_1', retry_interval=0.5)

    # Stop the replicator so the bulk data below accumulates as a backlog.
    binlog_replicator_runner.stop()

    time.sleep(1)

    print("populating mysql data")

    base_value = 'a' * 2000

    for i in range(num_records):
        if i % 2000 == 0:
            print(f'populated {i} elements')
        # Commit in batches of 20 to keep the population phase fast.
        mysql.execute(
            f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) "
            f"VALUES ('TEST_VALUE_{i}_{base_value}', {i});", commit=i % 20 == 0,
        )

    # Marker row: replication of the whole backlog is done once this appears.
    mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True)

    # NOTE(review): message says "db_replicator" but this starts the binlog
    # replicator — confirm which component is meant to be measured.
    print("running db_replicator")
    # NOTE(review): time.monotonic() would be more robust than time.time()
    # for measuring an interval (immune to wall-clock adjustments).
    t1 = time.time()
    binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
    binlog_replicator_runner.run()

    # Wait (up to ~1000s) for the marker row to be replicated.
    assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_FINAL', retry_interval=0.5, max_wait_time=1000)
    t2 = time.time()

    binlog_replicator_runner.stop()

    time_delta = t2 - t1
    rps = num_records / time_delta

    print('\n\n')
    print("*****************************")
    print("records per second:", int(rps))
    print("total time (seconds):", round(time_delta, 2))
    print("*****************************")
    print('\n\n')



def test_different_types_1():
cfg = config.Settings()
cfg.load(CONFIG_FILE)
Expand Down
3 changes: 3 additions & 0 deletions tests_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ log_level: 'debug'
optimize_interval: 3
check_db_updated_interval: 3

target_databases:
replication-test_db_2: replication-destination

indexes:
- databases: '*'
tables: ['group']
Expand Down
Loading