diff --git a/tests/test_data_types.py b/tests/test_data_types.py index 293a5f9..9f32352 100644 --- a/tests/test_data_types.py +++ b/tests/test_data_types.py @@ -1,12 +1,9 @@ import datetime import json import os -import tempfile import uuid import zoneinfo -import yaml - from common import * from mysql_ch_replicator import clickhouse_api from mysql_ch_replicator import config @@ -390,108 +387,77 @@ def test_timezone_conversion(): Test that MySQL timestamp fields are converted to ClickHouse DateTime64 with custom timezone. This test reproduces the issue from GitHub issue #170. """ - # Create a temporary config file with custom timezone - config_content = """ -mysql: - host: 'localhost' - port: 9306 - user: 'root' - password: 'admin' - -clickhouse: - host: 'localhost' - port: 9123 - user: 'default' - password: 'admin' - -binlog_replicator: - data_dir: '/app/binlog/' - records_per_file: 100000 - -databases: '*test*' -log_level: 'debug' -mysql_timezone: 'America/New_York' -""" - - # Create temporary config file - with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: - f.write(config_content) - temp_config_file = f.name + config_file = 'tests/tests_config_timezone.yaml' - try: - cfg = config.Settings() - cfg.load(temp_config_file) - - # Verify timezone is loaded correctly - assert cfg.mysql_timezone == 'America/New_York' - - mysql = mysql_api.MySQLApi( - database=None, - mysql_settings=cfg.mysql, - ) + cfg = config.Settings() + cfg.load(config_file) + + # Verify timezone is loaded correctly + assert cfg.mysql_timezone == 'America/New_York' + + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=cfg.mysql, + ) - ch = clickhouse_api.ClickhouseApi( - database=TEST_DB_NAME, - clickhouse_settings=cfg.clickhouse, - ) + ch = clickhouse_api.ClickhouseApi( + database=TEST_DB_NAME, + clickhouse_settings=cfg.clickhouse, + ) - prepare_env(cfg, mysql, ch) - - # Create table with timestamp fields - mysql.execute(f''' - CREATE TABLE `{TEST_TABLE_NAME}` ( - id int NOT NULL AUTO_INCREMENT, - name varchar(255), - created_at timestamp NULL, - updated_at timestamp(3) NULL, - PRIMARY KEY (id) - ); - ''') + prepare_env(cfg, mysql, ch) - # Insert test data with specific timestamp - mysql.execute( - f"INSERT INTO `{TEST_TABLE_NAME}` (name, created_at, updated_at) " - f"VALUES ('test_timezone', '2023-08-15 14:30:00', '2023-08-15 14:30:00.123');", - commit=True, - ) + # Create table with timestamp fields + mysql.execute(f''' + CREATE TABLE `{TEST_TABLE_NAME}` ( + id int NOT NULL AUTO_INCREMENT, + name varchar(255), + created_at timestamp NULL, + updated_at timestamp(3) NULL, + PRIMARY KEY (id) + ); + ''') - # Run replication - run_all_runner = RunAllRunner(cfg_file=temp_config_file) - run_all_runner.run() + # Insert test data with specific timestamp + mysql.execute( + f"INSERT INTO `{TEST_TABLE_NAME}` (name, created_at, updated_at) " + f"VALUES ('test_timezone', '2023-08-15 14:30:00', '2023-08-15 14:30:00.123');", + commit=True, + ) - assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - ch.execute_command(f'USE `{TEST_DB_NAME}`') - assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) - assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) + # Run replication + run_all_runner = RunAllRunner(cfg_file=config_file) + run_all_runner.run() - # Get the table structure from ClickHouse - table_info = ch.query(f'DESCRIBE `{TEST_TABLE_NAME}`') - - # Check that timestamp fields are converted to DateTime64 with timezone - created_at_type = None - updated_at_type = None - for row in table_info.result_rows: - if row[0] == 'created_at': - created_at_type = row[1] - elif row[0] == 'updated_at': - updated_at_type = row[1] - - # Verify the types include the timezone - assert created_at_type is not None - assert updated_at_type is not None - assert 'America/New_York' in created_at_type - assert 'America/New_York' in updated_at_type - - # Verify data was inserted correctly - results = ch.select(TEST_TABLE_NAME) - assert len(results) == 1 - assert results[0]['name'] == 'test_timezone' - - run_all_runner.stop() - - finally: - # Clean up temporary config file - os.unlink(temp_config_file) + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) + ch.execute_command(f'USE `{TEST_DB_NAME}`') + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) + + # Get the table structure from ClickHouse + table_info = ch.query(f'DESCRIBE `{TEST_TABLE_NAME}`') + + # Check that timestamp fields are converted to DateTime64 with timezone + created_at_type = None + updated_at_type = None + for row in table_info.result_rows: + if row[0] == 'created_at': + created_at_type = row[1] + elif row[0] == 'updated_at': + updated_at_type = row[1] + + # Verify the types include the timezone + assert created_at_type is not None + assert updated_at_type is not None + assert 'America/New_York' in created_at_type + assert 'America/New_York' in updated_at_type + + # Verify data was inserted correctly + results = ch.select(TEST_TABLE_NAME) + assert len(results) == 1 + assert results[0]['name'] == 'test_timezone' + + run_all_runner.stop() def test_timezone_conversion_values(): @@ -908,115 +874,99 @@ def test_charset_configuration(): This test verifies that utf8mb4 charset can be configured to properly handle 4-byte Unicode characters in JSON fields. """ - # Create a temporary config file with explicit charset configuration - with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_config_file: - config_file = temp_config_file.name - - # Load base config and add charset setting - with open(CONFIG_FILE, 'r') as f: - base_config = yaml.safe_load(f) - - # Ensure charset is set to utf8mb4 - base_config['mysql']['charset'] = 'utf8mb4' - - yaml.dump(base_config, temp_config_file) + config_file = 'tests/tests_config_charset.yaml' + + cfg = config.Settings() + cfg.load(config_file) + + # Verify charset is loaded correctly + assert hasattr(cfg.mysql, 'charset'), "MysqlSettings should have charset attribute" + assert cfg.mysql.charset == 'utf8mb4', f"Expected charset utf8mb4, got {cfg.mysql.charset}" + + mysql = mysql_api.MySQLApi(None, cfg.mysql) + ch = clickhouse_api.ClickhouseApi(None, cfg.clickhouse) + + prepare_env(cfg, mysql, ch) + + mysql.database = TEST_DB_NAME + ch.database = TEST_DB_NAME + + # Create table with JSON field + mysql.execute(f""" + CREATE TABLE IF NOT EXISTS {TEST_TABLE_NAME} ( + id INT AUTO_INCREMENT PRIMARY KEY, + json_data JSON + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci + """, commit=True) + + # Insert data with 4-byte Unicode characters (emoji and Arabic text) + test_data = { + "ar": "مرحباً بالعالم", # Arabic: Hello World + "emoji": "🌍🎉✨", + "cn": "你好世界", # Chinese: Hello World + "en": "Hello World" + } + + mysql.execute( + f"INSERT INTO {TEST_TABLE_NAME} (json_data) VALUES (%s)", + args=(json.dumps(test_data, ensure_ascii=False),), + commit=True + ) + + # Verify the data can be read back correctly + mysql.cursor.execute(f"SELECT json_data FROM {TEST_TABLE_NAME}") + result = mysql.cursor.fetchone() + assert result is not None, "Should have retrieved a record" + + retrieved_data = json.loads(result[0]) if isinstance(result[0], str) else result[0] + assert retrieved_data['ar'] == test_data['ar'], f"Arabic text mismatch: {retrieved_data['ar']} != {test_data['ar']}" + assert retrieved_data['emoji'] == test_data['emoji'], f"Emoji mismatch: {retrieved_data['emoji']} != {test_data['emoji']}" + assert retrieved_data['cn'] == test_data['cn'], f"Chinese text mismatch: {retrieved_data['cn']} != {test_data['cn']}" + + # Test binlog replication with charset + binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file) + binlog_replicator_runner.run() try: - cfg = config.Settings() - cfg.load(config_file) - - # Verify charset is loaded correctly - assert hasattr(cfg.mysql, 'charset'), "MysqlSettings should have charset attribute" - assert cfg.mysql.charset == 'utf8mb4', f"Expected charset utf8mb4, got {cfg.mysql.charset}" + # Start db replicator + db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file) + db_replicator_runner.run() - mysql = mysql_api.MySQLApi(None, cfg.mysql) - ch = clickhouse_api.ClickhouseApi(None, cfg.clickhouse) + # Wait for database and table to be created in ClickHouse + assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), max_wait_time=20) + ch.execute_command(f'USE `{TEST_DB_NAME}`') + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), max_wait_time=20) - prepare_env(cfg, mysql, ch) + # Wait for replication + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1, max_wait_time=20) - mysql.database = TEST_DB_NAME - ch.database = TEST_DB_NAME + # Verify data in ClickHouse + ch_records = ch.select(TEST_TABLE_NAME) + assert len(ch_records) == 1, f"Expected 1 record in ClickHouse, got {len(ch_records)}" - # Create table with JSON field - mysql.execute(f""" - CREATE TABLE IF NOT EXISTS {TEST_TABLE_NAME} ( - id INT AUTO_INCREMENT PRIMARY KEY, - json_data JSON - ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci - """, commit=True) + # Access the json_data column using dictionary access + ch_record = ch_records[0] + ch_json_data = json.loads(ch_record['json_data']) if isinstance(ch_record['json_data'], str) else ch_record['json_data'] - # Insert data with 4-byte Unicode characters (emoji and Arabic text) - test_data = { - "ar": "مرحباً بالعالم", # Arabic: Hello World - "emoji": "🌍🎉✨", - "cn": "你好世界", # Chinese: Hello World - "en": "Hello World" - } + # Verify Unicode characters are preserved correctly + assert ch_json_data['ar'] == test_data['ar'], f"Arabic text not preserved in CH: {ch_json_data.get('ar')}" + assert ch_json_data['emoji'] == test_data['emoji'], f"Emoji not preserved in CH: {ch_json_data.get('emoji')}" + assert ch_json_data['cn'] == test_data['cn'], f"Chinese text not preserved in CH: {ch_json_data.get('cn')}" + # Test realtime replication with more Unicode data + more_data = {"test": "🔥 Real-time 测试 اختبار"} mysql.execute( f"INSERT INTO {TEST_TABLE_NAME} (json_data) VALUES (%s)", - args=(json.dumps(test_data, ensure_ascii=False),), + args=(json.dumps(more_data, ensure_ascii=False),), commit=True ) - # Verify the data can be read back correctly - mysql.cursor.execute(f"SELECT json_data FROM {TEST_TABLE_NAME}") - result = mysql.cursor.fetchone() - assert result is not None, "Should have retrieved a record" - - retrieved_data = json.loads(result[0]) if isinstance(result[0], str) else result[0] - assert retrieved_data['ar'] == test_data['ar'], f"Arabic text mismatch: {retrieved_data['ar']} != {test_data['ar']}" - assert retrieved_data['emoji'] == test_data['emoji'], f"Emoji mismatch: {retrieved_data['emoji']} != {test_data['emoji']}" - assert retrieved_data['cn'] == test_data['cn'], f"Chinese text mismatch: {retrieved_data['cn']} != {test_data['cn']}" + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2, max_wait_time=20) - # Test binlog replication with charset - binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file) - binlog_replicator_runner.run() + # Verify the second record + ch_records = ch.select(TEST_TABLE_NAME) + assert len(ch_records) == 2, f"Expected 2 records in ClickHouse, got {len(ch_records)}" - try: - # Start db replicator - db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file) - db_replicator_runner.run() - - # Wait for database and table to be created in ClickHouse - assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), max_wait_time=20) - ch.execute_command(f'USE `{TEST_DB_NAME}`') - assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), max_wait_time=20) - - # Wait for replication - assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1, max_wait_time=20) - - # Verify data in ClickHouse - ch_records = ch.select(TEST_TABLE_NAME) - assert len(ch_records) == 1, f"Expected 1 record in ClickHouse, got {len(ch_records)}" - - # Access the json_data column using dictionary access - ch_record = ch_records[0] - ch_json_data = json.loads(ch_record['json_data']) if isinstance(ch_record['json_data'], str) else ch_record['json_data'] - - # Verify Unicode characters are preserved correctly - assert ch_json_data['ar'] == test_data['ar'], f"Arabic text not preserved in CH: {ch_json_data.get('ar')}" - assert ch_json_data['emoji'] == test_data['emoji'], f"Emoji not preserved in CH: {ch_json_data.get('emoji')}" - assert ch_json_data['cn'] == test_data['cn'], f"Chinese text not preserved in CH: {ch_json_data.get('cn')}" - - # Test realtime replication with more Unicode data - more_data = {"test": "🔥 Real-time 测试 اختبار"} - mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (json_data) VALUES (%s)", - args=(json.dumps(more_data, ensure_ascii=False),), - commit=True - ) - - assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2, max_wait_time=20) - - # Verify the second record - ch_records = ch.select(TEST_TABLE_NAME) - assert len(ch_records) == 2, f"Expected 2 records in ClickHouse, got {len(ch_records)}" - - db_replicator_runner.stop() - finally: - binlog_replicator_runner.stop() - + db_replicator_runner.stop() finally: - # Clean up temp config file - os.unlink(config_file) + binlog_replicator_runner.stop() diff --git a/tests/test_mysql_ch_replicator.py b/tests/test_mysql_ch_replicator.py index df7e16b..b49f746 100644 --- a/tests/test_mysql_ch_replicator.py +++ b/tests/test_mysql_ch_replicator.py @@ -6,8 +6,6 @@ import json import uuid import decimal -import tempfile -import yaml import pytest import requests @@ -560,145 +558,129 @@ def test_parse_db_name_from_query(query, expected): def test_ignore_deletes(): - # Create a temporary config file with ignore_deletes=True - with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_config_file: - config_file = temp_config_file.name - - # Read the original config - with open(CONFIG_FILE, 'r') as original_config: - config_data = yaml.safe_load(original_config) - - # Add ignore_deletes=True - config_data['ignore_deletes'] = True - - # Write to the temp file - yaml.dump(config_data, temp_config_file) - - try: - cfg = config.Settings() - cfg.load(config_file) - - # Verify the ignore_deletes option was set - assert cfg.ignore_deletes is True + config_file = 'tests/tests_config_ignore_deletes.yaml' + + cfg = config.Settings() + cfg.load(config_file) + + # Verify the ignore_deletes option was set + assert cfg.ignore_deletes is True - mysql = mysql_api.MySQLApi( - database=None, - mysql_settings=cfg.mysql, - ) + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=cfg.mysql, + ) - ch = clickhouse_api.ClickhouseApi( - database=TEST_DB_NAME, - clickhouse_settings=cfg.clickhouse, - ) + ch = clickhouse_api.ClickhouseApi( + database=TEST_DB_NAME, + clickhouse_settings=cfg.clickhouse, + ) - prepare_env(cfg, mysql, ch) + prepare_env(cfg, mysql, ch) - # Create a table with a composite primary key - mysql.execute(f''' - CREATE TABLE `{TEST_TABLE_NAME}` ( - departments int(11) NOT NULL, - termine int(11) NOT NULL, - data varchar(255) NOT NULL, - PRIMARY KEY (departments,termine) - ) - ''') + # Create a table with a composite primary key + mysql.execute(f''' + CREATE TABLE `{TEST_TABLE_NAME}` ( + departments int(11) NOT NULL, + termine int(11) NOT NULL, + data varchar(255) NOT NULL, + PRIMARY KEY (departments,termine) + ) + ''') - # Insert initial records - mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (10, 20, 'data1');", commit=True) - mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (30, 40, 'data2');", commit=True) - mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (50, 60, 'data3');", commit=True) + # Insert initial records + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (10, 20, 'data1');", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (30, 40, 'data2');", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (50, 60, 'data3');", commit=True) - # Run the replicator with ignore_deletes=True - run_all_runner = RunAllRunner(cfg_file=config_file) - run_all_runner.run() + # Run the replicator with ignore_deletes=True + run_all_runner = RunAllRunner(cfg_file=config_file) + run_all_runner.run() - # Wait for replication to complete - assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - ch.execute_command(f'USE `{TEST_DB_NAME}`') - assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) - assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) + # Wait for replication to complete + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) + ch.execute_command(f'USE `{TEST_DB_NAME}`') + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) - # Delete some records from MySQL - mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE departments=10;", commit=True) - mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE departments=30;", commit=True) - - # Wait a moment to ensure replication processes the events - time.sleep(5) - - # Verify records are NOT deleted in ClickHouse (since ignore_deletes=True) - # The count should still be 3 - assert len(ch.select(TEST_TABLE_NAME)) == 3, "Deletions were processed despite ignore_deletes=True" - - # Insert a new record and verify it's added - mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (70, 80, 'data4');", commit=True) - assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4) - - # Verify the new record is correctly added - result = ch.select(TEST_TABLE_NAME, where="departments=70 AND termine=80") - assert len(result) == 1 - assert result[0]['data'] == 'data4' - - # Clean up - run_all_runner.stop() - - # Verify no errors occurred - assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME)) - assert('Traceback' not in read_logs(TEST_DB_NAME)) - - # Additional tests for persistence after restart - - # 1. Remove all entries from table in MySQL - mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE 1=1;", commit=True) + # Delete some records from MySQL + mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE departments=10;", commit=True) + mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE departments=30;", commit=True) + + # Wait a moment to ensure replication processes the events + time.sleep(5) + + # Verify records are NOT deleted in ClickHouse (since ignore_deletes=True) + # The count should still be 3 + assert len(ch.select(TEST_TABLE_NAME)) == 3, "Deletions were processed despite ignore_deletes=True" + + # Insert a new record and verify it's added + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (70, 80, 'data4');", commit=True) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4) + + # Verify the new record is correctly added + result = ch.select(TEST_TABLE_NAME, where="departments=70 AND termine=80") + assert len(result) == 1 + assert result[0]['data'] == 'data4' + + # Clean up + run_all_runner.stop() + + # Verify no errors occurred + assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME)) + assert('Traceback' not in read_logs(TEST_DB_NAME)) + + # Additional tests for persistence after restart + + # 1. Remove all entries from table in MySQL + mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE 1=1;", commit=True) - # Add a new row in MySQL before starting the replicator - mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (110, 120, 'offline_data');", commit=True) - - # 2. Wait 5 seconds - time.sleep(5) - - # 3. Remove binlog directory (similar to prepare_env, but without removing tables) - if os.path.exists(cfg.binlog_replicator.data_dir): - shutil.rmtree(cfg.binlog_replicator.data_dir) - os.mkdir(cfg.binlog_replicator.data_dir) - + # Add a new row in MySQL before starting the replicator + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (110, 120, 'offline_data');", commit=True) + + # 2. Wait 5 seconds + time.sleep(5) + + # 3. Remove binlog directory (similar to prepare_env, but without removing tables) + if os.path.exists(cfg.binlog_replicator.data_dir): + shutil.rmtree(cfg.binlog_replicator.data_dir) + os.mkdir(cfg.binlog_replicator.data_dir) + - # 4. Create and run a new runner - new_runner = RunAllRunner(cfg_file=config_file) - new_runner.run() - - # 5. Ensure it has all the previous data (should still be 4 records from before + 1 new offline record) - assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - ch.execute_command(f'USE `{TEST_DB_NAME}`') - assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) - assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 5) - - # Verify we still have all the old data - assert len(ch.select(TEST_TABLE_NAME, where="departments=10 AND termine=20")) == 1 - assert len(ch.select(TEST_TABLE_NAME, where="departments=30 AND termine=40")) == 1 - assert len(ch.select(TEST_TABLE_NAME, where="departments=50 AND termine=60")) == 1 - assert len(ch.select(TEST_TABLE_NAME, where="departments=70 AND termine=80")) == 1 - - # Verify the offline data was replicated - assert len(ch.select(TEST_TABLE_NAME, where="departments=110 AND termine=120")) == 1 - offline_data = ch.select(TEST_TABLE_NAME, where="departments=110 AND termine=120")[0] - assert offline_data['data'] == 'offline_data' - - # 6. Insert new data and verify it gets added to existing data - mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (90, 100, 'data5');", commit=True) - assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 6) - - # Verify the combined old and new data - result = ch.select(TEST_TABLE_NAME, where="departments=90 AND termine=100") - assert len(result) == 1 - assert result[0]['data'] == 'data5' - - # Make sure we have all 6 records (4 original + 1 offline + 1 new one) - assert len(ch.select(TEST_TABLE_NAME)) == 6 - - new_runner.stop() - finally: - # Clean up the temporary config file - os.unlink(config_file) + # 4. Create and run a new runner + new_runner = RunAllRunner(cfg_file=config_file) + new_runner.run() + + # 5. Ensure it has all the previous data (should still be 4 records from before + 1 new offline record) + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) + ch.execute_command(f'USE `{TEST_DB_NAME}`') + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 5) + + # Verify we still have all the old data + assert len(ch.select(TEST_TABLE_NAME, where="departments=10 AND termine=20")) == 1 + assert len(ch.select(TEST_TABLE_NAME, where="departments=30 AND termine=40")) == 1 + assert len(ch.select(TEST_TABLE_NAME, where="departments=50 AND termine=60")) == 1 + assert len(ch.select(TEST_TABLE_NAME, where="departments=70 AND termine=80")) == 1 + + # Verify the offline data was replicated + assert len(ch.select(TEST_TABLE_NAME, where="departments=110 AND termine=120")) == 1 + offline_data = ch.select(TEST_TABLE_NAME, where="departments=110 AND termine=120")[0] + assert offline_data['data'] == 'offline_data' + + # 6. Insert new data and verify it gets added to existing data + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (90, 100, 'data5');", commit=True) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 6) + + # Verify the combined old and new data + result = ch.select(TEST_TABLE_NAME, where="departments=90 AND termine=100") + assert len(result) == 1 + assert result[0]['data'] == 'data5' + + # Make sure we have all 6 records (4 original + 1 offline + 1 new one) + assert len(ch.select(TEST_TABLE_NAME)) == 6 + + new_runner.stop() def test_issue_160_unknown_mysql_type_bug(): """ @@ -845,117 +827,97 @@ def test_resume_initial_replication_with_ignore_deletes(): when ignore_deletes=True because the code would try to use the _tmp database instead of the target database directly. """ - # Create a temporary config file with ignore_deletes=True - with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_config_file: - config_file = temp_config_file.name - - # Read the original config - with open(CONFIG_FILE, 'r') as original_config: - config_data = yaml.safe_load(original_config) - - # Add ignore_deletes=True - config_data['ignore_deletes'] = True - - # Set initial_replication_batch_size to 1 for testing - config_data['initial_replication_batch_size'] = 1 - - # Write to the temp file - yaml.dump(config_data, temp_config_file) + config_file = 'tests/tests_config_ignore_deletes.yaml' + + cfg = config.Settings() + cfg.load(config_file) + + # Verify the ignore_deletes option was set + assert cfg.ignore_deletes is True - try: - cfg = config.Settings() - cfg.load(config_file) - - # Verify the ignore_deletes option was set - assert cfg.ignore_deletes is True + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=cfg.mysql, + ) - mysql = mysql_api.MySQLApi( - database=None, - mysql_settings=cfg.mysql, - ) + ch = clickhouse_api.ClickhouseApi( + database=TEST_DB_NAME, + clickhouse_settings=cfg.clickhouse, + ) - ch = clickhouse_api.ClickhouseApi( - database=TEST_DB_NAME, - clickhouse_settings=cfg.clickhouse, - ) + prepare_env(cfg, mysql, ch) - prepare_env(cfg, mysql, ch) + # Create a table with many records to ensure initial replication takes time + mysql.execute(f''' + CREATE TABLE `{TEST_TABLE_NAME}` ( + id int NOT NULL AUTO_INCREMENT, + name varchar(255), + data varchar(1000), + PRIMARY KEY (id) + ) + ''') - # Create a table with many records to ensure initial replication takes time - mysql.execute(f''' - CREATE TABLE `{TEST_TABLE_NAME}` ( - id int NOT NULL AUTO_INCREMENT, - name varchar(255), - data varchar(1000), - PRIMARY KEY (id) + # Insert many records to make initial replication take longer + for i in range(100): + mysql.execute( + f"INSERT INTO `{TEST_TABLE_NAME}` (name, data) VALUES ('test_{i}', 'data_{i}');", + commit=True ) - ''') - - # Insert many records to make initial replication take longer - for i in range(100): - mysql.execute( - f"INSERT INTO `{TEST_TABLE_NAME}` (name, data) VALUES ('test_{i}', 'data_{i}');", - commit=True - ) - # Start binlog replicator - binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file) - binlog_replicator_runner.run() + # Start binlog replicator + binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file) + binlog_replicator_runner.run() - # Start db replicator for initial replication with test flag to exit early - db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file, - additional_arguments='--initial-replication-test-fail-records 30') - db_replicator_runner.run() - - # Wait for initial replication to start - assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - ch.execute_command(f'USE `{TEST_DB_NAME}`') - assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) - - # Wait for some records to be replicated but not all (should hit the 30 record limit) - assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) > 0) - - # The db replicator should have stopped automatically due to the test flag - # But we still call stop() to ensure proper cleanup - db_replicator_runner.stop() - - # Verify the state is still PERFORMING_INITIAL_REPLICATION - state_path = os.path.join(cfg.binlog_replicator.data_dir, TEST_DB_NAME, 'state.pckl') - state = DbReplicatorState(state_path) - assert state.status.value == 2 # PERFORMING_INITIAL_REPLICATION - - # Add more records while replication is stopped - for i in range(100, 150): - mysql.execute( - f"INSERT INTO `{TEST_TABLE_NAME}` (name, data) VALUES ('test_{i}', 'data_{i}');", - commit=True - ) + # Start db replicator for initial replication with test flag to exit early + db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file, + additional_arguments='--initial-replication-test-fail-records 30') + db_replicator_runner.run() + + # Wait for initial replication to start + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) + ch.execute_command(f'USE `{TEST_DB_NAME}`') + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) + + # Wait for some records to be replicated but not all (should hit the 30 record limit) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) > 0) + + # The db replicator should have stopped automatically due to the test flag + # But we still call stop() to ensure proper cleanup + db_replicator_runner.stop() + + # Verify the state is still PERFORMING_INITIAL_REPLICATION + state_path = os.path.join(cfg.binlog_replicator.data_dir, TEST_DB_NAME, 'state.pckl') + state = DbReplicatorState(state_path) + assert state.status.value == 2 # PERFORMING_INITIAL_REPLICATION + + # Add more records while replication is stopped + for i in range(100, 150): + mysql.execute( + f"INSERT INTO `{TEST_TABLE_NAME}` (name, data) VALUES ('test_{i}', 'data_{i}');", + commit=True + ) - # Verify that sirocco_tmp database does NOT exist (it should use sirocco directly) - assert f"{TEST_DB_NAME}_tmp" not in ch.get_databases(), "Temporary database should not exist with ignore_deletes=True" - - # Resume initial replication - this should NOT fail with "Database sirocco_tmp does not exist" - db_replicator_runner_2 = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file) - db_replicator_runner_2.run() - - # Wait for all records to be replicated (100 original + 50 extra = 150) - assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 150, max_wait_time=30) - - # Verify the replication completed successfully - records = ch.select(TEST_TABLE_NAME) - assert len(records) == 150, f"Expected 150 records, got {len(records)}" - - # Verify we can continue with realtime replication - mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, data) VALUES ('realtime_test', 'realtime_data');", commit=True) - assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 151) - - # Clean up - db_replicator_runner_2.stop() - binlog_replicator_runner.stop() - - finally: - # Clean up temp config file - os.unlink(config_file) + # Verify that sirocco_tmp database does NOT exist (it should use sirocco directly) + assert f"{TEST_DB_NAME}_tmp" not in ch.get_databases(), "Temporary database should not exist with ignore_deletes=True" + + # Resume initial replication - this should NOT fail with "Database sirocco_tmp does not exist" + db_replicator_runner_2 = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file) + db_replicator_runner_2.run() + + # Wait for all records to be replicated (100 original + 50 extra = 150) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 150, max_wait_time=30) + + # Verify the replication completed successfully + records = ch.select(TEST_TABLE_NAME) + assert len(records) == 150, f"Expected 150 records, got {len(records)}" + + # Verify we can continue with realtime replication + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, data) VALUES ('realtime_test', 'realtime_data');", commit=True) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 151) + + # Clean up + db_replicator_runner_2.stop() + binlog_replicator_runner.stop() def test_post_initial_replication_commands(): diff --git a/tests/test_performance.py b/tests/test_performance.py index cabe51a..9bc6a46 100644 --- a/tests/test_performance.py +++ b/tests/test_performance.py @@ -256,16 +256,8 @@ def test_performance_initial_only_replication(): t1 = time.time() - # Create a custom config file for testing with parallel replication + # Use the static parallel config file parallel_config_file = 'tests/tests_config_perf_parallel.yaml' - if os.path.exists(parallel_config_file): - os.remove(parallel_config_file) - - with open(config_file, 'r') as src_file: - config_content = src_file.read() - config_content += f"\ninitial_replication_threads: 8\n" - with open(parallel_config_file, 'w') as dest_file: - dest_file.write(config_content) # Use the DbReplicator directly to test the new parallel implementation db_replicator_runner = DbReplicatorRunner( @@ -297,6 +289,3 @@ def test_performance_initial_only_replication(): print('\n\n') db_replicator_runner.stop() - - # Clean up the temporary config file - os.remove(parallel_config_file) diff --git a/tests/tests_config_charset.yaml b/tests/tests_config_charset.yaml new file mode 100644 index 0000000..851ed6a --- /dev/null +++ b/tests/tests_config_charset.yaml @@ -0,0 +1,38 @@ +mysql: + host: 'localhost' + port: 9306 + user: 'root' + password: 'admin' + charset: 'utf8mb4' + +clickhouse: + host: 'localhost' + port: 9123 + user: 'default' + password: 'admin' + erase_batch_size: 2 + +binlog_replicator: + data_dir: '/app/binlog/' + records_per_file: 100000 + binlog_retention_period: 43200 # 12 hours in seconds + +databases: '*test*' +log_level: 'info' +optimize_interval: 3 +check_db_updated_interval: 3 + +target_databases: + replication-test_db_2: replication-destination + +indexes: + - databases: '*' + tables: ['group'] + index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1' + +http_host: 'localhost' +http_port: 9128 + +types_mapping: + 'char(36)': 'UUID' + diff --git a/tests/tests_config_ignore_deletes.yaml b/tests/tests_config_ignore_deletes.yaml new file mode 100644 index 0000000..67538b7 --- /dev/null +++ b/tests/tests_config_ignore_deletes.yaml @@ -0,0 +1,39 @@ +mysql: + host: 'localhost' + port: 9306 + user: 'root' + password: 'admin' + +clickhouse: + host: 'localhost' + port: 9123 + user: 'default' + password: 'admin' + erase_batch_size: 2 + +binlog_replicator: + data_dir: '/app/binlog/' + records_per_file: 100000 + binlog_retention_period: 43200 # 12 hours in seconds + +databases: '*test*' +log_level: 'info' +optimize_interval: 3 +check_db_updated_interval: 3 +ignore_deletes: true +initial_replication_batch_size: 1 + +target_databases: + replication-test_db_2: replication-destination + +indexes: + - databases: '*' + tables: ['group'] + index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1' + +http_host: 'localhost' +http_port: 9128 + +types_mapping: + 'char(36)': 'UUID' + diff --git a/tests/tests_config_perf_parallel.yaml b/tests/tests_config_perf_parallel.yaml new file mode 100644 index 0000000..6418ef8 --- /dev/null +++ b/tests/tests_config_perf_parallel.yaml @@ -0,0 +1,38 @@ +mysql: + host: 'localhost' + port: 9306 + user: 'root' + password: 'admin' + +clickhouse: + host: 'localhost' + port: 9123 + user: 'default' + password: 'admin' + erase_batch_size: 2 + +binlog_replicator: + data_dir: '/app/binlog/' + records_per_file: 100000 + binlog_retention_period: 43200 # 12 hours in seconds + +databases: '*test*' +log_level: 'info' +optimize_interval: 3 +check_db_updated_interval: 3 +initial_replication_threads: 8 + +target_databases: + replication-test_db_2: replication-destination + +indexes: + - databases: '*' + tables: ['group'] + index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1' + +http_host: 'localhost' +http_port: 9128 + +types_mapping: + 'char(36)': 'UUID' +