Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mysql_ch_replicator/clickhouse_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def insert(self, table_name, records, table_structure: TableStructure = None):

def erase(self, table_name, field_name, field_values):
field_name = ','.join(field_name)
field_values = ', '.join(list(map(str, field_values)))
field_values = ', '.join(f'({v})' for v in field_values)
query = DELETE_QUERY.format(**{
'db_name': self.database,
'table_name': table_name,
Expand Down
63 changes: 63 additions & 0 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,69 @@ def test_runner():
run_all_runner.stop()


def read_logs(db_name):
return open(os.path.join('binlog', db_name, 'db_replicator.log')).read()


def test_multi_column_erase():
config_file = CONFIG_FILE

cfg = config.Settings()
cfg.load(config_file)

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=cfg.clickhouse,
)

mysql.drop_database(TEST_DB_NAME_2)
ch.drop_database(TEST_DB_NAME_2)

prepare_env(cfg, mysql, ch)

mysql.execute(f'''
CREATE TABLE {TEST_TABLE_NAME} (
departments int(11) NOT NULL,
termine int(11) NOT NULL,
PRIMARY KEY (departments,termine)
)
''')


mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (10, 20);", commit=True)
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (30, 40);", commit=True)
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (50, 60);", commit=True)
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (20, 10);", commit=True)
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (40, 30);", commit=True)
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (60, 50);", commit=True)

run_all_runner = RunAllRunner(cfg_file=config_file)
run_all_runner.run()

assert_wait(lambda: TEST_DB_NAME in ch.get_databases())

ch.execute_command(f'USE {TEST_DB_NAME}')

assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 6)

mysql.execute(f"DELETE FROM {TEST_TABLE_NAME} WHERE departments=10;", commit=True)
mysql.execute(f"DELETE FROM {TEST_TABLE_NAME} WHERE departments=30;", commit=True)
mysql.execute(f"DELETE FROM {TEST_TABLE_NAME} WHERE departments=50;", commit=True)

assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)

run_all_runner.stop()

assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
assert('Traceback' not in read_logs(TEST_DB_NAME))


def test_initial_only():
cfg = config.Settings()
cfg.load(CONFIG_FILE)
Expand Down
2 changes: 2 additions & 0 deletions tests_config_mariadb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ binlog_replicator:

databases: '*test*'
log_level: 'debug'
optimize_interval: 3
check_db_updated_interval: 3
Loading