From 288402b1926a74756dd7eaf987fa0fa5ce1cfab3 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Tue, 4 Mar 2025 03:02:24 +0400 Subject: [PATCH 1/2] Handle numeric data type --- mysql_ch_replicator/converter.py | 99 ++++++++++++++++++++++++++++++++ test_mysql_ch_replicator.py | 41 +++++++++++++ 2 files changed, 140 insertions(+) diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index fe244ba..c50d9d6 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -196,6 +196,44 @@ def convert_type(self, mysql_type, parameters): if mysql_type == 'point': return 'Tuple(x Float32, y Float32)' + # Correctly handle numeric types + if mysql_type.startswith('numeric'): + # Determine if parameters are specified via parentheses: + if '(' in mysql_type and ')' in mysql_type: + # Expecting a type definition like "numeric(precision, scale)" + pattern = r"numeric\((\d+)\s*,\s*(\d+)\)" + match = re.search(pattern, mysql_type) + if not match: + raise ValueError(f"Invalid numeric type definition: {mysql_type}") + + precision = int(match.group(1)) + scale = int(match.group(2)) + else: + # If no parentheses are provided, assume defaults. + precision = 10 # or other default as defined by your standards + scale = 0 + + # If no fractional part, consider mapping to integer type (if desired) + if scale == 0: + if is_unsigned: + if precision <= 9: + return "UInt32" + elif precision <= 18: + return "UInt64" + else: + # For very large precisions, fallback to Decimal + return f"Decimal({precision}, {scale})" + else: + if precision <= 9: + return "Int32" + elif precision <= 18: + return "Int64" + else: + return f"Decimal({precision}, {scale})" + else: + # For types with a defined fractional part, use a Decimal mapping. + return f"Decimal({precision}, {scale})" + if mysql_type == 'int': if is_unsigned: return 'UInt32' @@ -472,7 +510,68 @@ def convert_alter_query(self, mysql_query, db_name): raise Exception(f'operation {op_name} not implement, query: {subquery}') + @classmethod + def _tokenize_alter_query(cls, sql_line): + # We want to recognize tokens that may be: + # 1. A backquoted identifier that can optionally be immediately followed by parentheses. + # 2. A plain word (letters/digits/underscore) that may immediately be followed by a parenthesized argument list. + # 3. A single-quoted or double-quoted string. + # 4. Or, if nothing else, any non‐whitespace sequence. + # + # The order is important: for example, if a word is immediately followed by parentheses, + # we want to grab it as a single token. + token_pattern = re.compile(r''' + ( # start capture group for a token + `[^`]+`(?:\([^)]*\))? | # backquoted identifier w/ optional parentheses + \w+(?:\([^)]*\))? | # a word with optional parentheses + '(?:\\'|[^'])*' | # a single-quoted string + "(?:\\"|[^"])*" | # a double-quoted string + [^\s]+ # fallback: any sequence of non-whitespace characters + ) + ''', re.VERBOSE) + tokens = token_pattern.findall(sql_line) + + # Now, split the column definition into: + # token0 = column name, + # token1 = data type (which might be multiple tokens, e.g. DOUBLE PRECISION, INT UNSIGNED, + # or a word+parentheses like VARCHAR(254) or NUMERIC(5, 2)), + # remaining tokens: the parameters such as DEFAULT, NOT, etc. + # + # We define a set of keywords that indicate the start of column options. + constraint_keywords = { + "DEFAULT", "NOT", "NULL", "AUTO_INCREMENT", "PRIMARY", "UNIQUE", + "COMMENT", "COLLATE", "REFERENCES", "ON", "CHECK", "CONSTRAINT", + "AFTER", "BEFORE", "GENERATED", "VIRTUAL", "STORED" + } + + if not tokens: + return tokens + # The first token is always the column name. + column_name = tokens[0] + + # Now “merge” tokens after the column name that belong to the type. + # (For many types the type is written as a single token already – + # e.g. "VARCHAR(254)" or "NUMERIC(5, 2)", but for types like + # "DOUBLE PRECISION" or "INT UNSIGNED" the .split() would produce two tokens.) + type_tokens = [] + i = 1 + while i < len(tokens) and tokens[i].upper() not in constraint_keywords: + type_tokens.append(tokens[i]) + i += 1 + merged_type = " ".join(type_tokens) if type_tokens else "" + + # The remaining tokens are passed through unchanged. + param_tokens = tokens[i:] + + # Result: [column name, merged type, all the rest] + if merged_type: + return [column_name, merged_type] + param_tokens + else: + return [column_name] + param_tokens + def __convert_alter_table_add_column(self, db_name, table_name, tokens): + tokens = self._tokenize_alter_query(' '.join(tokens)) + if len(tokens) < 2: raise Exception('wrong tokens count', tokens) diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index f04dfc0..67f2070 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -5,6 +5,7 @@ import subprocess import json import uuid +import decimal import pytest import requests @@ -276,6 +277,12 @@ def test_e2e_multistatement(): mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE name='Ivan';", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) + mysql.execute(f"ALTER TABLE `{TEST_TABLE_NAME}` ADD factor NUMERIC(5, 2) DEFAULT NULL;") + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, factor) VALUES ('Snow', 31, 13.29);", commit=True) + + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2) + assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Snow'")[0].get('factor') == decimal.Decimal('13.29')) + mysql.execute( f"CREATE TABLE {TEST_TABLE_NAME_2} " f"(id int NOT NULL AUTO_INCREMENT, name varchar(255), age int, " @@ -1493,3 +1500,37 @@ def _get_last_insert_name(): print("*****************************") print('\n\n') + +def test_alter_tokens_split(): + examples = [ + # basic examples from the prompt: + ("test_name VARCHAR(254) NULL", ["test_name", "VARCHAR(254)", "NULL"]), + ("factor NUMERIC(5, 2) DEFAULT NULL", ["factor", "NUMERIC(5, 2)", "DEFAULT", "NULL"]), + # backquoted column name: + ("`test_name` VARCHAR(254) NULL", ["`test_name`", "VARCHAR(254)", "NULL"]), + ("`order` INT NOT NULL", ["`order`", "INT", "NOT", "NULL"]), + # type that contains a parenthesized list with quoted values: + ("status ENUM('active','inactive') DEFAULT 'active'", + ["status", "ENUM('active','inactive')", "DEFAULT", "'active'"]), + # multi‐word type definitions: + ("col DOUBLE PRECISION DEFAULT 0", ["col", "DOUBLE PRECISION", "DEFAULT", "0"]), + ("col INT UNSIGNED DEFAULT 0", ["col", "INT UNSIGNED", "DEFAULT", "0"]), + # a case with a quoted string containing spaces and punctuation: + ("message VARCHAR(100) DEFAULT 'Hello, world!'", + ["message", "VARCHAR(100)", "DEFAULT", "'Hello, world!'"]), + # longer definition with more options: + ("col DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP", + ["col", "DATETIME", "DEFAULT", "CURRENT_TIMESTAMP", "ON", "UPDATE", "CURRENT_TIMESTAMP"]), + # type with a COMMENT clause (here the type is given, then a parameter keyword) + ("col VARCHAR(100) COMMENT 'This is a test comment'", + ["col", "VARCHAR(100)", "COMMENT", "'This is a test comment'"]) + ] + + for sql, expected in examples: + result = MysqlToClickhouseConverter._tokenize_alter_query(sql) + print("SQL Input: ", sql) + print("Expected: ", expected) + print("Tokenized: ", result) + print("Match? ", result == expected) + print("-" * 60) + assert result == expected From 0b02b4c9e9349e4a4d769b6c41ec55e0f15c735c Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Tue, 4 Mar 2025 03:17:40 +0400 Subject: [PATCH 2/2] Fixed tests --- mysql_ch_replicator/converter.py | 3 ++- test_mysql_ch_replicator.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index c50d9d6..9aecffe 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -541,7 +541,8 @@ def _tokenize_alter_query(cls, sql_line): constraint_keywords = { "DEFAULT", "NOT", "NULL", "AUTO_INCREMENT", "PRIMARY", "UNIQUE", "COMMENT", "COLLATE", "REFERENCES", "ON", "CHECK", "CONSTRAINT", - "AFTER", "BEFORE", "GENERATED", "VIRTUAL", "STORED" + "AFTER", "BEFORE", "GENERATED", "VIRTUAL", "STORED", "FIRST", + "ALWAYS", "AS", "IDENTITY", "INVISIBLE", "PERSISTED", } if not tokens: diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 67f2070..9b5d30e 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -1523,7 +1523,8 @@ def test_alter_tokens_split(): ["col", "DATETIME", "DEFAULT", "CURRENT_TIMESTAMP", "ON", "UPDATE", "CURRENT_TIMESTAMP"]), # type with a COMMENT clause (here the type is given, then a parameter keyword) ("col VARCHAR(100) COMMENT 'This is a test comment'", - ["col", "VARCHAR(100)", "COMMENT", "'This is a test comment'"]) + ["col", "VARCHAR(100)", "COMMENT", "'This is a test comment'"]), + ("c1 INT FIRST", ["c1", "INT", "FIRST"]), ] for sql, expected in examples: