Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ def save(self):
f.write(data)
os.rename(file_name + '.tmp', file_name)

def remove(self):
file_name = self.file_name
if os.path.exists(file_name):
os.remove(file_name)
if os.path.exists(file_name + '.tmp'):
os.remove(file_name + '.tmp')


@dataclass
class Statistics:
Expand Down Expand Up @@ -115,7 +122,7 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
)
self.converter = MysqlToClickhouseConverter(self)
self.data_reader = DataReader(config.binlog_replicator, database)
self.state = State(os.path.join(config.binlog_replicator.data_dir, database, 'state.pckl'))
self.state = self.create_state()
self.clickhouse_api.tables_last_record_version = self.state.tables_last_record_version
self.last_save_state_time = 0
self.stats = Statistics()
Expand All @@ -126,9 +133,22 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
self.last_records_upload_time = 0
self.last_touch_time = 0

def create_state(self):
return State(os.path.join(self.config.binlog_replicator.data_dir, self.database, 'state.pckl'))

def run(self):
try:
logger.info('launched db_replicator')

if self.state.status != Status.NONE:
# ensure target database still exists
if self.target_database not in self.clickhouse_api.get_databases():
logger.warning(f'database {self.target_database} missing in CH')
if self.initial_only:
logger.warning('will run replication from scratch')
self.state.remove()
self.state = self.create_state()

if self.state.status == Status.RUNNING_REALTIME_REPLICATION:
self.run_realtime_replication()
return
Expand Down Expand Up @@ -213,6 +233,7 @@ def perform_initial_replication_table(self, table_name):

if not self.config.is_table_matches(table_name):
logger.info(f'skip table {table_name} - not matching any allowed table')
return

max_primary_key = None
if self.state.initial_replication_table == table_name:
Expand Down Expand Up @@ -280,6 +301,7 @@ def perform_initial_replication_table(self, table_name):
def run_realtime_replication(self):
if self.initial_only:
logger.info('skip running realtime replication, only initial replication was requested')
self.state.remove()
return

self.mysql_api.close()
Expand Down
7 changes: 7 additions & 0 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,13 @@ def test_initial_only():
assert TEST_TABLE_NAME in ch.get_tables()
assert len(ch.select(TEST_TABLE_NAME)) == 2

ch.execute_command(f'DROP DATABASE {TEST_DB_NAME}')

db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, additional_arguments='--initial_only=True')
db_replicator_runner.run()
db_replicator_runner.wait_complete()
assert TEST_DB_NAME in ch.get_databases()


def test_database_tables_filtering():
cfg = config.Settings()
Expand Down
Loading