Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ indexes: # optional
http_host: '0.0.0.0' # optional
http_port: 9128 # optional

types_mapping: # optional
'char(36)': 'UUID'


```

#### Required settings
Expand All @@ -194,6 +198,7 @@ http_port: 9128 # optional
- `auto_restart_interval` - interval (seconds) between automatic db_replicator restart. Default 3600 (1 hour). This is done to reduce memory usage.
- `indexes` - you may want to add some indexes to accelerate performance, eg. ngram index for full-test search, etc. To apply indexes you need to start replication from scratch.
- `http_host`, `http_port` - http endpoint to control replication, use `/docs` for abailable commands
- `types_mappings` - custom types mapping, eg. you can map char(36) to UUID instead of String, etc.

Few more tables / dbs examples:

Expand Down
2 changes: 2 additions & 0 deletions mysql_ch_replicator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def __init__(self):
self.auto_restart_interval = 0
self.http_host = ''
self.http_port = 0
self.types_mapping = {}
self.target_databases = {}

def load(self, settings_file):
Expand All @@ -131,6 +132,7 @@ def load(self, settings_file):
self.auto_restart_interval = data.pop(
'auto_restart_interval', Settings.DEFAULT_AUTO_RESTART_INTERVAL,
)
self.types_mapping = data.pop('types_mapping', {})
self.http_host = data.pop('http_host', '')
self.http_port = data.pop('http_port', 0)
self.target_databases = data.pop('target_databases', {})
Expand Down
14 changes: 14 additions & 0 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import struct
import json
import uuid
import sqlparse
import re
from pyparsing import Suppress, CaselessKeyword, Word, alphas, alphanums, delimitedList
Expand Down Expand Up @@ -180,10 +181,17 @@ def convert_timestamp_to_datetime64(input_str):
class MysqlToClickhouseConverter:
def __init__(self, db_replicator: 'DbReplicator' = None):
self.db_replicator = db_replicator
self.types_mapping = {}
if self.db_replicator is not None:
self.types_mapping = db_replicator.config.types_mapping

def convert_type(self, mysql_type, parameters):
is_unsigned = 'unsigned' in parameters.lower()

result_type = self.types_mapping.get(mysql_type)
if result_type is not None:
return result_type

if mysql_type == 'point':
return 'Tuple(x Float32, y Float32)'

Expand Down Expand Up @@ -329,6 +337,12 @@ def convert_record(
clickhouse_field_value = json.dumps(convert_bytes(clickhouse_field_value))

if clickhouse_field_value is not None:
if 'UUID' in clickhouse_field_type:
if len(clickhouse_field_value) == 36:
if isinstance(clickhouse_field_value, bytes):
clickhouse_field_value = clickhouse_field_value.decode('utf-8')
clickhouse_field_value = uuid.UUID(clickhouse_field_value).bytes

if 'UInt16' in clickhouse_field_type and clickhouse_field_value < 0:
clickhouse_field_value = 65536 + clickhouse_field_value
if 'UInt8' in clickhouse_field_type and clickhouse_field_value < 0:
Expand Down
13 changes: 9 additions & 4 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import time
import subprocess
import json
import uuid

import pytest
import requests

Expand Down Expand Up @@ -976,13 +978,14 @@ def test_different_types_2():
test3 binary(16),
test4 set('1','2','3','4','5','6','7'),
test5 timestamp(0),
test6 char(36),
PRIMARY KEY (id)
);
''')

mysql.execute(
f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test3, test4, test5) VALUES "
f"(0, POINT(10.0, 20.0), 'azaza', '1,3,5', '2023-08-15 14:30:00');",
f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test3, test4, test5, test6) VALUES "
f"(0, POINT(10.0, 20.0), 'azaza', '1,3,5', '2023-08-15 14:30:00', '550e8400-e29b-41d4-a716-446655440000');",
commit=True,
)

Expand All @@ -999,8 +1002,8 @@ def test_different_types_2():
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

mysql.execute(
f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test4, test5) VALUES "
f"(1, POINT(15.0, 14.0), '2,4,5', '2023-08-15 14:40:00');",
f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test4, test5, test6) VALUES "
f"(1, POINT(15.0, 14.0), '2,4,5', '2023-08-15 14:40:00', '110e6103-e39b-51d4-a716-826755413099');",
commit=True,
)

Expand All @@ -1018,6 +1021,8 @@ def test_different_types_2():
assert isinstance(value, datetime.datetime)
assert str(value) == '2023-08-15 14:40:00+00:00'

assert ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test6'] == uuid.UUID('110e6103-e39b-51d4-a716-826755413099')

mysql.execute(
f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2) VALUES "
f"(0, NULL);",
Expand Down
3 changes: 3 additions & 0 deletions tests_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ indexes:

http_host: 'localhost'
http_port: 9128

types_mapping:
'char(36)': 'UUID'
Loading