From 445bab63e49369c21f1dec9c2889e0e4f3a327f7 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Wed, 26 Mar 2025 21:41:26 +0400 Subject: [PATCH] Support for year type --- mysql_ch_replicator/converter.py | 14 +++++ test_mysql_ch_replicator.py | 88 ++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index 2efd201..7ba5381 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -381,6 +381,8 @@ def convert_type(self, mysql_type, parameters): return 'String' if 'set(' in mysql_type: return 'String' + if mysql_type == 'year': + return 'UInt16' # MySQL YEAR type can store years from 1901 to 2155, UInt16 is sufficient raise Exception(f'unknown mysql type "{mysql_type}"') def convert_field_type(self, mysql_type, mysql_parameters): @@ -498,6 +500,18 @@ def convert_record( field_name ) + # Handle MySQL YEAR type conversion + if mysql_field_type == 'year' and clickhouse_field_value is not None: + # MySQL YEAR type can store years from 1901 to 2155 + # Convert to integer if it's a string + if isinstance(clickhouse_field_value, str): + clickhouse_field_value = int(clickhouse_field_value) + # Ensure the value is within valid range + if clickhouse_field_value < 1901: + clickhouse_field_value = 1901 + elif clickhouse_field_value > 2155: + clickhouse_field_value = 2155 + clickhouse_record.append(clickhouse_field_value) return tuple(clickhouse_record) diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 44bc65c..27d13d6 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -1955,3 +1955,91 @@ def test_create_table_like(): # Clean up db_replicator_runner.stop() binlog_replicator_runner.stop() + + +def test_year_type(): + """ + Test that MySQL YEAR type is properly converted to UInt16 in ClickHouse + and that year values are correctly handled. + """ + config_file = CONFIG_FILE + cfg = config.Settings() + cfg.load(config_file) + mysql_config = cfg.mysql + clickhouse_config = cfg.clickhouse + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=mysql_config + ) + ch = clickhouse_api.ClickhouseApi( + database=TEST_DB_NAME, + clickhouse_settings=clickhouse_config + ) + + prepare_env(cfg, mysql, ch) + + mysql.execute(f''' + CREATE TABLE `{TEST_TABLE_NAME}` ( + id INT NOT NULL AUTO_INCREMENT, + year_field YEAR NOT NULL, + nullable_year YEAR, + PRIMARY KEY (id) + ) + ''') + + # Insert test data with various year values + mysql.execute(f''' + INSERT INTO `{TEST_TABLE_NAME}` (year_field, nullable_year) VALUES + (2024, 2024), + (1901, NULL), + (2155, 2000), + (2000, 1999); + ''', commit=True) + + run_all_runner = RunAllRunner(cfg_file=config_file) + run_all_runner.run() + + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) + ch.execute_command(f'USE `{TEST_DB_NAME}`') + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4) + + # Get the ClickHouse data + results = ch.select(TEST_TABLE_NAME) + + # Verify the data + assert results[0]['year_field'] == 2024 + assert results[0]['nullable_year'] == 2024 + assert results[1]['year_field'] == 1901 + assert results[1]['nullable_year'] is None + assert results[2]['year_field'] == 2155 + assert results[2]['nullable_year'] == 2000 + assert results[3]['year_field'] == 2000 + assert results[3]['nullable_year'] == 1999 + + # Test realtime replication by adding more records + mysql.execute(f''' + INSERT INTO `{TEST_TABLE_NAME}` (year_field, nullable_year) VALUES + (2025, 2025), + (1999, NULL), + (2100, 2100); + ''', commit=True) + + # Wait for new records to be replicated + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 7) + + # Verify the new records - include order by in the where clause + new_results = ch.select(TEST_TABLE_NAME, where="year_field >= 2025 ORDER BY year_field ASC") + assert len(new_results) == 3 + + # Check specific values + assert new_results[0]['year_field'] == 2025 + assert new_results[0]['nullable_year'] == 2025 + assert new_results[1]['year_field'] == 2100 + assert new_results[1]['nullable_year'] == 2100 + assert new_results[2]['year_field'] == 2155 + assert new_results[2]['nullable_year'] == 2000 + + run_all_runner.stop() + assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME)) + assert('Traceback' not in read_logs(TEST_DB_NAME))