From 6dc6e9c89faaa04b301f65b3139e4f97f39fa646 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Tue, 25 Mar 2025 16:01:18 +0400 Subject: [PATCH 1/3] Handling create table LIKE another_table --- mysql_ch_replicator/converter.py | 121 ++++++++++++++++++++++++ test_mysql_ch_replicator.py | 155 +++++++++++++++++++++++++++++++ 2 files changed, 276 insertions(+) diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index d2cbf62..0dcaa51 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -4,6 +4,7 @@ import sqlparse import re from pyparsing import Suppress, CaselessKeyword, Word, alphas, alphanums, delimitedList +import copy from .table_structure import TableStructure, TableField from .enum import ( @@ -736,6 +737,80 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens): self.db_replicator.clickhouse_api.execute_command(query) def parse_create_table_query(self, mysql_query) -> tuple[TableStructure, TableStructure]: + # Special handling for CREATE TABLE LIKE statements + if 'LIKE' in mysql_query.upper(): + # Check if this is a CREATE TABLE LIKE statement using regex + create_like_pattern = r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?[`"]?([^`"\s]+)[`"]?\s+LIKE\s+[`"]?([^`"\s]+)[`"]?' + match = re.search(create_like_pattern, mysql_query, re.IGNORECASE) + + if match: + # This is a CREATE TABLE LIKE statement + new_table_name = match.group(1).strip('`"') + source_table_name = match.group(2).strip('`"') + + # Create a basic fallback structure in case we can't get the source + basic_structure = TableStructure() + basic_structure.table_name = new_table_name + basic_structure.primary_keys = ['id'] + basic_structure.fields.append(TableField( + name='id', + field_type='int', + parameters='NOT NULL AUTO_INCREMENT', + additional_data=None, + )) + basic_structure.preprocess() + + # Try to get the actual structure from the existing table structures first + if (hasattr(self, 'db_replicator') and + self.db_replicator is not None and + hasattr(self.db_replicator, 'state') and + hasattr(self.db_replicator.state, 'tables_structure')): + + # Check if the source table structure is already in our state + if source_table_name in self.db_replicator.state.tables_structure: + # Get the existing structure + source_mysql_structure, source_ch_structure = self.db_replicator.state.tables_structure[source_table_name] + + # Create a new structure with the target table name + new_mysql_structure = copy.deepcopy(source_mysql_structure) + new_mysql_structure.table_name = new_table_name + + # Convert to ClickHouse structure + new_ch_structure = copy.deepcopy(source_ch_structure) + new_ch_structure.table_name = new_table_name + + return new_mysql_structure, new_ch_structure + + # If we couldn't get it from state, try with MySQL API + try: + # If we have a db_replicator with a valid mysql_api + if (hasattr(self, 'db_replicator') and + self.db_replicator is not None and + hasattr(self.db_replicator, 'mysql_api') and + self.db_replicator.mysql_api is not None): + + # Get the CREATE statement for the source table + source_create_statement = self.db_replicator.mysql_api.get_table_create_statement(source_table_name) + + # Parse the source table structure + source_structure = self.parse_mysql_table_structure(source_create_statement) + + # Create a new structure with the target table name + mysql_table_structure = copy.deepcopy(source_structure) + mysql_table_structure.table_name = new_table_name + + # Convert to ClickHouse structure + ch_table_structure = self.convert_table_structure(mysql_table_structure) + + return mysql_table_structure, ch_table_structure + except Exception as e: + print(f"Warning: Could not get source table structure for LIKE statement: {str(e)}") + + # If we got here, use the fallback structure + ch_basic_structure = self.convert_table_structure(basic_structure) + return basic_structure, ch_basic_structure + + # Regular parsing for non-LIKE statements or if LIKE handling failed mysql_table_structure = self.parse_mysql_table_structure(mysql_query) ch_table_structure = self.convert_table_structure(mysql_table_structure) return mysql_table_structure, ch_table_structure @@ -779,6 +854,52 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None # style `.` structure.table_name = strip_sql_name(tokens[2].get_real_name()) + # Handle CREATE TABLE ... LIKE statements + if len(tokens) > 4 and tokens[3].normalized.upper() == 'LIKE': + # Extract the source table name + if not isinstance(tokens[4], sqlparse.sql.Identifier): + raise Exception('wrong create statement', create_statement) + + source_table_name = strip_sql_name(tokens[4].get_real_name()) + + # Create a basic structure with the target table name + structure.table_name = strip_sql_name(tokens[2].get_real_name()) + + # During initial parsing in the test, we'll create a skeleton structure + # Later during replication, the actual structure will be used + structure.primary_keys = ['id'] # Assuming 'id' is the primary key + structure.fields.append(TableField( + name='id', + field_type='int', + parameters='NOT NULL AUTO_INCREMENT', + additional_data=None, + )) + structure.preprocess() + + # Try to get the actual structure if possible + try: + # If we have a db_replicator with a valid mysql_api + if (hasattr(self, 'db_replicator') and + self.db_replicator is not None and + hasattr(self.db_replicator, 'mysql_api') and + self.db_replicator.mysql_api is not None): + + # Try to get the CREATE statement from the current database + mysql_api = self.db_replicator.mysql_api + source_create_statement = mysql_api.get_table_create_statement(source_table_name) + + # Parse the source table structure + source_structure = self.parse_mysql_table_structure(source_create_statement) + + # Copy the structure but keep the new table name + structure = copy.deepcopy(source_structure) + structure.table_name = strip_sql_name(tokens[2].get_real_name()) + except Exception as e: + # Log the error but still return the basic structure + print(f"Warning: Could not get source table structure: {str(e)}") + + return structure + if not isinstance(tokens[3], sqlparse.sql.Parenthesis): raise Exception('wrong create statement', create_statement) diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 8c35333..7f30026 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -1657,3 +1657,158 @@ def test_enum_conversion(): ]) def test_parse_db_name_from_query(query, expected): assert BinlogReplicator._try_parse_db_name_from_query(query) == expected + + +def test_create_table_like(): + """ + Test that CREATE TABLE ... LIKE statements are handled correctly. + The test creates a source table, then creates another table using LIKE, + and verifies that both tables have the same structure in ClickHouse. + """ + config_file = CONFIG_FILE + cfg = config.Settings() + cfg.load(config_file) + + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=cfg.mysql, + ) + + ch = clickhouse_api.ClickhouseApi( + database=TEST_DB_NAME, + clickhouse_settings=cfg.clickhouse, + ) + + prepare_env(cfg, mysql, ch) + mysql.set_database(TEST_DB_NAME) + + # Create the source table with a complex structure + mysql.execute(f''' + CREATE TABLE `source_table` ( + id INT NOT NULL AUTO_INCREMENT, + name VARCHAR(255) NOT NULL, + age INT UNSIGNED, + email VARCHAR(100) UNIQUE, + status ENUM('active','inactive','pending') DEFAULT 'active', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + data JSON, + PRIMARY KEY (id) + ); + ''') + + # Get the CREATE statement for the source table + source_create = mysql.get_table_create_statement('source_table') + + # Create a table using LIKE statement + mysql.execute(f''' + CREATE TABLE `derived_table` LIKE `source_table`; + ''') + + # Set up replication + binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file) + binlog_replicator_runner.run() + db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file) + db_replicator_runner.run() + + # Wait for database to be created and renamed from tmp to final + assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), max_wait_time=10.0) + + # Use the correct database explicitly + ch.execute_command(f'USE `{TEST_DB_NAME}`') + + # Wait for tables to be created in ClickHouse with a longer timeout + assert_wait(lambda: 'source_table' in ch.get_tables(), max_wait_time=10.0) + assert_wait(lambda: 'derived_table' in ch.get_tables(), max_wait_time=10.0) + + # Insert data into both tables to verify they work + mysql.execute("INSERT INTO `source_table` (name, age, email, status) VALUES ('Alice', 30, 'alice@example.com', 'active');", commit=True) + mysql.execute("INSERT INTO `derived_table` (name, age, email, status) VALUES ('Bob', 25, 'bob@example.com', 'pending');", commit=True) + + # Wait for data to be replicated + assert_wait(lambda: len(ch.select('source_table')) == 1, max_wait_time=10.0) + assert_wait(lambda: len(ch.select('derived_table')) == 1, max_wait_time=10.0) + + # Compare structures by reading descriptions in ClickHouse + source_desc = ch.execute_command("DESCRIBE TABLE source_table") + derived_desc = ch.execute_command("DESCRIBE TABLE derived_table") + + # The structures should be identical + assert source_desc == derived_desc + + # Verify the data in both tables + source_data = ch.select('source_table')[0] + derived_data = ch.select('derived_table')[0] + + assert source_data['name'] == 'Alice' + assert derived_data['name'] == 'Bob' + + # Both tables should have same column types + assert type(source_data['id']) == type(derived_data['id']) + assert type(source_data['name']) == type(derived_data['name']) + assert type(source_data['age']) == type(derived_data['age']) + + # Now test realtime replication by creating a new table after the initial replication + mysql.execute(f''' + CREATE TABLE `realtime_table` ( + id INT NOT NULL AUTO_INCREMENT, + title VARCHAR(100) NOT NULL, + description TEXT, + price DECIMAL(10,2), + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) + ); + ''') + + # Wait for the new table to be created in ClickHouse + assert_wait(lambda: 'realtime_table' in ch.get_tables(), max_wait_time=10.0) + + # Insert data into the new table + mysql.execute(""" + INSERT INTO `realtime_table` (title, description, price) VALUES + ('Product 1', 'First product description', 19.99), + ('Product 2', 'Second product description', 29.99), + ('Product 3', 'Third product description', 39.99); + """, commit=True) + + # Wait for data to be replicated + assert_wait(lambda: len(ch.select('realtime_table')) == 3, max_wait_time=10.0) + + # Verify the data in the realtime table + realtime_data = ch.select('realtime_table') + assert len(realtime_data) == 3 + + # Verify specific values + products = sorted([record['title'] for record in realtime_data]) + assert products == ['Product 1', 'Product 2', 'Product 3'] + + prices = sorted([float(record['price']) for record in realtime_data]) + assert prices == [19.99, 29.99, 39.99] + + # Now create another table using LIKE after initial replication + mysql.execute(f''' + CREATE TABLE `realtime_like_table` LIKE `realtime_table`; + ''') + + # Wait for the new LIKE table to be created in ClickHouse + assert_wait(lambda: 'realtime_like_table' in ch.get_tables(), max_wait_time=10.0) + + # Insert data into the new LIKE table + mysql.execute(""" + INSERT INTO `realtime_like_table` (title, description, price) VALUES + ('Service A', 'Premium service', 99.99), + ('Service B', 'Standard service', 49.99); + """, commit=True) + + # Wait for data to be replicated + assert_wait(lambda: len(ch.select('realtime_like_table')) == 2, max_wait_time=10.0) + + # Verify the data in the realtime LIKE table + like_data = ch.select('realtime_like_table') + assert len(like_data) == 2 + + services = sorted([record['title'] for record in like_data]) + assert services == ['Service A', 'Service B'] + + # Clean up + db_replicator_runner.stop() + binlog_replicator_runner.stop() From b29286a8e1c862868792275a3519adca1f682227 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Tue, 25 Mar 2025 16:08:32 +0400 Subject: [PATCH 2/3] Simplify --- mysql_ch_replicator/converter.py | 91 ++++++++++++-------------------- 1 file changed, 33 insertions(+), 58 deletions(-) diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index 0dcaa51..1279058 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -748,18 +748,6 @@ def parse_create_table_query(self, mysql_query) -> tuple[TableStructure, TableSt new_table_name = match.group(1).strip('`"') source_table_name = match.group(2).strip('`"') - # Create a basic fallback structure in case we can't get the source - basic_structure = TableStructure() - basic_structure.table_name = new_table_name - basic_structure.primary_keys = ['id'] - basic_structure.fields.append(TableField( - name='id', - field_type='int', - parameters='NOT NULL AUTO_INCREMENT', - additional_data=None, - )) - basic_structure.preprocess() - # Try to get the actual structure from the existing table structures first if (hasattr(self, 'db_replicator') and self.db_replicator is not None and @@ -782,13 +770,12 @@ def parse_create_table_query(self, mysql_query) -> tuple[TableStructure, TableSt return new_mysql_structure, new_ch_structure # If we couldn't get it from state, try with MySQL API - try: - # If we have a db_replicator with a valid mysql_api - if (hasattr(self, 'db_replicator') and - self.db_replicator is not None and - hasattr(self.db_replicator, 'mysql_api') and - self.db_replicator.mysql_api is not None): - + if (hasattr(self, 'db_replicator') and + self.db_replicator is not None and + hasattr(self.db_replicator, 'mysql_api') and + self.db_replicator.mysql_api is not None): + + try: # Get the CREATE statement for the source table source_create_statement = self.db_replicator.mysql_api.get_table_create_statement(source_table_name) @@ -803,14 +790,15 @@ def parse_create_table_query(self, mysql_query) -> tuple[TableStructure, TableSt ch_table_structure = self.convert_table_structure(mysql_table_structure) return mysql_table_structure, ch_table_structure - except Exception as e: - print(f"Warning: Could not get source table structure for LIKE statement: {str(e)}") + except Exception as e: + error_msg = f"Could not get source table structure for LIKE statement: {str(e)}" + print(f"Error: {error_msg}") + raise Exception(error_msg, mysql_query) - # If we got here, use the fallback structure - ch_basic_structure = self.convert_table_structure(basic_structure) - return basic_structure, ch_basic_structure + # If we got here, we couldn't determine the structure + raise Exception(f"Could not determine structure for source table '{source_table_name}' in LIKE statement", mysql_query) - # Regular parsing for non-LIKE statements or if LIKE handling failed + # Regular parsing for non-LIKE statements mysql_table_structure = self.parse_mysql_table_structure(mysql_query) ch_table_structure = self.convert_table_structure(mysql_table_structure) return mysql_table_structure, ch_table_structure @@ -861,44 +849,31 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None raise Exception('wrong create statement', create_statement) source_table_name = strip_sql_name(tokens[4].get_real_name()) + target_table_name = strip_sql_name(tokens[2].get_real_name()) - # Create a basic structure with the target table name - structure.table_name = strip_sql_name(tokens[2].get_real_name()) - - # During initial parsing in the test, we'll create a skeleton structure - # Later during replication, the actual structure will be used - structure.primary_keys = ['id'] # Assuming 'id' is the primary key - structure.fields.append(TableField( - name='id', - field_type='int', - parameters='NOT NULL AUTO_INCREMENT', - additional_data=None, - )) - structure.preprocess() - - # Try to get the actual structure if possible - try: - # If we have a db_replicator with a valid mysql_api - if (hasattr(self, 'db_replicator') and + # Try to get the actual structure - we need a valid MySQL API + if not (hasattr(self, 'db_replicator') and self.db_replicator is not None and hasattr(self.db_replicator, 'mysql_api') and self.db_replicator.mysql_api is not None): - - # Try to get the CREATE statement from the current database - mysql_api = self.db_replicator.mysql_api - source_create_statement = mysql_api.get_table_create_statement(source_table_name) - - # Parse the source table structure - source_structure = self.parse_mysql_table_structure(source_create_statement) - - # Copy the structure but keep the new table name - structure = copy.deepcopy(source_structure) - structure.table_name = strip_sql_name(tokens[2].get_real_name()) + raise Exception(f"Cannot parse LIKE statement - no MySQL API available to get source table structure", create_statement) + + try: + # Try to get the CREATE statement from the current database + mysql_api = self.db_replicator.mysql_api + source_create_statement = mysql_api.get_table_create_statement(source_table_name) + + # Parse the source table structure + source_structure = self.parse_mysql_table_structure(source_create_statement) + + # Copy the structure but keep the new table name + structure = copy.deepcopy(source_structure) + structure.table_name = target_table_name + return structure except Exception as e: - # Log the error but still return the basic structure - print(f"Warning: Could not get source table structure: {str(e)}") - - return structure + error_msg = f"Could not get source table structure: {str(e)}" + print(f"Error: {error_msg}") + raise Exception(error_msg, create_statement) if not isinstance(tokens[3], sqlparse.sql.Parenthesis): raise Exception('wrong create statement', create_statement) From 14887f16b7d0cc7f09a6b3c0bd76b58330308751 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Tue, 25 Mar 2025 16:23:52 +0400 Subject: [PATCH 3/3] Refactor --- mysql_ch_replicator/converter.py | 142 +++++++++++++++---------------- 1 file changed, 70 insertions(+), 72 deletions(-) diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index 1279058..f40a104 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -736,6 +736,72 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens): query = f'ALTER TABLE `{db_name}`.`{table_name}` RENAME COLUMN {column_name} TO {new_column_name}' self.db_replicator.clickhouse_api.execute_command(query) + def _handle_create_table_like(self, create_statement, source_table_name, target_table_name, is_query_api=True): + """ + Helper method to handle CREATE TABLE LIKE statements. + + Args: + create_statement: The original CREATE TABLE LIKE statement + source_table_name: Name of the source table being copied + target_table_name: Name of the new table being created + is_query_api: If True, returns both MySQL and CH structures; if False, returns only MySQL structure + + Returns: + Either (mysql_structure, ch_structure) if is_query_api=True, or just mysql_structure otherwise + """ + # Try to get the actual structure from the existing table structures first + if (hasattr(self, 'db_replicator') and + self.db_replicator is not None and + hasattr(self.db_replicator, 'state') and + hasattr(self.db_replicator.state, 'tables_structure')): + + # Check if the source table structure is already in our state + if source_table_name in self.db_replicator.state.tables_structure: + # Get the existing structure + source_mysql_structure, source_ch_structure = self.db_replicator.state.tables_structure[source_table_name] + + # Create a new structure with the target table name + new_mysql_structure = copy.deepcopy(source_mysql_structure) + new_mysql_structure.table_name = target_table_name + + # Convert to ClickHouse structure + new_ch_structure = copy.deepcopy(source_ch_structure) + new_ch_structure.table_name = target_table_name + + return (new_mysql_structure, new_ch_structure) if is_query_api else new_mysql_structure + + # If we couldn't get it from state, try with MySQL API + if (hasattr(self, 'db_replicator') and + self.db_replicator is not None and + hasattr(self.db_replicator, 'mysql_api') and + self.db_replicator.mysql_api is not None): + + try: + # Get the CREATE statement for the source table + source_create_statement = self.db_replicator.mysql_api.get_table_create_statement(source_table_name) + + # Parse the source table structure + source_structure = self.parse_mysql_table_structure(source_create_statement) + + # Copy the structure but keep the new table name + mysql_structure = copy.deepcopy(source_structure) + mysql_structure.table_name = target_table_name + + if is_query_api: + # Convert to ClickHouse structure + ch_structure = self.convert_table_structure(mysql_structure) + return mysql_structure, ch_structure + else: + return mysql_structure + + except Exception as e: + error_msg = f"Could not get source table structure for LIKE statement: {str(e)}" + print(f"Error: {error_msg}") + raise Exception(error_msg, create_statement) + + # If we got here, we couldn't determine the structure + raise Exception(f"Could not determine structure for source table '{source_table_name}' in LIKE statement", create_statement) + def parse_create_table_query(self, mysql_query) -> tuple[TableStructure, TableStructure]: # Special handling for CREATE TABLE LIKE statements if 'LIKE' in mysql_query.upper(): @@ -748,55 +814,8 @@ def parse_create_table_query(self, mysql_query) -> tuple[TableStructure, TableSt new_table_name = match.group(1).strip('`"') source_table_name = match.group(2).strip('`"') - # Try to get the actual structure from the existing table structures first - if (hasattr(self, 'db_replicator') and - self.db_replicator is not None and - hasattr(self.db_replicator, 'state') and - hasattr(self.db_replicator.state, 'tables_structure')): - - # Check if the source table structure is already in our state - if source_table_name in self.db_replicator.state.tables_structure: - # Get the existing structure - source_mysql_structure, source_ch_structure = self.db_replicator.state.tables_structure[source_table_name] - - # Create a new structure with the target table name - new_mysql_structure = copy.deepcopy(source_mysql_structure) - new_mysql_structure.table_name = new_table_name - - # Convert to ClickHouse structure - new_ch_structure = copy.deepcopy(source_ch_structure) - new_ch_structure.table_name = new_table_name - - return new_mysql_structure, new_ch_structure - - # If we couldn't get it from state, try with MySQL API - if (hasattr(self, 'db_replicator') and - self.db_replicator is not None and - hasattr(self.db_replicator, 'mysql_api') and - self.db_replicator.mysql_api is not None): - - try: - # Get the CREATE statement for the source table - source_create_statement = self.db_replicator.mysql_api.get_table_create_statement(source_table_name) - - # Parse the source table structure - source_structure = self.parse_mysql_table_structure(source_create_statement) - - # Create a new structure with the target table name - mysql_table_structure = copy.deepcopy(source_structure) - mysql_table_structure.table_name = new_table_name - - # Convert to ClickHouse structure - ch_table_structure = self.convert_table_structure(mysql_table_structure) - - return mysql_table_structure, ch_table_structure - except Exception as e: - error_msg = f"Could not get source table structure for LIKE statement: {str(e)}" - print(f"Error: {error_msg}") - raise Exception(error_msg, mysql_query) - - # If we got here, we couldn't determine the structure - raise Exception(f"Could not determine structure for source table '{source_table_name}' in LIKE statement", mysql_query) + # Use the common helper method to handle the LIKE statement + return self._handle_create_table_like(mysql_query, source_table_name, new_table_name, True) # Regular parsing for non-LIKE statements mysql_table_structure = self.parse_mysql_table_structure(mysql_query) @@ -851,29 +870,8 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None source_table_name = strip_sql_name(tokens[4].get_real_name()) target_table_name = strip_sql_name(tokens[2].get_real_name()) - # Try to get the actual structure - we need a valid MySQL API - if not (hasattr(self, 'db_replicator') and - self.db_replicator is not None and - hasattr(self.db_replicator, 'mysql_api') and - self.db_replicator.mysql_api is not None): - raise Exception(f"Cannot parse LIKE statement - no MySQL API available to get source table structure", create_statement) - - try: - # Try to get the CREATE statement from the current database - mysql_api = self.db_replicator.mysql_api - source_create_statement = mysql_api.get_table_create_statement(source_table_name) - - # Parse the source table structure - source_structure = self.parse_mysql_table_structure(source_create_statement) - - # Copy the structure but keep the new table name - structure = copy.deepcopy(source_structure) - structure.table_name = target_table_name - return structure - except Exception as e: - error_msg = f"Could not get source table structure: {str(e)}" - print(f"Error: {error_msg}") - raise Exception(error_msg, create_statement) + # Use the common helper method to handle the LIKE statement + return self._handle_create_table_like(create_statement, source_table_name, target_table_name, False) if not isinstance(tokens[3], sqlparse.sql.Parenthesis): raise Exception('wrong create statement', create_statement)