From e991f4eccab588631a560cb7dbc43343d6c8dfc5 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Sat, 25 Jan 2025 22:38:51 +0400 Subject: [PATCH 1/2] Support for custom types mapping, support for UUID --- README.md | 5 +++++ mysql_ch_replicator/config.py | 2 ++ mysql_ch_replicator/converter.py | 12 ++++++++++++ test_mysql_ch_replicator.py | 13 +++++++++---- tests_config.yaml | 3 +++ 5 files changed, 31 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 9360f7a..f2d028c 100644 --- a/README.md +++ b/README.md @@ -170,6 +170,10 @@ indexes: # optional http_host: '0.0.0.0' # optional http_port: 9128 # optional +types_mapping: # optional + 'char(36)': 'UUID' + + ``` #### Required settings @@ -188,6 +192,7 @@ http_port: 9128 # optional - `auto_restart_interval` - interval (seconds) between automatic db_replicator restart. Default 3600 (1 hour). This is done to reduce memory usage. - `indexes` - you may want to add some indexes to accelerate performance, eg. ngram index for full-test search, etc. To apply indexes you need to start replication from scratch. - `http_host`, `http_port` - http endpoint to control replication, use `/docs` for abailable commands +- `types_mappings` - custom types mapping, eg. you can map char(36) to UUID instead of String, etc. Few more tables / dbs examples: diff --git a/mysql_ch_replicator/config.py b/mysql_ch_replicator/config.py index 1a6dfb1..1fea155 100644 --- a/mysql_ch_replicator/config.py +++ b/mysql_ch_replicator/config.py @@ -110,6 +110,7 @@ def __init__(self): self.auto_restart_interval = 0 self.http_host = '' self.http_port = 0 + self.types_mapping = {} def load(self, settings_file): data = open(settings_file, 'r').read() @@ -130,6 +131,7 @@ def load(self, settings_file): self.auto_restart_interval = data.pop( 'auto_restart_interval', Settings.DEFAULT_AUTO_RESTART_INTERVAL, ) + self.types_mapping = data.pop('types_mapping', {}) self.http_host = data.pop('http_host', '') self.http_port = data.pop('http_port', 0) diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index f659f18..cf7d1d2 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -1,5 +1,6 @@ import struct import json +import uuid import sqlparse import re from pyparsing import Suppress, CaselessKeyword, Word, alphas, alphanums, delimitedList @@ -180,10 +181,15 @@ def convert_timestamp_to_datetime64(input_str): class MysqlToClickhouseConverter: def __init__(self, db_replicator: 'DbReplicator' = None): self.db_replicator = db_replicator + self.types_mapping = db_replicator.config.types_mapping def convert_type(self, mysql_type, parameters): is_unsigned = 'unsigned' in parameters.lower() + result_type = self.types_mapping.get(mysql_type) + if result_type is not None: + return result_type + if mysql_type == 'point': return 'Tuple(x Float32, y Float32)' @@ -329,6 +335,12 @@ def convert_record( clickhouse_field_value = json.dumps(convert_bytes(clickhouse_field_value)) if clickhouse_field_value is not None: + if 'UUID' in clickhouse_field_type: + if len(clickhouse_field_value) == 36: + if isinstance(clickhouse_field_value, bytes): + clickhouse_field_value = clickhouse_field_value.decode('utf-8') + clickhouse_field_value = uuid.UUID(clickhouse_field_value).bytes + if 'UInt16' in clickhouse_field_type and clickhouse_field_value < 0: clickhouse_field_value = 65536 + clickhouse_field_value if 'UInt8' in clickhouse_field_type and clickhouse_field_value < 0: diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 307fd2a..11b89f5 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -4,6 +4,8 @@ import time import subprocess import json +import uuid + import pytest import requests @@ -890,13 +892,14 @@ def test_different_types_2(): test3 binary(16), test4 set('1','2','3','4','5','6','7'), test5 timestamp(0), + test6 char(36), PRIMARY KEY (id) ); ''') mysql.execute( - f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test3, test4, test5) VALUES " - f"(0, POINT(10.0, 20.0), 'azaza', '1,3,5', '2023-08-15 14:30:00');", + f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test3, test4, test5, test6) VALUES " + f"(0, POINT(10.0, 20.0), 'azaza', '1,3,5', '2023-08-15 14:30:00', '550e8400-e29b-41d4-a716-446655440000');", commit=True, ) @@ -913,8 +916,8 @@ def test_different_types_2(): assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) mysql.execute( - f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test4, test5) VALUES " - f"(1, POINT(15.0, 14.0), '2,4,5', '2023-08-15 14:40:00');", + f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test4, test5, test6) VALUES " + f"(1, POINT(15.0, 14.0), '2,4,5', '2023-08-15 14:40:00', '110e6103-e39b-51d4-a716-826755413099');", commit=True, ) @@ -932,6 +935,8 @@ def test_different_types_2(): assert isinstance(value, datetime.datetime) assert str(value) == '2023-08-15 14:40:00+00:00' + assert ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test6'] == uuid.UUID('110e6103-e39b-51d4-a716-826755413099') + mysql.execute( f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2) VALUES " f"(0, NULL);", diff --git a/tests_config.yaml b/tests_config.yaml index fd28eff..d5f0f15 100644 --- a/tests_config.yaml +++ b/tests_config.yaml @@ -27,3 +27,6 @@ indexes: http_host: 'localhost' http_port: 9128 + +types_mapping: + 'char(36)': 'UUID' From 2f808af7acc3168a8f9c7a0c4e20ed92468f589d Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Sat, 25 Jan 2025 22:50:14 +0400 Subject: [PATCH 2/2] Fixed tests --- mysql_ch_replicator/converter.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index cf7d1d2..a6c6b57 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -181,7 +181,9 @@ def convert_timestamp_to_datetime64(input_str): class MysqlToClickhouseConverter: def __init__(self, db_replicator: 'DbReplicator' = None): self.db_replicator = db_replicator - self.types_mapping = db_replicator.config.types_mapping + self.types_mapping = {} + if self.db_replicator is not None: + self.types_mapping = db_replicator.config.types_mapping def convert_type(self, mysql_type, parameters): is_unsigned = 'unsigned' in parameters.lower()