diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py index b7936be..d2fa6df 100644 --- a/mysql_ch_replicator/db_replicator.py +++ b/mysql_ch_replicator/db_replicator.py @@ -73,6 +73,13 @@ def save(self): f.write(data) os.rename(file_name + '.tmp', file_name) + def remove(self): + file_name = self.file_name + if os.path.exists(file_name): + os.remove(file_name) + if os.path.exists(file_name + '.tmp'): + os.remove(file_name + '.tmp') + @dataclass class Statistics: @@ -115,7 +122,7 @@ def __init__(self, config: Settings, database: str, target_database: str = None, ) self.converter = MysqlToClickhouseConverter(self) self.data_reader = DataReader(config.binlog_replicator, database) - self.state = State(os.path.join(config.binlog_replicator.data_dir, database, 'state.pckl')) + self.state = self.create_state() self.clickhouse_api.tables_last_record_version = self.state.tables_last_record_version self.last_save_state_time = 0 self.stats = Statistics() @@ -126,9 +133,22 @@ def __init__(self, config: Settings, database: str, target_database: str = None, self.last_records_upload_time = 0 self.last_touch_time = 0 + def create_state(self): + return State(os.path.join(self.config.binlog_replicator.data_dir, self.database, 'state.pckl')) + def run(self): try: logger.info('launched db_replicator') + + if self.state.status != Status.NONE: + # ensure target database still exists + if self.target_database not in self.clickhouse_api.get_databases(): + logger.warning(f'database {self.target_database} missing in CH') + if self.initial_only: + logger.warning('will run replication from scratch') + self.state.remove() + self.state = self.create_state() + if self.state.status == Status.RUNNING_REALTIME_REPLICATION: self.run_realtime_replication() return @@ -213,6 +233,7 @@ def perform_initial_replication_table(self, table_name): if not self.config.is_table_matches(table_name): logger.info(f'skip table {table_name} - not matching any allowed table') + return max_primary_key = None if self.state.initial_replication_table == table_name: @@ -280,6 +301,7 @@ def perform_initial_replication_table(self, table_name): def run_realtime_replication(self): if self.initial_only: logger.info('skip running realtime replication, only initial replication was requested') + self.state.remove() return self.mysql_api.close() diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 87ba342..abb3d34 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -382,6 +382,13 @@ def test_initial_only(): assert TEST_TABLE_NAME in ch.get_tables() assert len(ch.select(TEST_TABLE_NAME)) == 2 + ch.execute_command(f'DROP DATABASE {TEST_DB_NAME}') + + db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, additional_arguments='--initial_only=True') + db_replicator_runner.run() + db_replicator_runner.wait_complete() + assert TEST_DB_NAME in ch.get_databases() + def test_database_tables_filtering(): cfg = config.Settings()