Skip to content
123 changes: 21 additions & 102 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
from pyparsing import Suppress, CaselessKeyword, Word, alphas, alphanums, delimitedList

from .table_structure import TableStructure, TableField
from .converter_enum_parser import parse_mysql_enum
from .enum import (
parse_mysql_enum, EnumConverter,
parse_enum_or_set_field,
extract_enum_or_set_values
)


CHARSET_MYSQL_TO_PYTHON = {
Expand Down Expand Up @@ -282,7 +286,7 @@ def convert_type(self, mysql_type, parameters):
enum_values = parse_mysql_enum(mysql_type)
ch_enum_values = []
for idx, value_name in enumerate(enum_values):
ch_enum_values.append(f"'{value_name}' = {idx+1}")
ch_enum_values.append(f"'{value_name.lower()}' = {idx+1}")
ch_enum_values = ', '.join(ch_enum_values)
if len(enum_values) <= 127:
# Enum8('red' = 1, 'green' = 2, 'black' = 3)
Expand Down Expand Up @@ -428,9 +432,15 @@ def convert_record(
if mysql_field_type.startswith('point'):
clickhouse_field_value = parse_mysql_point(clickhouse_field_value)

if mysql_field_type.startswith('enum(') and isinstance(clickhouse_field_value, int):
if mysql_field_type.startswith('enum('):
enum_values = mysql_structure.fields[idx].additional_data
clickhouse_field_value = enum_values[int(clickhouse_field_value)-1]
field_name = mysql_structure.fields[idx].name if idx < len(mysql_structure.fields) else "unknown"

clickhouse_field_value = EnumConverter.convert_mysql_to_clickhouse_enum(
clickhouse_field_value,
enum_values,
field_name
)

clickhouse_record.append(clickhouse_field_value)
return tuple(clickhouse_record)
Expand Down Expand Up @@ -834,107 +844,16 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None
end_pos = line.find('`', 1)
field_name = line[1:end_pos]
line = line[end_pos + 1 :].strip()
# Don't split by space for enum and set types that might contain spaces
if line.lower().startswith('enum(') or line.lower().startswith('set('):
# Find the end of the enum/set definition (closing parenthesis)
open_parens = 0
in_quotes = False
quote_char = None
end_pos = -1

for i, char in enumerate(line):
if char in "'\"" and (i == 0 or line[i - 1] != "\\"):
if not in_quotes:
in_quotes = True
quote_char = char
elif char == quote_char:
in_quotes = False
elif char == '(' and not in_quotes:
open_parens += 1
elif char == ')' and not in_quotes:
open_parens -= 1
if open_parens == 0:
end_pos = i + 1
break

if end_pos > 0:
field_type = line[:end_pos]
field_parameters = line[end_pos:].strip()
else:
# Fallback to original behavior if we can't find the end
definition = line.split(' ')
field_type = definition[0]
field_parameters = (
' '.join(definition[1:]) if len(definition) > 1 else ''
)
else:
definition = line.split(' ')
field_type = definition[0]
field_parameters = (
' '.join(definition[1:]) if len(definition) > 1 else ''
)
# Use our new enum parsing utilities
field_name, field_type, field_parameters = parse_enum_or_set_field(line, field_name, is_backtick_quoted=True)
else:
definition = line.split(' ')
field_name = strip_sql_name(definition[0])
definition = definition[1:]
if definition and (
definition[0].lower().startswith('enum(')
or definition[0].lower().startswith('set(')
):
line = ' '.join(definition)
# Find the end of the enum/set definition (closing parenthesis)
open_parens = 0
in_quotes = False
quote_char = None
end_pos = -1

for i, char in enumerate(line):
if char in "'\"" and (i == 0 or line[i - 1] != "\\"):
if not in_quotes:
in_quotes = True
quote_char = char
elif char == quote_char:
in_quotes = False
elif char == '(' and not in_quotes:
open_parens += 1
elif char == ')' and not in_quotes:
open_parens -= 1
if open_parens == 0:
end_pos = i + 1
break

if end_pos > 0:
field_type = line[:end_pos]
field_parameters = line[end_pos:].strip()
else:
# Fallback to original behavior
field_type = definition[0]
field_parameters = (
' '.join(definition[1:]) if len(definition) > 1 else ''
)
else:
field_type = definition[0]
field_parameters = (
' '.join(definition[1:]) if len(definition) > 1 else ''
)

additional_data = None
if 'set(' in field_type.lower():
vals = field_type[len('set('):]
close_pos = vals.find(')')
vals = vals[:close_pos]
vals = vals.split(',')
def vstrip(e):
if not e:
return e
if e[0] in '"\'':
return e[1:-1]
return e
vals = [vstrip(v) for v in vals]
additional_data = vals

if field_type.lower().startswith('enum('):
additional_data = parse_mysql_enum(field_type)
# Use our new enum parsing utilities
field_name, field_type, field_parameters = parse_enum_or_set_field(line, field_name, is_backtick_quoted=False)

# Extract additional data for enum and set types
additional_data = extract_enum_or_set_values(field_type, from_parser_func=parse_mysql_enum)

structure.fields.append(TableField(
name=field_name,
Expand Down
13 changes: 8 additions & 5 deletions mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,11 @@ def run(self):

if self.state.status != Status.NONE:
# ensure target database still exists
if self.target_database not in self.clickhouse_api.get_databases():
if self.target_database not in self.clickhouse_api.get_databases() and f"{self.target_database}_tmp" not in self.clickhouse_api.get_databases():
logger.warning(f'database {self.target_database} missing in CH')
if self.initial_only:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot remove this check.
Someone could start replication, interrupt it before it finishes, and then resume; it would then start from zero instead of trying to continue.

We should probably check that both target_database and target_database_tmp are missing; only in that case is it safe to restart from scratch.

Copy link
Contributor Author

@jaredmdobson jaredmdobson Mar 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I modified it. What do you think now?

My only problem with it is that on boot if the database was not there, it would just continue as if it was 🤷🏿

I'm also available to do a call as well if needed. I'm on discord at: @jaredmdobson

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, thank you!

logger.warning('will run replication from scratch')
self.state.remove()
self.state = self.create_state()
logger.warning('will run replication from scratch')
self.state.remove()
self.state = self.create_state()

if self.state.status == Status.RUNNING_REALTIME_REPLICATION:
self.run_realtime_replication()
Expand Down Expand Up @@ -227,6 +226,10 @@ def create_initial_structure_table(self, table_name):
)
self.validate_mysql_structure(mysql_structure)
clickhouse_structure = self.converter.convert_table_structure(mysql_structure)

# Always set if_not_exists to True to prevent errors when tables already exist
clickhouse_structure.if_not_exists = True

self.state.tables_structure[table_name] = (mysql_structure, clickhouse_structure)
indexes = self.config.get_indexes(self.database, table_name)
self.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes)
Expand Down
21 changes: 21 additions & 0 deletions mysql_ch_replicator/enum/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from .parser import parse_mysql_enum, is_enum_type
from .converter import EnumConverter
from .utils import find_enum_definition_end, extract_field_components
from .ddl_parser import (
find_enum_or_set_definition_end,
parse_enum_or_set_field,
extract_enum_or_set_values,
strip_value
)

# Public API of this package: every name listed here is imported from one of
# the submodules above (parser, converter, utils, ddl_parser) and re-exported.
__all__ = [
'parse_mysql_enum',
'is_enum_type',
'EnumConverter',
'find_enum_definition_end',
'extract_field_components',
'find_enum_or_set_definition_end',
'parse_enum_or_set_field',
'extract_enum_or_set_values',
'strip_value'
]
72 changes: 72 additions & 0 deletions mysql_ch_replicator/enum/converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from typing import List, Union, Optional, Any
from logging import getLogger

# Create a single module-level logger
logger = getLogger(__name__)

class EnumConverter:
    """Convert MySQL enum values into the form written to ClickHouse."""

    @staticmethod
    def convert_mysql_to_clickhouse_enum(
        value: Any,
        enum_values: List[str],
        field_name: str = "unknown"
    ) -> Optional[Union[str, int]]:
        """
        Convert a MySQL enum value to the appropriate ClickHouse representation.

        Args:
            value: The MySQL enum value. May be a 1-based integer index, the
                enum label as a string, or None.
            enum_values: The enum labels in declaration order.
            field_name: Name of the field; only used in log messages.

        Returns:
            - None when ``value`` is None.
            - 0 when ``value`` is the integer 0 (passed through unchanged).
            - The lower-cased label for a valid 1-based index or for a string
              that matches a label case-insensitively.
            - ``value`` unchanged (after logging an error) when the index is
              out of range, the string matches no label, or the type is
              neither int nor str.
        """
        # NULL stays NULL.
        if value is None:
            return None

        # Index-based value. NOTE: bool is a subclass of int, so True/False
        # also take this path.
        if isinstance(value, int):
            if value == 0:
                # Index 0 is not a declared label; it is forwarded as-is.
                # Lazy %-style args avoid formatting when DEBUG is disabled.
                logger.debug(
                    "ENUM CONVERSION: Found enum index 0 for field '%s'. Keeping as 0.",
                    field_name,
                )
                return 0

            if 1 <= value <= len(enum_values):
                # Labels are lower-cased to follow the replicator's convention
                # for ClickHouse enum names.
                return enum_values[value - 1].lower()

            logger.error(
                "ENUM CONVERSION: Invalid enum index %s for field '%s' with values %s",
                value,
                field_name,
                enum_values,
            )
            return value

        # Label-based value: a case-insensitive comparison also covers the
        # exact-match case, so a single pass over the labels is enough.
        if isinstance(value, str):
            lowered = value.lower()
            if any(label.lower() == lowered for label in enum_values):
                return lowered

            logger.error(
                "ENUM CONVERSION: Invalid enum value '%s' not in %s for field '%s'",
                value,
                enum_values,
                field_name,
            )
            return value

        # Anything else is unexpected; report it and pass it through untouched.
        logger.error(
            "ENUM CONVERSION: Unexpected type %s for enum field '%s'",
            type(value),
            field_name,
        )
        return value
Loading