diff --git a/mysql_ch_replicator/config.py b/mysql_ch_replicator/config.py
index b5c54f4..2c31946 100644
--- a/mysql_ch_replicator/config.py
+++ b/mysql_ch_replicator/config.py
@@ -83,6 +83,7 @@ def validate(self):
 class Settings:
     DEFAULT_LOG_LEVEL = 'info'
     DEFAULT_OPTIMIZE_INTERVAL = 86400
+    DEFAULT_CHECK_DB_UPDATED_INTERVAL = 120
 
     def __init__(self):
         self.mysql = MysqlSettings()
@@ -96,6 +97,7 @@ def __init__(self):
         self.log_level = 'info'
         self.debug_log_level = False
         self.optimize_interval = 0
+        self.check_db_updated_interval = 0
 
     def load(self, settings_file):
         data = open(settings_file, 'r').read()
@@ -110,6 +112,9 @@ def load(self, settings_file):
         self.exclude_tables = data.pop('exclude_tables', '')
         self.log_level = data.pop('log_level', Settings.DEFAULT_LOG_LEVEL)
         self.optimize_interval = data.pop('optimize_interval', Settings.DEFAULT_OPTIMIZE_INTERVAL)
+        self.check_db_updated_interval = data.pop(
+            'check_db_updated_interval', Settings.DEFAULT_CHECK_DB_UPDATED_INTERVAL,
+        )
         assert isinstance(self.databases, str) or isinstance(self.databases, list)
         assert isinstance(self.tables, str) or isinstance(self.tables, list)
         self.binlog_replicator = BinlogReplicatorSettings(**data.pop('binlog_replicator'))
diff --git a/mysql_ch_replicator/mysql_api.py b/mysql_ch_replicator/mysql_api.py
index f18fe2d..ee34b7c 100644
--- a/mysql_ch_replicator/mysql_api.py
+++ b/mysql_ch_replicator/mysql_api.py
@@ -17,9 +17,9 @@ def __init__(self, database: str, mysql_settings: MysqlSettings):
     def close(self):
         self.db.close()
 
-    def reconnect_if_required(self):
+    def reconnect_if_required(self, force=False):
         curr_time = time.time()
-        if curr_time - self.last_connect_time < MySQLApi.RECONNECT_INTERVAL:
+        if curr_time - self.last_connect_time < MySQLApi.RECONNECT_INTERVAL and not force:
             return
         conn_settings = dict(
             host=self.mysql_settings.host,
@@ -59,7 +59,7 @@ def set_database(self, database):
         self.cursor.execute(f'USE {self.database}')
 
     def get_databases(self):
-        self.reconnect_if_required()
+        self.reconnect_if_required(force=True)  # New databases appear only after a new connection
         self.cursor.execute('SHOW DATABASES')
         res = self.cursor.fetchall()
         tables = [x[0] for x in res]
diff --git a/mysql_ch_replicator/runner.py b/mysql_ch_replicator/runner.py
index 479dfb4..60d3c09 100644
--- a/mysql_ch_replicator/runner.py
+++ b/mysql_ch_replicator/runner.py
@@ -40,7 +40,7 @@ def __init__(self, config: Settings, wait_initial_replication: bool, databases:
         self.config = config
         self.databases = databases or config.databases
         self.wait_initial_replication = wait_initial_replication
-        self.runners: dict = {}
+        self.runners: dict[str, DbReplicatorRunner] = {}
         self.binlog_runner = None
         self.db_optimizer = None
 
@@ -61,6 +61,32 @@ def restart_dead_processes(self):
         if self.db_optimizer is not None:
             self.db_optimizer.restart_dead_process_if_required()
 
+    def check_databases_updated(self, mysql_api: MySQLApi):
+        """Start runners for newly created databases and stop those of removed ones.
+
+        Only databases accepted by ``config.is_database_matches`` are considered.
+        """
+        logger.debug('check if databases were created / removed in mysql')
+        databases = mysql_api.get_databases()
+        logger.info(f'mysql databases: {databases}')
+        databases = [db for db in databases if self.config.is_database_matches(db)]
+        logger.info(f'mysql databases filtered: {databases}')
+        for db in databases:
+            if db in self.runners:
+                continue
+            logger.info(f'running replication for {db} (database created in mysql)')
+            runner = self.runners[db] = DbReplicatorRunner(db_name=db, config_file=self.config.settings_file)
+            runner.run()
+
+        # Iterate over a snapshot of the keys: popping from the dict while
+        # iterating its live view raises RuntimeError.
+        for db in list(self.runners):
+            if db in databases:
+                continue
+            logger.info(f'stop replication for {db} (database removed from mysql)')
+            self.runners[db].stop()
+            self.runners.pop(db)
+
     def run(self):
         mysql_api = MySQLApi(
             database=None, mysql_settings=self.config.mysql,
@@ -101,9 +127,13 @@ def run(self):
 
         logger.info('all replicators launched')
 
+        last_check_db_updated = time.time()
         while not killer.kill_now:
             time.sleep(1)
             self.restart_dead_processes()
+            if time.time() - last_check_db_updated > self.config.check_db_updated_interval:
+                self.check_databases_updated(mysql_api=mysql_api)
+                last_check_db_updated = time.time()
 
         logger.info('stopping runner')
 
diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py
index 5ed455d..6027257 100644
--- a/test_mysql_ch_replicator.py
+++ b/test_mysql_ch_replicator.py
@@ -17,6 +17,7 @@
 CONFIG_FILE = 'tests_config.yaml'
 CONFIG_FILE_MARIADB = 'tests_config_mariadb.yaml'
 TEST_DB_NAME = 'replication_test_db'
+TEST_DB_NAME_2 = 'replication_test_db_2'
 TEST_TABLE_NAME = 'test_table'
 TEST_TABLE_NAME_2 = 'test_table_2'
 TEST_TABLE_NAME_3 = 'test_table_3'
@@ -300,6 +301,9 @@ def test_runner():
         clickhouse_settings=cfg.clickhouse,
     )
 
+    mysql.drop_database(TEST_DB_NAME_2)
+    ch.drop_database(TEST_DB_NAME_2)
+
     prepare_env(cfg, mysql, ch)
 
     mysql.execute(f'''
@@ -358,6 +362,9 @@ def test_runner():
 
     assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, final=False)) == 4)
 
+    mysql.create_database(TEST_DB_NAME_2)
+    assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases(), max_wait_time=5)
+
     run_all_runner.stop()
 
 
diff --git a/tests_config.yaml b/tests_config.yaml
index e095d8d..196bf79 100644
--- a/tests_config.yaml
+++ b/tests_config.yaml
@@ -18,3 +18,4 @@ binlog_replicator:
 databases: '*test*'
 log_level: 'debug'
 optimize_interval: 3
+check_db_updated_interval: 3