diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index 53cc1e7..1f17e72 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -6,7 +6,11 @@ from pyparsing import Suppress, CaselessKeyword, Word, alphas, alphanums, delimitedList from .table_structure import TableStructure, TableField -from .converter_enum_parser import parse_mysql_enum +from .enum import ( + parse_mysql_enum, EnumConverter, + parse_enum_or_set_field, + extract_enum_or_set_values +) CHARSET_MYSQL_TO_PYTHON = { @@ -282,7 +286,7 @@ def convert_type(self, mysql_type, parameters): enum_values = parse_mysql_enum(mysql_type) ch_enum_values = [] for idx, value_name in enumerate(enum_values): - ch_enum_values.append(f"'{value_name}' = {idx+1}") + ch_enum_values.append(f"'{value_name.lower()}' = {idx+1}") ch_enum_values = ', '.join(ch_enum_values) if len(enum_values) <= 127: # Enum8('red' = 1, 'green' = 2, 'black' = 3) @@ -428,9 +432,15 @@ def convert_record( if mysql_field_type.startswith('point'): clickhouse_field_value = parse_mysql_point(clickhouse_field_value) - if mysql_field_type.startswith('enum(') and isinstance(clickhouse_field_value, int): + if mysql_field_type.startswith('enum('): enum_values = mysql_structure.fields[idx].additional_data - clickhouse_field_value = enum_values[int(clickhouse_field_value)-1] + field_name = mysql_structure.fields[idx].name if idx < len(mysql_structure.fields) else "unknown" + + clickhouse_field_value = EnumConverter.convert_mysql_to_clickhouse_enum( + clickhouse_field_value, + enum_values, + field_name + ) clickhouse_record.append(clickhouse_field_value) return tuple(clickhouse_record) @@ -834,107 +844,16 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None end_pos = line.find('`', 1) field_name = line[1:end_pos] line = line[end_pos + 1 :].strip() - # Don't split by space for enum and set types that might contain spaces - if line.lower().startswith('enum(') or 
line.lower().startswith('set('): - # Find the end of the enum/set definition (closing parenthesis) - open_parens = 0 - in_quotes = False - quote_char = None - end_pos = -1 - - for i, char in enumerate(line): - if char in "'\"" and (i == 0 or line[i - 1] != "\\"): - if not in_quotes: - in_quotes = True - quote_char = char - elif char == quote_char: - in_quotes = False - elif char == '(' and not in_quotes: - open_parens += 1 - elif char == ')' and not in_quotes: - open_parens -= 1 - if open_parens == 0: - end_pos = i + 1 - break - - if end_pos > 0: - field_type = line[:end_pos] - field_parameters = line[end_pos:].strip() - else: - # Fallback to original behavior if we can't find the end - definition = line.split(' ') - field_type = definition[0] - field_parameters = ( - ' '.join(definition[1:]) if len(definition) > 1 else '' - ) - else: - definition = line.split(' ') - field_type = definition[0] - field_parameters = ( - ' '.join(definition[1:]) if len(definition) > 1 else '' - ) + # Use our new enum parsing utilities + field_name, field_type, field_parameters = parse_enum_or_set_field(line, field_name, is_backtick_quoted=True) else: definition = line.split(' ') field_name = strip_sql_name(definition[0]) - definition = definition[1:] - if definition and ( - definition[0].lower().startswith('enum(') - or definition[0].lower().startswith('set(') - ): - line = ' '.join(definition) - # Find the end of the enum/set definition (closing parenthesis) - open_parens = 0 - in_quotes = False - quote_char = None - end_pos = -1 - - for i, char in enumerate(line): - if char in "'\"" and (i == 0 or line[i - 1] != "\\"): - if not in_quotes: - in_quotes = True - quote_char = char - elif char == quote_char: - in_quotes = False - elif char == '(' and not in_quotes: - open_parens += 1 - elif char == ')' and not in_quotes: - open_parens -= 1 - if open_parens == 0: - end_pos = i + 1 - break - - if end_pos > 0: - field_type = line[:end_pos] - field_parameters = line[end_pos:].strip() - else: 
- # Fallback to original behavior - field_type = definition[0] - field_parameters = ( - ' '.join(definition[1:]) if len(definition) > 1 else '' - ) - else: - field_type = definition[0] - field_parameters = ( - ' '.join(definition[1:]) if len(definition) > 1 else '' - ) - - additional_data = None - if 'set(' in field_type.lower(): - vals = field_type[len('set('):] - close_pos = vals.find(')') - vals = vals[:close_pos] - vals = vals.split(',') - def vstrip(e): - if not e: - return e - if e[0] in '"\'': - return e[1:-1] - return e - vals = [vstrip(v) for v in vals] - additional_data = vals - - if field_type.lower().startswith('enum('): - additional_data = parse_mysql_enum(field_type) + # Use our new enum parsing utilities + field_name, field_type, field_parameters = parse_enum_or_set_field(line, field_name, is_backtick_quoted=False) + + # Extract additional data for enum and set types + additional_data = extract_enum_or_set_values(field_type, from_parser_func=parse_mysql_enum) structure.fields.append(TableField( name=field_name, diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py index 9a1ac92..87fd94d 100644 --- a/mysql_ch_replicator/db_replicator.py +++ b/mysql_ch_replicator/db_replicator.py @@ -180,12 +180,11 @@ def run(self): if self.state.status != Status.NONE: # ensure target database still exists - if self.target_database not in self.clickhouse_api.get_databases(): + if self.target_database not in self.clickhouse_api.get_databases() and f"{self.target_database}_tmp" not in self.clickhouse_api.get_databases(): logger.warning(f'database {self.target_database} missing in CH') - if self.initial_only: - logger.warning('will run replication from scratch') - self.state.remove() - self.state = self.create_state() + logger.warning('will run replication from scratch') + self.state.remove() + self.state = self.create_state() if self.state.status == Status.RUNNING_REALTIME_REPLICATION: self.run_realtime_replication() @@ -227,6 +226,10 
from typing import List, Union, Optional, Any
from logging import getLogger

# Single module-level logger shared by every conversion call.
logger = getLogger(__name__)


class EnumConverter:
    """Convert enum values read from MySQL into the form written to ClickHouse."""

    @staticmethod
    def convert_mysql_to_clickhouse_enum(
        value: Any,
        enum_values: Optional[List[str]],
        field_name: str = "unknown",
    ) -> Optional[Union[str, int]]:
        """
        Convert a MySQL enum value to the appropriate ClickHouse representation.

        Args:
            value: The MySQL enum value (1-based int index, str label or None).
            enum_values: The enum labels in declaration order. ``None`` is
                treated as an empty list so that a field without parsed labels
                is reported through the log instead of raising ``TypeError``.
            field_name: Name of the field, used only in log messages.

        Returns:
            ``None`` for ``None``; ``0`` for index 0; the lowercased label for a
            valid index or for a label that matches case-insensitively. Any
            other input (out-of-range index, unknown label, unexpected type)
            is logged as an error and returned unchanged.
        """
        if value is None:
            return None

        known_values = enum_values or []

        if isinstance(value, int):
            if value == 0:
                # Index 0 is passed through untouched; ClickHouse decides what
                # to do with it according to the column definition.
                logger.debug(
                    "ENUM CONVERSION: Found enum index 0 for field '%s'. Keeping as 0.",
                    field_name,
                )
                return 0
            if 1 <= value <= len(known_values):
                # MySQL indexes are 1-based; labels are stored lowercased.
                return known_values[value - 1].lower()
            logger.error(
                "ENUM CONVERSION: Invalid enum index %s for field '%s' with values %s",
                value, field_name, enum_values,
            )
            return value

        if isinstance(value, str):
            lowered = value.lower()
            # A case-insensitive comparison also covers the exact match, so a
            # single pass is enough and no temporary list is built per row.
            if any(lowered == candidate.lower() for candidate in known_values):
                return lowered
            logger.error(
                "ENUM CONVERSION: Invalid enum value '%s' not in %s for field '%s'",
                value, enum_values, field_name,
            )
            return value

        logger.error(
            "ENUM CONVERSION: Unexpected type %s for enum field '%s'",
            type(value), field_name,
        )
        return value
from typing import List, Tuple, Optional, Dict, Any

# Characters that open/close a string literal inside a DDL type definition.
_QUOTE_CHARS = "'\""


def _iter_unquoted(text: str, start: int = 0):
    """
    Yield ``(index, char)`` for every character of ``text`` outside a quoted literal.

    Quotes themselves are not yielded. Inside a literal a backslash escapes the
    following character, so an escaped quote does not close the literal while
    an escaped backslash directly before the closing quote does not keep it
    open. A doubled quote closes and immediately reopens the literal, which
    leaves the quoting state unchanged.

    Args:
        text: The string to scan.
        start: Index at which scanning begins (assumed to be outside quotes).
    """
    quote_char = None
    escaped = False
    for index in range(start, len(text)):
        char = text[index]
        if quote_char is None:
            if char in _QUOTE_CHARS:
                quote_char = char
            else:
                yield index, char
        elif escaped:
            escaped = False
        elif char == '\\':
            escaped = True
        elif char == quote_char:
            quote_char = None


def _extract_parenthesized_values(text: str, start: int) -> List[str]:
    """
    Split the value list that begins at ``start`` (just after the opening parenthesis).

    Commas and parentheses inside quoted values are ignored. If the closing
    parenthesis is missing, everything up to the end of ``text`` is used.

    Returns:
        The values with surrounding whitespace and matching quotes removed.
    """
    raw_values = []
    item_start = start
    end = len(text)
    depth = 1
    for index, char in _iter_unquoted(text, start):
        if char == '(':
            depth += 1
        elif char == ')':
            depth -= 1
            if depth == 0:
                end = index
                break
        elif char == ',' and depth == 1:
            raw_values.append(text[item_start:index])
            item_start = index + 1
    raw_values.append(text[item_start:end])
    return [strip_value(raw) for raw in raw_values]


def find_enum_or_set_definition_end(line: str) -> Tuple[int, str, str]:
    """
    Find the end of an enum or set definition in a DDL line.

    Args:
        line: The DDL line starting with the enum or set definition.

    Returns:
        Tuple ``(end_position, field_type, field_parameters)``. ``end_position``
        is the index just past the closing parenthesis that balances the first
        opening one. When no balanced parenthesis is found, ``end_position`` is
        ``-1`` and the line is split at its first space instead.
    """
    depth = 0
    for index, char in _iter_unquoted(line):
        if char == '(':
            depth += 1
        elif char == ')':
            depth -= 1
            if depth == 0:
                end_pos = index + 1
                return end_pos, line[:end_pos], line[end_pos:].strip()

    # Fallback: everything before the first space is the type.
    field_type, _, field_parameters = line.partition(' ')
    return -1, field_type, field_parameters


def parse_enum_or_set_field(line: str, field_name: str, is_backtick_quoted: bool = False) -> Tuple[str, str, str]:
    """
    Split a field definition into its type and the remaining parameters.

    Args:
        line: The line to parse. With ``is_backtick_quoted`` it holds only what
            follows the field name; otherwise it still starts with the field
            name, which is skipped up to the first space.
        field_name: The name of the field (already extracted by the caller).
        is_backtick_quoted: Whether the field name was backtick quoted.

    Returns:
        Tuple ``(field_name, field_type, field_parameters)``; ``field_name`` is
        returned unchanged.
    """
    if is_backtick_quoted:
        remainder = line.strip()
    else:
        # Drop the leading field name token.
        remainder = line.partition(' ')[2]

    # enum/set values may contain spaces, so those types cannot be split on
    # whitespace and need the parenthesis-aware scan.
    if remainder.lower().startswith(('enum(', 'set(')):
        _, field_type, field_parameters = find_enum_or_set_definition_end(remainder)
    else:
        field_type, _, field_parameters = remainder.partition(' ')

    return field_name, field_type, field_parameters


def extract_enum_or_set_values(field_type: str, from_parser_func=None) -> Optional[List[str]]:
    """
    Extract the values from an enum or set field type.

    Args:
        field_type: The field type string (e.g. "enum('a','b','c')").
        from_parser_func: Optional callable used to parse enum definitions; it
            receives ``field_type`` and its result is returned as is. It is
            not used for set types.

    Returns:
        List of extracted values, or None if the type is neither enum nor set.
    """
    lowered = field_type.lower()
    if lowered.startswith('enum('):
        if from_parser_func:
            return from_parser_func(field_type)
        return _extract_parenthesized_values(field_type, len('enum('))

    set_pos = lowered.find('set(')
    if set_pos != -1:
        return _extract_parenthesized_values(field_type, set_pos + len('set('))

    return None


def strip_value(value: str) -> str:
    """
    Strip surrounding whitespace and one pair of matching quotes from a value.

    Args:
        value: The raw enum/set value.

    Returns:
        The value without its enclosing ``'``, ``"`` or backtick pair. A value
        whose first and last characters are not the same quote is only
        whitespace-stripped, so no payload character is lost.
    """
    value = value.strip()
    if len(value) >= 2 and value[0] in '"\'`' and value[-1] == value[0]:
        return value[1:-1]
    return value
--git a/mysql_ch_replicator/converter_enum_parser.py b/mysql_ch_replicator/enum/parser.py similarity index 94% rename from mysql_ch_replicator/converter_enum_parser.py rename to mysql_ch_replicator/enum/parser.py index 92192ea..888f3a9 100644 --- a/mysql_ch_replicator/converter_enum_parser.py +++ b/mysql_ch_replicator/enum/parser.py @@ -1,5 +1,3 @@ - - def parse_mysql_enum(enum_definition): """ Accepts a MySQL ENUM definition string (case–insensitive), @@ -175,7 +173,7 @@ def _parse_enum_values(content): # Skip whitespace after the literal. while i < len(content) and content[i].isspace(): i += 1 - # If there’s a comma, skip it; otherwise, we must be at the end. + # If there's a comma, skip it; otherwise, we must be at the end. if i < len(content): if content[i] == ',': i += 1 @@ -185,7 +183,18 @@ def _parse_enum_values(content): return values -# --- For testing purposes --- +def is_enum_type(field_type): + """ + Check if a field type is an enum type + + Args: + field_type: The MySQL field type string + + Returns: + bool: True if it's an enum type, False otherwise + """ + return field_type.lower().startswith('enum(') + if __name__ == '__main__': tests = [ "enum('point','qwe','def')", @@ -203,4 +212,4 @@ def _parse_enum_values(content): result = parse_mysql_enum(t) print("Input: {}\nParsed: {}\n".format(t, result)) except Exception as e: - print("Error parsing {}: {}\n".format(t, e)) + print("Error parsing {}: {}\n".format(t, e)) \ No newline at end of file diff --git a/mysql_ch_replicator/enum/utils.py b/mysql_ch_replicator/enum/utils.py new file mode 100644 index 0000000..bfed4f1 --- /dev/null +++ b/mysql_ch_replicator/enum/utils.py @@ -0,0 +1,99 @@ +from typing import List, Optional, Tuple + +def find_enum_definition_end(text: str, start_pos: int) -> int: + """ + Find the end position of an enum definition in a string + + Args: + text: The input text containing the enum definition + start_pos: The starting position (after 'enum(') + + Returns: + int: The position 
of the closing parenthesis + """ + open_parens = 1 + in_quotes = False + quote_char = None + + for i in range(start_pos, len(text)): + char = text[i] + + # Handle quote state + if not in_quotes and char in ("'", '"', '`'): + in_quotes = True + quote_char = char + continue + elif in_quotes and char == quote_char: + # Check for escaped quotes + if i > 0 and text[i-1] == '\\': + # This is an escaped quote, not the end of the quoted string + continue + # End of quoted string + in_quotes = False + quote_char = None + continue + + # Only process parentheses when not in quotes + if not in_quotes: + if char == '(': + open_parens += 1 + elif char == ')': + open_parens -= 1 + if open_parens == 0: + return i + + # If we get here, the definition is malformed + raise ValueError("Unbalanced parentheses in enum definition") + + +def extract_field_components(line: str) -> Tuple[str, str, List[str]]: + """ + Extract field name, type, and parameters from a MySQL field definition line + + Args: + line: A line from a field definition + + Returns: + Tuple containing field_name, field_type, and parameters + """ + components = line.split(' ') + field_name = components[0].strip('`') + + # Handle special case for enum and set types that might contain spaces + if len(components) > 1 and ( + components[1].lower().startswith('enum(') or + components[1].lower().startswith('set(') + ): + field_type_start = components[1] + field_type_components = [field_type_start] + + # If the enum definition is not complete on this component + if not _is_complete_definition(field_type_start): + # Join subsequent components until we find the end of the definition + for component in components[2:]: + field_type_components.append(component) + if ')' in component: + break + + field_type = ' '.join(field_type_components) + parameters = components[len(field_type_components) + 1:] + else: + field_type = components[1] if len(components) > 1 else "" + parameters = components[2:] if len(components) > 2 else [] + + 
return field_name, field_type, parameters + + +def _is_complete_definition(text: str) -> bool: + """ + Check if a string contains a complete enum definition (balanced parentheses) + + Args: + text: The string to check + + Returns: + bool: True if the definition is complete + """ + open_count = text.count('(') + close_count = text.count(')') + return open_count > 0 and open_count == close_count \ No newline at end of file diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 8380cc6..8c35333 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -1292,6 +1292,9 @@ def test_percona_migration(monkeypatch): mysql.execute( f"DROP TABLE IF EXISTS `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_old`;") + # Wait for table to be recreated in ClickHouse after rename + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) + mysql.execute( f"INSERT INTO `{TEST_TABLE_NAME}` (id, c1) VALUES (43, 1)", commit=True, @@ -1554,6 +1557,69 @@ def test_alter_tokens_split(): assert result == expected +def test_enum_conversion(): + """ + Test that enum values are properly converted to lowercase in ClickHouse + and that zero values are preserved rather than converted to first enum value. 
+ """ + config_file = CONFIG_FILE + cfg = config.Settings() + cfg.load(config_file) + mysql_config = cfg.mysql + clickhouse_config = cfg.clickhouse + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=mysql_config + ) + ch = clickhouse_api.ClickhouseApi( + database=TEST_DB_NAME, + clickhouse_settings=clickhouse_config + ) + + prepare_env(cfg, mysql, ch) + + mysql.execute(f''' + CREATE TABLE `{TEST_TABLE_NAME}` ( + id INT NOT NULL AUTO_INCREMENT, + status_mixed_case ENUM('Purchase','Sell','Transfer') NOT NULL, + status_empty ENUM('Yes','No','Maybe'), + PRIMARY KEY (id) + ) + ''') + + # Insert values with mixed case and NULL values + mysql.execute(f''' + INSERT INTO `{TEST_TABLE_NAME}` (status_mixed_case, status_empty) VALUES + ('Purchase', 'Yes'), + ('Sell', NULL), + ('Transfer', NULL); + ''', commit=True) + + run_all_runner = RunAllRunner(cfg_file=config_file) + run_all_runner.run() + + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) + ch.execute_command(f'USE `{TEST_DB_NAME}`') + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) + + # Get the ClickHouse data + results = ch.select(TEST_TABLE_NAME) + + # Verify all values are properly converted + assert results[0]['status_mixed_case'] == 'purchase' + assert results[1]['status_mixed_case'] == 'sell' + assert results[2]['status_mixed_case'] == 'transfer' + + # Status_empty should handle NULL values correctly + assert results[0]['status_empty'] == 'yes' + assert results[1]['status_empty'] is None + assert results[2]['status_empty'] is None + + run_all_runner.stop() + assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME)) + assert('Traceback' not in read_logs(TEST_DB_NAME)) + @pytest.mark.parametrize("query,expected", [ ("CREATE TABLE `mydb`.`mytable` (id INT)", "mydb"), ("CREATE TABLE mydb.mytable (id INT)", "mydb"),