Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 117 additions & 2 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1357,7 +1357,7 @@ def get_last_insert_from_binlog(cfg: config.Settings, db_name: str):


@pytest.mark.optional
def test_performance_dbreplicator():
def test_performance_realtime_replication():
config_file = 'tests_config_perf.yaml'
num_records = 100000

Expand Down Expand Up @@ -1387,6 +1387,8 @@ def test_performance_dbreplicator():

binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
binlog_replicator_runner.run()
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
db_replicator_runner.run()

time.sleep(1)

Expand All @@ -1399,8 +1401,15 @@ def _get_last_insert_name():
return record[1].decode('utf-8')

assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_1', retry_interval=0.5)

# Wait for the database and table to be created in ClickHouse
assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5)
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1, retry_interval=0.5)

binlog_replicator_runner.stop()
db_replicator_runner.stop()

time.sleep(1)

Expand All @@ -1418,7 +1427,7 @@ def _get_last_insert_name():

mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True)

print("running db_replicator")
print("running binlog_replicator")
t1 = time.time()
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
binlog_replicator_runner.run()
Expand All @@ -1433,6 +1442,33 @@ def _get_last_insert_name():

print('\n\n')
print("*****************************")
print("Binlog Replicator Performance:")
print("records per second:", int(rps))
print("total time (seconds):", round(time_delta, 2))
print("*****************************")
print('\n\n')

# Now test db_replicator performance
print("running db_replicator")
t1 = time.time()
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
db_replicator_runner.run()

# Make sure the database and table exist before querying
assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5)
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == num_records + 2, retry_interval=0.5, max_wait_time=1000)
t2 = time.time()

db_replicator_runner.stop()

time_delta = t2 - t1
rps = num_records / time_delta

print('\n\n')
print("*****************************")
print("DB Replicator Performance:")
print("records per second:", int(rps))
print("total time (seconds):", round(time_delta, 2))
print("*****************************")
Expand Down Expand Up @@ -1961,3 +1997,82 @@ def test_year_type():
run_all_runner.stop()
assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
assert('Traceback' not in read_logs(TEST_DB_NAME))

@pytest.mark.optional
def test_performance_initial_only_replication():
    """Measure db_replicator throughput when started with ``--initial_only=True``.

    The test fills a MySQL table with ``num_records`` wide rows plus one
    final marker row, starts the db replicator with ``--initial_only=True``,
    waits for that process to complete, and then checks that ClickHouse
    holds every row. The elapsed time and records-per-second figure are
    printed; no performance threshold is asserted.
    """
    config_file = 'tests_config_perf.yaml'
    num_records = 1000000

    cfg = config.Settings()
    cfg.load(config_file)

    mysql = mysql_api.MySQLApi(
        database=None,
        mysql_settings=cfg.mysql,
    )

    ch = clickhouse_api.ClickhouseApi(
        database=TEST_DB_NAME,
        clickhouse_settings=cfg.clickhouse,
    )

    prepare_env(cfg, mysql, ch)

    mysql.execute(f'''
    CREATE TABLE `{TEST_TABLE_NAME}` (
        id int NOT NULL AUTO_INCREMENT,
        name varchar(2048),
        age int,
        PRIMARY KEY (id)
    );
    ''')

    print("populating mysql data")

    # A 2000-character payload makes every row wide while still fitting in
    # the varchar(2048) column together with the "TEST_VALUE_<i>_" prefix.
    base_value = 'a' * 2000

    for i in range(num_records):
        if i % 2000 == 0:
            print(f'populated {i} elements')
        # Commit only on every 20th insert; the final marker insert below
        # commits whatever is still pending.
        mysql.execute(
            f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) "
            f"VALUES ('TEST_VALUE_{i}_{base_value}', {i});",
            commit=(i % 20 == 0),
        )

    mysql.execute(
        f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_FINAL', 0);",
        commit=True,
    )
    print(f"finished populating {num_records} records")

    # Now test db_replicator performance in initial_only mode
    print("running db_replicator in initial_only mode")
    t1 = time.time()

    db_replicator_runner = DbReplicatorRunner(
        TEST_DB_NAME,
        additional_arguments='--initial_only=True',
        cfg_file=config_file,
    )
    db_replicator_runner.run()
    try:
        db_replicator_runner.wait_complete()  # Wait for the process to complete

        # Make sure the database and table exist
        assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5)
        ch.execute_command(f'USE `{TEST_DB_NAME}`')
        assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5)

        # Check that all records were replicated: num_records rows from the
        # loop plus the single TEST_VALUE_FINAL marker row.
        assert_wait(
            lambda: len(ch.select(TEST_TABLE_NAME)) == num_records + 1,
            retry_interval=0.5,
            max_wait_time=300,
        )

        t2 = time.time()
    finally:
        # Stop the runner even when a wait or assertion above fails, so a
        # failing run does not leave the replicator running.
        db_replicator_runner.stop()

    time_delta = t2 - t1
    rps = num_records / time_delta

    print('\n\n')
    print("*****************************")
    print("DB Replicator Initial Only Mode Performance:")
    print("records per second:", int(rps))
    print("total time (seconds):", round(time_delta, 2))
    print("*****************************")
    print('\n\n')