Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,24 @@ def __basic_validate_query(self, mysql_query):
if mysql_query.find(';') != -1:
raise Exception('multi-query statement not supported')
return mysql_query

def get_db_and_table_name(self, token, db_name):
if '.' in token:
db_name, table_name = token.split('.')
else:
table_name = token
db_name = strip_sql_name(db_name)
table_name = strip_sql_name(table_name)
if self.db_replicator:
if db_name == self.db_replicator.database:
db_name = self.db_replicator.target_database
matches_config = (
self.db_replicator.config.is_database_matches(db_name)
and self.db_replicator.config.is_table_matches(table_name))
else:
matches_config = True

return db_name, table_name, matches_config

def convert_alter_query(self, mysql_query, db_name):
mysql_query = self.__basic_validate_query(mysql_query)
Expand All @@ -365,21 +383,10 @@ def convert_alter_query(self, mysql_query, db_name):
if tokens[1].lower() != 'table':
raise Exception('wrong query')

table_name = tokens[2]
if table_name.find('.') != -1:
db_name, table_name = table_name.split('.')
db_name, table_name, matches_config = self.get_db_and_table_name(tokens[2], db_name)

if self.db_replicator:
if not self.db_replicator.config.is_database_matches(db_name):
return
if not self.db_replicator.config.is_table_matches(table_name):
return

db_name = strip_sql_name(db_name)
if self.db_replicator and db_name == self.db_replicator.database:
db_name = self.db_replicator.target_database

table_name = strip_sql_name(table_name)
if not matches_config:
return

subqueries = ' '.join(tokens[3:])
subqueries = split_high_level(subqueries, ',')
Expand Down
44 changes: 36 additions & 8 deletions mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,14 +477,18 @@ def handle_query_event(self, event: LogEvent):
logger.debug(f'processing query event: {event.transaction_id}, query: {event.records}')
query = strip_sql_comments(event.records)
if query.lower().startswith('alter'):
self.upload_records()
self.handle_alter_query(query, event.db_name)
if query.lower().startswith('create table'):
self.handle_create_table_query(query, event.db_name)
if query.lower().startswith('drop table'):
self.upload_records()
self.handle_drop_table_query(query, event.db_name)
if query.lower().startswith('rename table'):
self.upload_records()
self.handle_rename_table_query(query, event.db_name)

def handle_alter_query(self, query, db_name):
self.upload_records()
self.converter.convert_alter_query(query, db_name)

def handle_create_table_query(self, query, db_name):
Expand All @@ -509,17 +513,41 @@ def handle_drop_table_query(self, query, db_name):
if len(tokens) != 3:
raise Exception('wrong token count', query)

table_name = tokens[2]
if '.' in table_name:
db_name, table_name = table_name.split('.')
if db_name == self.database:
db_name = self.target_database
table_name = strip_sql_name(table_name)
db_name = strip_sql_name(db_name)
db_name, table_name, matches_config = self.converter.get_db_and_table_name(tokens[2], db_name)
if not matches_config:
return

if table_name in self.state.tables_structure:
self.state.tables_structure.pop(table_name)
self.clickhouse_api.execute_command(f'DROP TABLE {"IF EXISTS" if if_exists else ""} {db_name}.{table_name}')

def handle_rename_table_query(self, query, db_name):
tokens = query.split()
if tokens[0].lower() != 'rename' or tokens[1].lower() != 'table':
raise Exception('wrong rename table query', query)

ch_clauses = []
for rename_clause in ' '.join(tokens[2:]).split(','):
tokens = rename_clause.split()

if len(tokens) != 3:
raise Exception('wrong token count', query)
if tokens[1].lower() != 'to':
raise Exception('"to" keyword expected', query)

src_db_name, src_table_name, matches_config = self.converter.get_db_and_table_name(tokens[0], db_name)
dest_db_name, dest_table_name, _ = self.converter.get_db_and_table_name(tokens[2], db_name)
if not matches_config:
return

if src_db_name != self.target_database or dest_db_name != self.target_database:
raise Exception('cross databases table renames not implemented', tokens)
if src_table_name in self.state.tables_structure:
self.state.tables_structure[dest_table_name] = self.state.tables_structure.pop(src_table_name)

ch_clauses.append(f"{src_db_name}.{src_table_name} TO {dest_db_name}.{dest_table_name}")
self.clickhouse_api.execute_command(f'RENAME TABLE {", ".join(ch_clauses)}')

