Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions mysql_ch_replicator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def validate(self):
class Settings:
DEFAULT_LOG_LEVEL = 'info'
DEFAULT_OPTIMIZE_INTERVAL = 86400
DEFAULT_CHECK_DB_UPDATED_INTERVAL = 120

def __init__(self):
self.mysql = MysqlSettings()
Expand All @@ -96,6 +97,7 @@ def __init__(self):
self.log_level = 'info'
self.debug_log_level = False
self.optimize_interval = 0
self.check_db_updated_interval = 0

def load(self, settings_file):
data = open(settings_file, 'r').read()
Expand All @@ -110,6 +112,9 @@ def load(self, settings_file):
self.exclude_tables = data.pop('exclude_tables', '')
self.log_level = data.pop('log_level', Settings.DEFAULT_LOG_LEVEL)
self.optimize_interval = data.pop('optimize_interval', Settings.DEFAULT_OPTIMIZE_INTERVAL)
self.check_db_updated_interval = data.pop(
'check_db_updated_interval', Settings.DEFAULT_CHECK_DB_UPDATED_INTERVAL,
)
assert isinstance(self.databases, str) or isinstance(self.databases, list)
assert isinstance(self.tables, str) or isinstance(self.tables, list)
self.binlog_replicator = BinlogReplicatorSettings(**data.pop('binlog_replicator'))
Expand Down
6 changes: 3 additions & 3 deletions mysql_ch_replicator/mysql_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ def __init__(self, database: str, mysql_settings: MysqlSettings):
def close(self):
self.db.close()

def reconnect_if_required(self):
def reconnect_if_required(self, force=False):
curr_time = time.time()
if curr_time - self.last_connect_time < MySQLApi.RECONNECT_INTERVAL:
if curr_time - self.last_connect_time < MySQLApi.RECONNECT_INTERVAL and not force:
return
conn_settings = dict(
host=self.mysql_settings.host,
Expand Down Expand Up @@ -59,7 +59,7 @@ def set_database(self, database):
self.cursor.execute(f'USE {self.database}')

def get_databases(self):
self.reconnect_if_required()
self.reconnect_if_required(True) # New database appear only after new connection
self.cursor.execute('SHOW DATABASES')
res = self.cursor.fetchall()
tables = [x[0] for x in res]
Expand Down
26 changes: 25 additions & 1 deletion mysql_ch_replicator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(self, config: Settings, wait_initial_replication: bool, databases:
self.config = config
self.databases = databases or config.databases
self.wait_initial_replication = wait_initial_replication
self.runners: dict = {}
self.runners: dict[str: DbReplicatorRunner] = {}
self.binlog_runner = None
self.db_optimizer = None

Expand All @@ -61,6 +61,26 @@ def restart_dead_processes(self):
if self.db_optimizer is not None:
self.db_optimizer.restart_dead_process_if_required()

def check_databases_updated(self, mysql_api: MySQLApi):
logger.debug('check if databases were created / removed in mysql')
databases = mysql_api.get_databases()
logger.info(f'mysql databases: {databases}')
databases = [db for db in databases if self.config.is_database_matches(db)]
logger.info(f'mysql databases filtered: {databases}')
for db in databases:
if db in self.runners:
continue
logger.info(f'running replication for {db} (database created in mysql)')
runner = self.runners[db] = DbReplicatorRunner(db_name=db, config_file=self.config.settings_file)
runner.run()

for db in self.runners.keys():
if db in databases:
continue
logger.info(f'stop replication for {db} (database removed from mysql)')
self.runners[db].stop()
self.runners.pop(db)

def run(self):
mysql_api = MySQLApi(
database=None, mysql_settings=self.config.mysql,
Expand Down Expand Up @@ -101,9 +121,13 @@ def run(self):

logger.info('all replicators launched')

last_check_db_updated = time.time()
while not killer.kill_now:
time.sleep(1)
self.restart_dead_processes()
if time.time() - last_check_db_updated > self.config.check_db_updated_interval:
self.check_databases_updated(mysql_api=mysql_api)
last_check_db_updated = time.time()

logger.info('stopping runner')

Expand Down
7 changes: 7 additions & 0 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
CONFIG_FILE = 'tests_config.yaml'
CONFIG_FILE_MARIADB = 'tests_config_mariadb.yaml'
TEST_DB_NAME = 'replication_test_db'
TEST_DB_NAME_2 = 'replication_test_db_2'
TEST_TABLE_NAME = 'test_table'
TEST_TABLE_NAME_2 = 'test_table_2'
TEST_TABLE_NAME_3 = 'test_table_3'
Expand Down Expand Up @@ -300,6 +301,9 @@ def test_runner():
clickhouse_settings=cfg.clickhouse,
)

mysql.drop_database(TEST_DB_NAME_2)
ch.drop_database(TEST_DB_NAME_2)

prepare_env(cfg, mysql, ch)

mysql.execute(f'''
Expand Down Expand Up @@ -358,6 +362,9 @@ def test_runner():

assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, final=False)) == 4)

mysql.create_database(TEST_DB_NAME_2)
assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases(), max_wait_time=5)

run_all_runner.stop()


Expand Down
1 change: 1 addition & 0 deletions tests_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ binlog_replicator:
databases: '*test*'
log_level: 'debug'
optimize_interval: 3
check_db_updated_interval: 3
Loading