Add mysql_timezone support to MySQLApi and pass it through from every caller.

Reviewer notes on this revision of the patch:
  * mysql_api.py: SET time_zone now binds the zone name as a query parameter
    rather than formatting it into the SQL string.
  * tests/test_data_types.py: the runner is stopped in a finally block so a
    failed assertion does not leave the replicator running.
  * .cursor/rules/rules.mdc: grammar fixes in the added rules.
  * Hunk line counts are updated for the lines added above; the post-image
    blob ids on the "index" lines are still those of the original patch.

diff --git a/.cursor/rules/rules.mdc b/.cursor/rules/rules.mdc
index c4bb604..cf5de6a 100644
--- a/.cursor/rules/rules.mdc
+++ b/.cursor/rules/rules.mdc
@@ -3,6 +3,10 @@ description:
 globs:
 alwaysApply: true
 ---
-Use following command to run tests:
+1. Use the following command to run tests:
 
 docker exec -w /app/ -it tests-replicator-1 python3 -m pytest -v -s tests/ -k test_your_test_name
+
+2. Never create config files at runtime in test code. Always create real config files. Use log level info for newly added config files.
+
+3. Don't create analysis md files.
diff --git a/mysql_ch_replicator/db_optimizer.py b/mysql_ch_replicator/db_optimizer.py
index 72433d7..cfdc00b 100644
--- a/mysql_ch_replicator/db_optimizer.py
+++ b/mysql_ch_replicator/db_optimizer.py
@@ -47,6 +47,7 @@ def __init__(self, config: Settings):
         self.mysql_api = MySQLApi(
             database=None,
             mysql_settings=config.mysql,
+            mysql_timezone=config.mysql_timezone,
         )
         self.clickhouse_api = ClickhouseApi(
             database=None,
diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py
index da906f2..20705b2 100644
--- a/mysql_ch_replicator/db_replicator.py
+++ b/mysql_ch_replicator/db_replicator.py
@@ -163,6 +163,7 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
         self.mysql_api = MySQLApi(
             database=self.database,
             mysql_settings=config.mysql,
+            mysql_timezone=config.mysql_timezone,
         )
         self.clickhouse_api = ClickhouseApi(
             database=self.target_database,
diff --git a/mysql_ch_replicator/monitoring.py b/mysql_ch_replicator/monitoring.py
index 6e1f3a9..4d41fa1 100644
--- a/mysql_ch_replicator/monitoring.py
+++ b/mysql_ch_replicator/monitoring.py
@@ -18,7 +18,11 @@ class Monitoring:
     def __init__(self, databases: str, config: Settings):
         self.config = config
         self.databases = [db.strip() for db in databases.split(',') if db.strip()]
-        self.mysql_api = MySQLApi(database=None, mysql_settings=config.mysql)
+        self.mysql_api = MySQLApi(
+            database=None,
+            mysql_settings=config.mysql,
+            mysql_timezone=config.mysql_timezone,
+        )
 
     def run(self):
         stats = []
diff --git a/mysql_ch_replicator/mysql_api.py b/mysql_ch_replicator/mysql_api.py
index c89bcb9..c51d693 100644
--- a/mysql_ch_replicator/mysql_api.py
+++ b/mysql_ch_replicator/mysql_api.py
@@ -1,4 +1,7 @@
+import datetime
 import time
+import zoneinfo
+
 import mysql.connector
 
 from .config import MysqlSettings
@@ -8,9 +11,10 @@ class MySQLApi:
 
     RECONNECT_INTERVAL = 3 * 60
 
-    def __init__(self, database: str, mysql_settings: MysqlSettings):
+    def __init__(self, database: str, mysql_settings: MysqlSettings, mysql_timezone: str = 'UTC'):
         self.database = database
         self.mysql_settings = mysql_settings
+        self.mysql_timezone = mysql_timezone
         self.last_connect_time = 0
         self.reconnect_if_required()
 
@@ -44,6 +48,13 @@ def reconnect_if_required(self, force=False):
             else:
                 raise
         self.cursor = self.db.cursor()
+
+        if self.mysql_timezone and self.mysql_timezone != 'UTC':
+            # Bind the zone name as a parameter instead of interpolating it into
+            # the statement, so a quote in the configured value cannot break or
+            # alter the SQL.
+            self.cursor.execute('SET time_zone = %s', (self.mysql_timezone,))
+
         if self.database is not None:
             self.cursor.execute(f'USE `{self.database}`')
         self.last_connect_time = curr_time
@@ -132,5 +143,20 @@ def get_records(self, table_name, order_by, limit, start_value=None, worker_id=N
         # Execute the query
         self.cursor.execute(query)
         res = self.cursor.fetchall()
+
+        if self.mysql_timezone and self.mysql_timezone != 'UTC':
+            # Attach the configured zone to naive datetime values; every other
+            # value (and any already-aware datetime) passes through unchanged.
+            tz = zoneinfo.ZoneInfo(self.mysql_timezone)
+            records = []
+            for row in res:
+                new_row = []
+                for value in row:
+                    if isinstance(value, datetime.datetime) and value.tzinfo is None:
+                        value = value.replace(tzinfo=tz)
+                    new_row.append(value)
+                records.append(tuple(new_row))
+            return records
+
         records = [x for x in res]
         return records
diff --git a/mysql_ch_replicator/runner.py b/mysql_ch_replicator/runner.py
index 2c3af0a..ced9b7b 100644
--- a/mysql_ch_replicator/runner.py
+++ b/mysql_ch_replicator/runner.py
@@ -147,7 +147,9 @@ def check_databases_updated(self, mysql_api: MySQLApi):
 
     def run(self):
         mysql_api = MySQLApi(
-            database=None, mysql_settings=self.config.mysql,
+            database=None,
+            mysql_settings=self.config.mysql,
+            mysql_timezone=self.config.mysql_timezone,
         )
         databases = mysql_api.get_databases()
         databases = [db for db in databases if self.config.is_database_matches(db)]
diff --git a/tests/test_data_types.py b/tests/test_data_types.py
index c04e36f..293a5f9 100644
--- a/tests/test_data_types.py
+++ b/tests/test_data_types.py
@@ -1,7 +1,9 @@
 import datetime
 import json
+import os
 import tempfile
 import uuid
+import zoneinfo
 
 import yaml
 
@@ -492,6 +494,75 @@ def test_timezone_conversion():
         os.unlink(temp_config_file)
 
 
+def test_timezone_conversion_values():
+    """
+    Test that MySQL timestamp values are correctly preserved with timezone conversion.
+    This test reproduces the issue from GitHub issue #177.
+    """
+    config_file = 'tests/tests_config_timezone.yaml'
+    cfg = config.Settings()
+    cfg.load(config_file)
+
+    mysql = mysql_api.MySQLApi(
+        database=None,
+        mysql_settings=cfg.mysql,
+        mysql_timezone=cfg.mysql_timezone,
+    )
+
+    ch = clickhouse_api.ClickhouseApi(
+        database=TEST_DB_NAME,
+        clickhouse_settings=cfg.clickhouse,
+    )
+
+    prepare_env(cfg, mysql, ch)
+
+    mysql.execute(f'''
+    CREATE TABLE `{TEST_TABLE_NAME}` (
+        id int NOT NULL AUTO_INCREMENT,
+        name varchar(255),
+        created_at timestamp NULL,
+        updated_at timestamp(3) NULL,
+        PRIMARY KEY (id)
+    );
+    ''')
+
+    mysql.execute(
+        f"INSERT INTO `{TEST_TABLE_NAME}` (name, created_at, updated_at) "
+        f"VALUES ('test_timezone', '2023-08-15 14:30:00', '2023-08-15 14:30:00.123');",
+        commit=True,
+    )
+
+    run_all_runner = RunAllRunner(cfg_file=config_file)
+    run_all_runner.run()
+
+    # Stop the runner even when an assertion below fails, so a failing run
+    # does not leave the replicator running for the tests that follow.
+    try:
+        assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
+        ch.execute_command(f'USE `{TEST_DB_NAME}`')
+        assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
+        assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)
+
+        results = ch.select(TEST_TABLE_NAME)
+        assert len(results) == 1
+        assert results[0]['name'] == 'test_timezone'
+
+        created_at_value = results[0]['created_at']
+        updated_at_value = results[0]['updated_at']
+
+        expected_dt = datetime.datetime(2023, 8, 15, 14, 30, 0)
+        ny_tz = zoneinfo.ZoneInfo('America/New_York')
+        expected_dt_with_tz = expected_dt.replace(tzinfo=ny_tz)
+
+        assert created_at_value == expected_dt_with_tz, f"Expected {expected_dt_with_tz}, got {created_at_value}"
+
+        expected_dt_with_microseconds = datetime.datetime(2023, 8, 15, 14, 30, 0, 123000)
+        expected_dt_with_microseconds_tz = expected_dt_with_microseconds.replace(tzinfo=ny_tz)
+        assert updated_at_value == expected_dt_with_microseconds_tz, f"Expected {expected_dt_with_microseconds_tz}, got {updated_at_value}"
+    finally:
+        run_all_runner.stop()
+
+
 def test_year_type():
     """
     Test that MySQL YEAR type is properly converted to UInt16 in ClickHouse
diff --git a/tests/tests_config_timezone.yaml b/tests/tests_config_timezone.yaml
new file mode 100644
index 0000000..4cc08e0
--- /dev/null
+++ b/tests/tests_config_timezone.yaml
@@ -0,0 +1,20 @@
+mysql:
+  host: 'localhost'
+  port: 9306
+  user: 'root'
+  password: 'admin'
+
+clickhouse:
+  host: 'localhost'
+  port: 9123
+  user: 'default'
+  password: 'admin'
+
+binlog_replicator:
+  data_dir: '/app/binlog/'
+  records_per_file: 100000
+
+databases: '*test*'
+log_level: 'info'
+mysql_timezone: 'America/New_York'
+