Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def perform_initial_replication_table(self, table_name):
while True:

query_start_value = max_primary_key
if 'Int' not in primary_key_type:
if 'int' not in primary_key_type.lower() and query_start_value is not None:
query_start_value = f"'{query_start_value}'"

records = self.mysql_api.get_records(
Expand Down
61 changes: 60 additions & 1 deletion test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from mysql_ch_replicator import mysql_api
from mysql_ch_replicator import clickhouse_api
from mysql_ch_replicator.binlog_replicator import State as BinlogState
from mysql_ch_replicator.db_replicator import State as DbReplicatorState
from mysql_ch_replicator.db_replicator import State as DbReplicatorState, DbReplicator

from mysql_ch_replicator.runner import ProcessRunner

Expand Down Expand Up @@ -704,3 +704,62 @@ def test_json():

assert json.loads(ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['data'])['c'] == [1, 2, 3]
assert json.loads(ch.select(TEST_TABLE_NAME, "name='Peter'")[0]['data'])['c'] == [3, 2, 1]


def test_string_primary_key(monkeypatch):
monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1)

cfg = config.Settings()
cfg.load(CONFIG_FILE)

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=cfg.clickhouse,
)

prepare_env(cfg, mysql, ch)

mysql.execute("SET sql_mode = 'ALLOW_INVALID_DATES';")

mysql.execute(f'''
CREATE TABLE {TEST_TABLE_NAME} (
`id` char(30) NOT NULL,
name varchar(255),
PRIMARY KEY (id)
);
''')

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (id, name) VALUES " +
"""('01', 'Ivan');""",
commit=True,
)
mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (id, name) VALUES " +
"""('02', 'Peter');""",
commit=True,
)

binlog_replicator_runner = BinlogReplicatorRunner()
binlog_replicator_runner.run()
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
db_replicator_runner.run()

assert_wait(lambda: TEST_DB_NAME in ch.get_databases())

ch.execute_command(f'USE {TEST_DB_NAME}')

assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (id, name) VALUES " +
"""('03', 'Filipp');""",
commit=True,
)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
Loading