292 changes: 0 additions & 292 deletions tests/test_mysql_ch_replicator.py
@@ -420,166 +420,6 @@ def test_parse_mysql_table_structure():
assert structure.table_name == 'user_preferences_portal'


def get_last_file(directory, extension='.bin'):
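    """Return the filename in `directory` with the largest integer prefix and the given extension, or None."""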
max_num = -1
last_file = None
ext_len = len(extension)

with os.scandir(directory) as it:
for entry in it:
if entry.is_file() and entry.name.endswith(extension):
# Extract the numerical part by removing the extension
num_part = entry.name[:-ext_len]
try:
num = int(num_part)
if num > max_num:
max_num = num
last_file = entry.name
except ValueError:
# Skip files where the name before extension is not an integer
continue
return last_file


def get_last_insert_from_binlog(cfg: config.Settings, db_name: str):
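    """Read the newest binlog file for `db_name` and return the last ADD_EVENT record, or None."""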
binlog_dir_path = os.path.join(cfg.binlog_replicator.data_dir, db_name)
if not os.path.exists(binlog_dir_path):
return None
last_file = get_last_file(binlog_dir_path)
if last_file is None:
return None
reader = FileReader(os.path.join(binlog_dir_path, last_file))
last_insert = None
while True:
event = reader.read_next_event()
if event is None:
break
if event.event_type != EventType.ADD_EVENT.value:
continue
for record in event.records:
last_insert = record
return last_insert


@pytest.mark.optional
def test_performance_realtime_replication():
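    """Benchmark realtime replication: binlog replicator first, then db replicator, reporting records per second."""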
config_file = 'tests/tests_config_perf.yaml'
num_records = 100000

cfg = config.Settings()
cfg.load(config_file)

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=cfg.clickhouse,
)

prepare_env(cfg, mysql, ch)

mysql.execute(f'''
CREATE TABLE `{TEST_TABLE_NAME}` (
id int NOT NULL AUTO_INCREMENT,
name varchar(2048),
age int,
PRIMARY KEY (id)
);
''')

binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
binlog_replicator_runner.run()
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
db_replicator_runner.run()

time.sleep(1)

mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_1', 33);", commit=True)

def _get_last_insert_name():
record = get_last_insert_from_binlog(cfg=cfg, db_name=TEST_DB_NAME)
if record is None:
return None
return record[1].decode('utf-8')

assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_1', retry_interval=0.5)

# Wait for the database and table to be created in ClickHouse
assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5)
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1, retry_interval=0.5)

binlog_replicator_runner.stop()
db_replicator_runner.stop()

time.sleep(1)

print("populating mysql data")

base_value = 'a' * 2000

for i in range(num_records):
if i % 2000 == 0:
print(f'populated {i} elements')
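        # Commit only every 20th insert to batch writes and keep population fast.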
mysql.execute(
f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) "
f"VALUES ('TEST_VALUE_{i}_{base_value}', {i});", commit=i % 20 == 0,
)

mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True)

print("running binlog_replicator")
t1 = time.time()
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
binlog_replicator_runner.run()

assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_FINAL', retry_interval=0.5, max_wait_time=1000)
t2 = time.time()

binlog_replicator_runner.stop()

time_delta = t2 - t1
rps = num_records / time_delta

print('\n\n')
print("*****************************")
print("Binlog Replicator Performance:")
print("records per second:", int(rps))
print("total time (seconds):", round(time_delta, 2))
print("*****************************")
print('\n\n')

# Now test db_replicator performance
print("running db_replicator")
t1 = time.time()
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
db_replicator_runner.run()

# Make sure the database and table exist before querying
assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5)
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == num_records + 2, retry_interval=0.5, max_wait_time=1000)
t2 = time.time()

db_replicator_runner.stop()

time_delta = t2 - t1
rps = num_records / time_delta

print('\n\n')
print("*****************************")
print("DB Replicator Performance:")
print("records per second:", int(rps))
print("total time (seconds):", round(time_delta, 2))
print("*****************************")
print('\n\n')


def test_alter_tokens_split():
examples = [
# basic examples from the prompt:
@@ -719,138 +559,6 @@ def test_parse_db_name_from_query(query, expected):
assert BinlogReplicator._try_parse_db_name_from_query(query) == expected


@pytest.mark.optional
def test_performance_initial_only_replication():
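    """Benchmark initial-only replication, first single-threaded and then with parallel workers."""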
config_file = 'tests/tests_config_perf.yaml'
num_records = 300000

cfg = config.Settings()
cfg.load(config_file)

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=cfg.clickhouse,
)

prepare_env(cfg, mysql, ch)

mysql.execute(f'''
CREATE TABLE `{TEST_TABLE_NAME}` (
id int NOT NULL AUTO_INCREMENT,
name varchar(2048),
age int,
PRIMARY KEY (id)
);
''')

print("populating mysql data")

base_value = 'a' * 2000

for i in range(num_records):
if i % 2000 == 0:
print(f'populated {i} elements')
mysql.execute(
f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) "
f"VALUES ('TEST_VALUE_{i}_{base_value}', {i});", commit=i % 20 == 0,
)

mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True)
print(f"finished populating {num_records} records")

# Now test db_replicator performance in initial_only mode
print("running db_replicator in initial_only mode")
t1 = time.time()

db_replicator_runner = DbReplicatorRunner(
TEST_DB_NAME,
additional_arguments='--initial_only=True',
cfg_file=config_file
)
db_replicator_runner.run()
db_replicator_runner.wait_complete() # Wait for the process to complete

# Make sure the database and table exist
assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5)
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5)

# Check that all records were replicated
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == num_records + 1, retry_interval=0.5, max_wait_time=300)

t2 = time.time()

time_delta = t2 - t1
rps = num_records / time_delta

print('\n\n')
print("*****************************")
print("DB Replicator Initial Only Mode Performance:")
print("records per second:", int(rps))
print("total time (seconds):", round(time_delta, 2))
print("*****************************")
print('\n\n')

# Clean up
ch.drop_database(TEST_DB_NAME)

# Now test with parallel replication
# Set initial_replication_threads in the config
print("running db_replicator with parallel initial replication")

t1 = time.time()

# Create a custom config file for testing with parallel replication
parallel_config_file = 'tests/tests_config_perf_parallel.yaml'
if os.path.exists(parallel_config_file):
os.remove(parallel_config_file)

with open(config_file, 'r') as src_file:
config_content = src_file.read()
    num_threads = 8
    config_content += f"\ninitial_replication_threads: {num_threads}\n"
with open(parallel_config_file, 'w') as dest_file:
dest_file.write(config_content)

# Use the DbReplicator directly to test the new parallel implementation
db_replicator_runner = DbReplicatorRunner(
TEST_DB_NAME,
cfg_file=parallel_config_file
)
db_replicator_runner.run()

# Make sure the database and table exist
assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5)
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5)

# Check that all records were replicated
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == num_records + 1, retry_interval=0.5, max_wait_time=300)

t2 = time.time()

time_delta = t2 - t1
rps = num_records / time_delta

print('\n\n')
print("*****************************")
print("DB Replicator Parallel Mode Performance:")
print("workers:", cfg.initial_replication_threads)
print("records per second:", int(rps))
print("total time (seconds):", round(time_delta, 2))
print("*****************************")
print('\n\n')

db_replicator_runner.stop()

# Clean up the temporary config file
os.remove(parallel_config_file)


def test_ignore_deletes():
# Create a temporary config file with ignore_deletes=True
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_config_file: