diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py index 4cd22d0..3787a3b 100644 --- a/mysql_ch_replicator/db_replicator.py +++ b/mysql_ch_replicator/db_replicator.py @@ -267,7 +267,7 @@ def perform_initial_replication_table(self, table_name): while True: query_start_value = max_primary_key - if 'Int' not in primary_key_type: + if 'int' not in primary_key_type.lower() and query_start_value is not None: query_start_value = f"'{query_start_value}'" records = self.mysql_api.get_records( diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index c6fed82..d083340 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -8,7 +8,7 @@ from mysql_ch_replicator import mysql_api from mysql_ch_replicator import clickhouse_api from mysql_ch_replicator.binlog_replicator import State as BinlogState -from mysql_ch_replicator.db_replicator import State as DbReplicatorState +from mysql_ch_replicator.db_replicator import State as DbReplicatorState, DbReplicator from mysql_ch_replicator.runner import ProcessRunner @@ -704,3 +704,62 @@ def test_json(): assert json.loads(ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['data'])['c'] == [1, 2, 3] assert json.loads(ch.select(TEST_TABLE_NAME, "name='Peter'")[0]['data'])['c'] == [3, 2, 1] + + +def test_string_primary_key(monkeypatch): + monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1) + + cfg = config.Settings() + cfg.load(CONFIG_FILE) + + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=cfg.mysql, + ) + + ch = clickhouse_api.ClickhouseApi( + database=TEST_DB_NAME, + clickhouse_settings=cfg.clickhouse, + ) + + prepare_env(cfg, mysql, ch) + + mysql.execute("SET sql_mode = 'ALLOW_INVALID_DATES';") + + mysql.execute(f''' +CREATE TABLE {TEST_TABLE_NAME} ( + `id` char(30) NOT NULL, + name varchar(255), + PRIMARY KEY (id) +); + ''') + + mysql.execute( + f"INSERT INTO {TEST_TABLE_NAME} (id, name) VALUES " + + """('01', 'Ivan');""", + commit=True, + ) + mysql.execute( + f"INSERT INTO {TEST_TABLE_NAME} (id, name) VALUES " + + """('02', 'Peter');""", + commit=True, + ) + + binlog_replicator_runner = BinlogReplicatorRunner() + binlog_replicator_runner.run() + db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME) + db_replicator_runner.run() + + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) + + ch.execute_command(f'USE {TEST_DB_NAME}') + + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2) + + mysql.execute( + f"INSERT INTO {TEST_TABLE_NAME} (id, name) VALUES " + + """('03', 'Filipp');""", + commit=True, + ) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)