Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ def convert_type(self, mysql_type, parameters):
return 'String'
if 'set(' in mysql_type:
return 'String'
if mysql_type == 'year':
return 'UInt16' # MySQL YEAR type can store years from 1901 to 2155, UInt16 is sufficient
raise Exception(f'unknown mysql type "{mysql_type}"')

def convert_field_type(self, mysql_type, mysql_parameters):
Expand Down Expand Up @@ -498,6 +500,18 @@ def convert_record(
field_name
)

# Handle MySQL YEAR type conversion
if mysql_field_type == 'year' and clickhouse_field_value is not None:
# MySQL YEAR type can store years from 1901 to 2155
# Convert to integer if it's a string
if isinstance(clickhouse_field_value, str):
clickhouse_field_value = int(clickhouse_field_value)
# Ensure the value is within valid range
if clickhouse_field_value < 1901:
clickhouse_field_value = 1901
elif clickhouse_field_value > 2155:
clickhouse_field_value = 2155

clickhouse_record.append(clickhouse_field_value)
return tuple(clickhouse_record)

Expand Down
88 changes: 88 additions & 0 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1955,3 +1955,91 @@ def test_create_table_like():
# Clean up
db_replicator_runner.stop()
binlog_replicator_runner.stop()


def test_year_type():
"""
Test that MySQL YEAR type is properly converted to UInt16 in ClickHouse
and that year values are correctly handled.
"""
config_file = CONFIG_FILE
cfg = config.Settings()
cfg.load(config_file)
mysql_config = cfg.mysql
clickhouse_config = cfg.clickhouse
mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=mysql_config
)
ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=clickhouse_config
)

prepare_env(cfg, mysql, ch)

mysql.execute(f'''
CREATE TABLE `{TEST_TABLE_NAME}` (
id INT NOT NULL AUTO_INCREMENT,
year_field YEAR NOT NULL,
nullable_year YEAR,
PRIMARY KEY (id)
)
''')

# Insert test data with various year values
mysql.execute(f'''
INSERT INTO `{TEST_TABLE_NAME}` (year_field, nullable_year) VALUES
(2024, 2024),
(1901, NULL),
(2155, 2000),
(2000, 1999);
''', commit=True)

run_all_runner = RunAllRunner(cfg_file=config_file)
run_all_runner.run()

assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4)

# Get the ClickHouse data
results = ch.select(TEST_TABLE_NAME)

# Verify the data
assert results[0]['year_field'] == 2024
assert results[0]['nullable_year'] == 2024
assert results[1]['year_field'] == 1901
assert results[1]['nullable_year'] is None
assert results[2]['year_field'] == 2155
assert results[2]['nullable_year'] == 2000
assert results[3]['year_field'] == 2000
assert results[3]['nullable_year'] == 1999

# Test realtime replication by adding more records
mysql.execute(f'''
INSERT INTO `{TEST_TABLE_NAME}` (year_field, nullable_year) VALUES
(2025, 2025),
(1999, NULL),
(2100, 2100);
''', commit=True)

# Wait for new records to be replicated
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 7)

# Verify the new records - include order by in the where clause
new_results = ch.select(TEST_TABLE_NAME, where="year_field >= 2025 ORDER BY year_field ASC")
assert len(new_results) == 3

# Check specific values
assert new_results[0]['year_field'] == 2025
assert new_results[0]['nullable_year'] == 2025
assert new_results[1]['year_field'] == 2100
assert new_results[1]['nullable_year'] == 2100
assert new_results[2]['year_field'] == 2155
assert new_results[2]['nullable_year'] == 2000

run_all_runner.stop()
assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
assert('Traceback' not in read_logs(TEST_DB_NAME))