def log_stats_if_required(self):
curr_time = time.time()
if curr_time - self.last_dump_stats_time < DbReplicator.STATS_DUMP_INTERVAL:
Expand Down
117 changes: 117 additions & 0 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,122 @@ def test_string_primary_key(monkeypatch):
binlog_replicator_runner.stop()


def test_if_exists_if_not_exists(monkeypatch):
monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1)

cfg = config.Settings()
cfg.load(CONFIG_FILE)

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=cfg.clickhouse,
)

prepare_env(cfg, mysql, ch)

binlog_replicator_runner = BinlogReplicatorRunner()
binlog_replicator_runner.run()
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
db_replicator_runner.run()

assert_wait(lambda: TEST_DB_NAME in ch.get_databases())

mysql.execute(f"CREATE TABLE IF NOT EXISTS {TEST_DB_NAME}.{TEST_TABLE_NAME} (id int NOT NULL, PRIMARY KEY(id));")
mysql.execute(f"CREATE TABLE IF NOT EXISTS {TEST_TABLE_NAME} (id int NOT NULL, PRIMARY KEY(id));")
mysql.execute(f"CREATE TABLE IF NOT EXISTS {TEST_DB_NAME}.{TEST_TABLE_NAME_2} (id int NOT NULL, PRIMARY KEY(id));")
mysql.execute(f"CREATE TABLE IF NOT EXISTS {TEST_TABLE_NAME_2} (id int NOT NULL, PRIMARY KEY(id));")
mysql.execute(f"DROP TABLE IF EXISTS {TEST_DB_NAME}.{TEST_TABLE_NAME};")
mysql.execute(f"DROP TABLE IF EXISTS {TEST_TABLE_NAME};")

ch.execute_command(f'USE {TEST_DB_NAME}')

assert_wait(lambda: TEST_TABLE_NAME_2 in ch.get_tables())
assert_wait(lambda: TEST_TABLE_NAME not in ch.get_tables())

db_replicator_runner.stop()
binlog_replicator_runner.stop()


def test_percona_migration(monkeypatch):
monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1)

cfg = config.Settings()
cfg.load(CONFIG_FILE)

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=cfg.clickhouse,
)

prepare_env(cfg, mysql, ch)

mysql.execute(f'''
CREATE TABLE {TEST_TABLE_NAME} (
`id` int NOT NULL,
PRIMARY KEY (`id`));
''')

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (id) VALUES (42)",
commit=True,
)

binlog_replicator_runner = BinlogReplicatorRunner()
binlog_replicator_runner.run()
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
db_replicator_runner.run()

assert_wait(lambda: TEST_DB_NAME in ch.get_databases())

ch.execute_command(f'USE {TEST_DB_NAME}')

assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

# Perform 'pt-online-schema-change' style migration to add a column
# This is a subset of what happens when the following command is run:
# pt-online-schema-change --alter "ADD COLUMN c1 INT" D=$TEST_DB_NAME,t=$TEST_TABLE_NAME,h=0.0.0.0,P=3306,u=root,p=admin --execute
mysql.execute(f'''
CREATE TABLE `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_new` (
`id` int NOT NULL,
PRIMARY KEY (`id`)
)''')

mysql.execute(
f"ALTER TABLE `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_new` ADD COLUMN c1 INT;")

mysql.execute(
f"INSERT LOW_PRIORITY IGNORE INTO `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_new` (`id`) SELECT `id` FROM `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` LOCK IN SHARE MODE;",
commit=True,
)

mysql.execute(
f"RENAME TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` TO `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_old`, `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_new` TO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}`;")

mysql.execute(
f"DROP TABLE IF EXISTS `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_old`;")

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (id, c1) VALUES (43, 1)",
commit=True,
)

assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)

db_replicator_runner.stop()
binlog_replicator_runner.stop()


def test_parse_mysql_table_structure():
query = "CREATE TABLE IF NOT EXISTS user_preferences_portal (\n\t\t\tid char(36) NOT NULL,\n\t\t\tcategory varchar(50) DEFAULT NULL,\n\t\t\tdeleted tinyint(1) DEFAULT 0,\n\t\t\tdate_entered datetime DEFAULT NULL,\n\t\t\tdate_modified datetime DEFAULT NULL,\n\t\t\tassigned_user_id char(36) DEFAULT NULL,\n\t\t\tcontents longtext DEFAULT NULL\n\t\t ) ENGINE=InnoDB DEFAULT CHARSET=utf8"

Expand All @@ -1065,3 +1181,4 @@ def test_parse_mysql_table_structure():
structure = converter.parse_mysql_table_structure(query)

assert structure.table_name == 'user_preferences_portal'

Loading