diff --git a/README.md b/README.md index 967ab31..9d919a9 100644 --- a/README.md +++ b/README.md @@ -175,6 +175,10 @@ indexes: # optional http_host: '0.0.0.0' # optional http_port: 9128 # optional +types_mapping: # optional + 'char(36)': 'UUID' + + ``` #### Required settings @@ -194,6 +198,7 @@ http_port: 9128 # optional - `auto_restart_interval` - interval (seconds) between automatic db_replicator restart. Default 3600 (1 hour). This is done to reduce memory usage. - `indexes` - you may want to add some indexes to accelerate performance, eg. ngram index for full-test search, etc. To apply indexes you need to start replication from scratch. - `http_host`, `http_port` - http endpoint to control replication, use `/docs` for abailable commands +- `types_mapping` - custom types mapping, eg. you can map char(36) to UUID instead of String, etc. Few more tables / dbs examples: diff --git a/mysql_ch_replicator/config.py b/mysql_ch_replicator/config.py index b6d7602..72b23a0 100644 --- a/mysql_ch_replicator/config.py +++ b/mysql_ch_replicator/config.py @@ -110,6 +110,7 @@ def __init__(self): self.auto_restart_interval = 0 self.http_host = '' self.http_port = 0 + self.types_mapping = {} self.target_databases = {} def load(self, settings_file): @@ -131,6 +132,7 @@ def load(self, settings_file): self.auto_restart_interval = data.pop( 'auto_restart_interval', Settings.DEFAULT_AUTO_RESTART_INTERVAL, ) + self.types_mapping = data.pop('types_mapping', {}) self.http_host = data.pop('http_host', '') self.http_port = data.pop('http_port', 0) self.target_databases = data.pop('target_databases', {}) diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index f659f18..a6c6b57 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -1,5 +1,6 @@ import struct import json +import uuid import sqlparse import re from pyparsing import Suppress, CaselessKeyword, Word, alphas, alphanums, delimitedList @@ -180,10 +181,17 @@ def 
convert_timestamp_to_datetime64(input_str): class MysqlToClickhouseConverter: def __init__(self, db_replicator: 'DbReplicator' = None): self.db_replicator = db_replicator + self.types_mapping = {} + if self.db_replicator is not None: + self.types_mapping = db_replicator.config.types_mapping def convert_type(self, mysql_type, parameters): is_unsigned = 'unsigned' in parameters.lower() + result_type = self.types_mapping.get(mysql_type) + if result_type is not None: + return result_type + if mysql_type == 'point': return 'Tuple(x Float32, y Float32)' @@ -329,6 +337,12 @@ def convert_record( clickhouse_field_value = json.dumps(convert_bytes(clickhouse_field_value)) if clickhouse_field_value is not None: + if 'UUID' in clickhouse_field_type: + if len(clickhouse_field_value) == 36: + if isinstance(clickhouse_field_value, bytes): + clickhouse_field_value = clickhouse_field_value.decode('utf-8') + clickhouse_field_value = uuid.UUID(clickhouse_field_value).bytes + if 'UInt16' in clickhouse_field_type and clickhouse_field_value < 0: clickhouse_field_value = 65536 + clickhouse_field_value if 'UInt8' in clickhouse_field_type and clickhouse_field_value < 0: diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 32d6097..231005a 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -4,6 +4,8 @@ import time import subprocess import json +import uuid + import pytest import requests @@ -976,13 +978,14 @@ def test_different_types_2(): test3 binary(16), test4 set('1','2','3','4','5','6','7'), test5 timestamp(0), + test6 char(36), PRIMARY KEY (id) ); ''') mysql.execute( - f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test3, test4, test5) VALUES " - f"(0, POINT(10.0, 20.0), 'azaza', '1,3,5', '2023-08-15 14:30:00');", + f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test3, test4, test5, test6) VALUES " + f"(0, POINT(10.0, 20.0), 'azaza', '1,3,5', '2023-08-15 14:30:00', '550e8400-e29b-41d4-a716-446655440000');", commit=True, ) @@ -999,8 
+1002,8 @@ def test_different_types_2(): assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) mysql.execute( - f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test4, test5) VALUES " - f"(1, POINT(15.0, 14.0), '2,4,5', '2023-08-15 14:40:00');", + f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test4, test5, test6) VALUES " + f"(1, POINT(15.0, 14.0), '2,4,5', '2023-08-15 14:40:00', '110e6103-e39b-51d4-a716-826755413099');", commit=True, ) @@ -1018,6 +1021,8 @@ def test_different_types_2(): assert isinstance(value, datetime.datetime) assert str(value) == '2023-08-15 14:40:00+00:00' + assert ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test6'] == uuid.UUID('110e6103-e39b-51d4-a716-826755413099') + mysql.execute( f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2) VALUES " f"(0, NULL);", diff --git a/tests_config.yaml b/tests_config.yaml index 45bac0c..7ec8439 100644 --- a/tests_config.yaml +++ b/tests_config.yaml @@ -30,3 +30,6 @@ indexes: http_host: 'localhost' http_port: 9128 + +types_mapping: + 'char(36)': 'UUID'