From 144cbb127712f7f9341108b7954c15020717322a Mon Sep 17 00:00:00 2001
From: Filipp Ozinov
Date: Sat, 18 Jan 2025 23:37:06 +0400
Subject: [PATCH] Use DateTime64 for timestamp mysql type (instead of string)

---
Review revision: convert_timestamp_to_datetime64 gains a docstring, uses a
guard clause, and tolerates whitespace around the optional precision
(e.g. "timestamp (6)"). Hunk offsets and the diffstat are updated to match;
the blob ids on the "index" line still refer to the original commit.

 mysql_ch_replicator/converter.py | 27 +++++++++++++++++++++++++++
 test_mysql_ch_replicator.py      | 14 ++++++++++----
 2 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py
index 126d686..8e8a841 100644
--- a/mysql_ch_replicator/converter.py
+++ b/mysql_ch_replicator/converter.py
@@ -158,6 +158,31 @@ def strip_sql_comments(sql_statement):
     return sqlparse.format(sql_statement, strip_comments=True).strip()
 
 
+def convert_timestamp_to_datetime64(input_str):
+    """Map a MySQL ``timestamp[(fsp)]`` type to a ClickHouse ``DateTime64`` type.
+
+    Returns ``DateTime64(fsp)`` when a precision is given, else ``DateTime64``.
+    Matching is case-insensitive and tolerates whitespace around the type
+    name and around the precision, e.g. ``TIMESTAMP (6)``.
+
+    Raises:
+        ValueError: if ``input_str`` is not a timestamp type declaration.
+    """
+    match = re.fullmatch(
+        r'\s*timestamp\s*(?:\(\s*(\d+)\s*\))?\s*',
+        input_str,
+        re.IGNORECASE,
+    )
+    if match is None:
+        raise ValueError(f"Invalid input string format: '{input_str}'")
+
+    # Keep the precision only when the MySQL type spelled one out.
+    precision = match.group(1)
+    if precision is None:
+        return 'DateTime64'
+    return f'DateTime64({precision})'
+
+
 class MysqlToClickhouseConverter:
     def __init__(self, db_replicator: 'DbReplicator' = None):
         self.db_replicator = db_replicator
@@ -238,6 +263,8 @@ def convert_type(self, mysql_type, parameters):
             return 'Int32'
         if 'real' in mysql_type:
             return 'Float64'
+        if mysql_type.startswith('timestamp'):
+            return convert_timestamp_to_datetime64(mysql_type)
         if mysql_type.startswith('time'):
             return 'String'
         if 'varbinary' in mysql_type:
diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py
index b712e17..8b9504d 100644
--- a/test_mysql_ch_replicator.py
+++ b/test_mysql_ch_replicator.py
@@ -1,3 +1,4 @@
+import datetime
 import os
 import shutil
 import time
@@ -887,13 +888,14 @@ def test_different_types_2():
     test2 point,
     test3 binary(16),
     test4 set('1','2','3','4','5','6','7'),
+    test5 timestamp(0),
     PRIMARY KEY (id)
 );
     ''')
 
     mysql.execute(
-        f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test3, test4) VALUES "
-        f"(0, POINT(10.0, 20.0), 'azaza', '1,3,5');",
+        f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test3, test4, test5) VALUES "
+        f"(0, POINT(10.0, 20.0), 'azaza', '1,3,5', '2023-08-15 14:30:00');",
         commit=True,
     )
 
@@ -910,8 +912,8 @@ def test_different_types_2():
     assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)
 
     mysql.execute(
-        f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test4) VALUES "
-        f"(1, POINT(15.0, 14.0), '2,4,5');",
+        f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test4, test5) VALUES "
+        f"(1, POINT(15.0, 14.0), '2,4,5', '2023-08-15 14:40:00');",
         commit=True,
     )
 
@@ -925,6 +927,10 @@ def test_different_types_2():
     assert ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test4'] == '2,4,5'
     assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test4'] == '1,3,5'
 
+    value = ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test5']
+    assert isinstance(value, datetime.datetime)
+    assert str(value) == '2023-08-15 14:40:00+00:00'
+
     mysql.execute(
         f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES "
         f"(0, NULL);",