From e77d24da6ca2472e23193add0f315f80babf1de3 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Thu, 31 Oct 2024 13:50:04 +0400 Subject: [PATCH] Fixed json serialization --- .../pymysqlreplication/packet.py | 2 +- test_mysql_ch_replicator.py | 57 +++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/mysql_ch_replicator/pymysqlreplication/packet.py b/mysql_ch_replicator/pymysqlreplication/packet.py index e32c09f..7164d2e 100644 --- a/mysql_ch_replicator/pymysqlreplication/packet.py +++ b/mysql_ch_replicator/pymysqlreplication/packet.py @@ -347,7 +347,7 @@ def read_binary_json(self, size, is_partial): # handle NULL value return None data = self.read(length) - return cpp_mysql_to_json(data) + return cpp_mysql_to_json(data).decode('utf-8') # # if is_partial: diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 4758464..63843df 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -2,6 +2,7 @@ import shutil import time import subprocess +import json from mysql_ch_replicator import config from mysql_ch_replicator import mysql_api @@ -646,3 +647,59 @@ def test_numeric_types_and_limits(): assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, 'test6=4294967290')) == 1) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, 'test6=4294967280')) == 1) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, 'test7=18446744073709551586')) == 2) + + +def test_json(): + cfg = config.Settings() + cfg.load(CONFIG_FILE) + + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=cfg.mysql, + ) + + ch = clickhouse_api.ClickhouseApi( + database=TEST_DB_NAME, + clickhouse_settings=cfg.clickhouse, + ) + + prepare_env(cfg, mysql, ch) + + mysql.execute("SET sql_mode = 'ALLOW_INVALID_DATES';") + + mysql.execute(f''' +CREATE TABLE {TEST_TABLE_NAME} ( + `id` int unsigned NOT NULL AUTO_INCREMENT, + name varchar(255), + data json, + PRIMARY KEY (id) +); + ''') + + mysql.execute( + f"INSERT INTO {TEST_TABLE_NAME} (name, data) VALUES " + + """('Ivan', '{"a": "b", "c": [1,2,3]}');""", + commit=True, + ) + + binlog_replicator_runner = BinlogReplicatorRunner() + binlog_replicator_runner.run() + db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME) + db_replicator_runner.run() + + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) + + ch.execute_command(f'USE {TEST_DB_NAME}') + + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) + + mysql.execute( + f"INSERT INTO {TEST_TABLE_NAME} (name, data) VALUES " + + """('Peter', '{"b": "b", "c": [3,2,1]}');""", + commit=True, + ) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2) + + assert json.loads(ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['data'])['c'] == [1, 2, 3] + assert json.loads(ch.select(TEST_TABLE_NAME, "name='Peter'")[0]['data'])['c'] == [3, 2, 1]