diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index 99db0d1..af4088f 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -354,6 +354,24 @@ def __basic_validate_query(self, mysql_query): if mysql_query.find(';') != -1: raise Exception('multi-query statement not supported') return mysql_query + + def get_db_and_table_name(self, token, db_name): + if '.' in token: + db_name, table_name = token.split('.') + else: + table_name = token + db_name = strip_sql_name(db_name) + table_name = strip_sql_name(table_name) + if self.db_replicator: + if db_name == self.db_replicator.database: + db_name = self.db_replicator.target_database + matches_config = ( + self.db_replicator.config.is_database_matches(db_name) + and self.db_replicator.config.is_table_matches(table_name)) + else: + matches_config = True + + return db_name, table_name, matches_config def convert_alter_query(self, mysql_query, db_name): mysql_query = self.__basic_validate_query(mysql_query) @@ -365,21 +383,10 @@ def convert_alter_query(self, mysql_query, db_name): if tokens[1].lower() != 'table': raise Exception('wrong query') - table_name = tokens[2] - if table_name.find('.') != -1: - db_name, table_name = table_name.split('.') + db_name, table_name, matches_config = self.get_db_and_table_name(tokens[2], db_name) - if self.db_replicator: - if not self.db_replicator.config.is_database_matches(db_name): - return - if not self.db_replicator.config.is_table_matches(table_name): - return - - db_name = strip_sql_name(db_name) - if self.db_replicator and db_name == self.db_replicator.database: - db_name = self.db_replicator.target_database - - table_name = strip_sql_name(table_name) + if not matches_config: + return subqueries = ' '.join(tokens[3:]) subqueries = split_high_level(subqueries, ',') diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py index 1250fc3..2fb7528 100644 --- a/mysql_ch_replicator/db_replicator.py +++ b/mysql_ch_replicator/db_replicator.py @@ -477,14 +477,18 @@ def handle_query_event(self, event: LogEvent): logger.debug(f'processing query event: {event.transaction_id}, query: {event.records}') query = strip_sql_comments(event.records) if query.lower().startswith('alter'): + self.upload_records() self.handle_alter_query(query, event.db_name) if query.lower().startswith('create table'): self.handle_create_table_query(query, event.db_name) if query.lower().startswith('drop table'): + self.upload_records() self.handle_drop_table_query(query, event.db_name) + if query.lower().startswith('rename table'): + self.upload_records() + self.handle_rename_table_query(query, event.db_name) def handle_alter_query(self, query, db_name): - self.upload_records() self.converter.convert_alter_query(query, db_name) def handle_create_table_query(self, query, db_name): @@ -509,17 +513,41 @@ def handle_drop_table_query(self, query, db_name): if len(tokens) != 3: raise Exception('wrong token count', query) - table_name = tokens[2] - if '.' in table_name: - db_name, table_name = table_name.split('.') - if db_name == self.database: - db_name = self.target_database - table_name = strip_sql_name(table_name) - db_name = strip_sql_name(db_name) + db_name, table_name, matches_config = self.converter.get_db_and_table_name(tokens[2], db_name) + if not matches_config: + return + if table_name in self.state.tables_structure: self.state.tables_structure.pop(table_name) self.clickhouse_api.execute_command(f'DROP TABLE {"IF EXISTS" if if_exists else ""} {db_name}.{table_name}') + def handle_rename_table_query(self, query, db_name): + tokens = query.split() + if tokens[0].lower() != 'rename' or tokens[1].lower() != 'table': + raise Exception('wrong rename table query', query) + + ch_clauses = [] + for rename_clause in ' '.join(tokens[2:]).split(','): + tokens = rename_clause.split() + + if len(tokens) != 3: + raise Exception('wrong token count', query) + if tokens[1].lower() != 'to': + raise Exception('"to" keyword expected', query) + + src_db_name, src_table_name, matches_config = self.converter.get_db_and_table_name(tokens[0], db_name) + dest_db_name, dest_table_name, _ = self.converter.get_db_and_table_name(tokens[2], db_name) + if not matches_config: + return + + if src_db_name != self.target_database or dest_db_name != self.target_database: + raise Exception('cross databases table renames not implemented', tokens) + if src_table_name in self.state.tables_structure: + self.state.tables_structure[dest_table_name] = self.state.tables_structure.pop(src_table_name) + + ch_clauses.append(f"{src_db_name}.{src_table_name} TO {dest_db_name}.{dest_table_name}") + self.clickhouse_api.execute_command(f'RENAME TABLE {", ".join(ch_clauses)}') + def log_stats_if_required(self): curr_time = time.time() if curr_time - self.last_dump_stats_time < DbReplicator.STATS_DUMP_INTERVAL: diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 69c14aa..efa1c4e 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -1057,6 +1057,122 @@ def test_string_primary_key(monkeypatch): binlog_replicator_runner.stop() +def test_if_exists_if_not_exists(monkeypatch): + monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1) + + cfg = config.Settings() + cfg.load(CONFIG_FILE) + + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=cfg.mysql, + ) + + ch = clickhouse_api.ClickhouseApi( + database=TEST_DB_NAME, + clickhouse_settings=cfg.clickhouse, + ) + + prepare_env(cfg, mysql, ch) + + binlog_replicator_runner = BinlogReplicatorRunner() + binlog_replicator_runner.run() + db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME) + db_replicator_runner.run() + + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) + + mysql.execute(f"CREATE TABLE IF NOT EXISTS {TEST_DB_NAME}.{TEST_TABLE_NAME} (id int NOT NULL, PRIMARY KEY(id));") + mysql.execute(f"CREATE TABLE IF NOT EXISTS {TEST_TABLE_NAME} (id int NOT NULL, PRIMARY KEY(id));") + mysql.execute(f"CREATE TABLE IF NOT EXISTS {TEST_DB_NAME}.{TEST_TABLE_NAME_2} (id int NOT NULL, PRIMARY KEY(id));") + mysql.execute(f"CREATE TABLE IF NOT EXISTS {TEST_TABLE_NAME_2} (id int NOT NULL, PRIMARY KEY(id));") + mysql.execute(f"DROP TABLE IF EXISTS {TEST_DB_NAME}.{TEST_TABLE_NAME};") + mysql.execute(f"DROP TABLE IF EXISTS {TEST_TABLE_NAME};") + + ch.execute_command(f'USE {TEST_DB_NAME}') + + assert_wait(lambda: TEST_TABLE_NAME_2 in ch.get_tables()) + assert_wait(lambda: TEST_TABLE_NAME not in ch.get_tables()) + + db_replicator_runner.stop() + binlog_replicator_runner.stop() + + +def test_percona_migration(monkeypatch): + monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1) + + cfg = config.Settings() + cfg.load(CONFIG_FILE) + + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=cfg.mysql, + ) + + ch = clickhouse_api.ClickhouseApi( + database=TEST_DB_NAME, + clickhouse_settings=cfg.clickhouse, + ) + + prepare_env(cfg, mysql, ch) + + mysql.execute(f''' +CREATE TABLE {TEST_TABLE_NAME} ( + `id` int NOT NULL, + PRIMARY KEY (`id`)); + ''') + + mysql.execute( + f"INSERT INTO {TEST_TABLE_NAME} (id) VALUES (42)", + commit=True, + ) + + binlog_replicator_runner = BinlogReplicatorRunner() + binlog_replicator_runner.run() + db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME) + db_replicator_runner.run() + + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) + + ch.execute_command(f'USE {TEST_DB_NAME}') + + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) + + # Perform 'pt-online-schema-change' style migration to add a column + # This is a subset of what happens when the following command is run: + # pt-online-schema-change --alter "ADD COLUMN c1 INT" D=$TEST_DB_NAME,t=$TEST_TABLE_NAME,h=0.0.0.0,P=3306,u=root,p=admin --execute + mysql.execute(f''' +CREATE TABLE `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_new` ( + `id` int NOT NULL, + PRIMARY KEY (`id`) +)''') + + mysql.execute( + f"ALTER TABLE `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_new` ADD COLUMN c1 INT;") + + mysql.execute( + f"INSERT LOW_PRIORITY IGNORE INTO `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_new` (`id`) SELECT `id` FROM `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` LOCK IN SHARE MODE;", + commit=True, + ) + + mysql.execute( + f"RENAME TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` TO `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_old`, `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_new` TO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}`;") + + mysql.execute( + f"DROP TABLE IF EXISTS `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_old`;") + + mysql.execute( + f"INSERT INTO {TEST_TABLE_NAME} (id, c1) VALUES (43, 1)", + commit=True, + ) + + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2) + + db_replicator_runner.stop() + binlog_replicator_runner.stop() + + def test_parse_mysql_table_structure(): query = "CREATE TABLE IF NOT EXISTS user_preferences_portal (\n\t\t\tid char(36) NOT NULL,\n\t\t\tcategory varchar(50) DEFAULT NULL,\n\t\t\tdeleted tinyint(1) DEFAULT 0,\n\t\t\tdate_entered datetime DEFAULT NULL,\n\t\t\tdate_modified datetime DEFAULT NULL,\n\t\t\tassigned_user_id char(36) DEFAULT NULL,\n\t\t\tcontents longtext DEFAULT NULL\n\t\t ) ENGINE=InnoDB DEFAULT CHARSET=utf8" @@ -1065,3 +1181,4 @@ def test_parse_mysql_table_structure(): structure = converter.parse_mysql_table_structure(query) assert structure.table_name == 'user_preferences_portal' +