Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions mysql_ch_replicator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ def __init__(self):
self.check_db_updated_interval = 0
self.indexes: list[Index] = []
self.auto_restart_interval = 0
self.http_host = ''
self.http_port = 0

def load(self, settings_file):
data = open(settings_file, 'r').read()
Expand All @@ -128,6 +130,9 @@ def load(self, settings_file):
self.auto_restart_interval = data.pop(
'auto_restart_interval', Settings.DEFAULT_AUTO_RESTART_INTERVAL,
)
self.http_host = data.pop('http_host', '')
self.http_port = data.pop('http_port', 0)

indexes = data.pop('indexes', [])
for index in indexes:
self.indexes.append(
Expand Down
59 changes: 59 additions & 0 deletions mysql_ch_replicator/runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import os
import time
import sys
import threading
from uvicorn import Config, Server
from fastapi import APIRouter, FastAPI

from logging import getLogger

Expand Down Expand Up @@ -35,6 +38,10 @@ def __init__(self, db_name, config_file):
super().__init__(f'{sys.argv[0]} --config {config_file} run_all --db {db_name}')


app = FastAPI()



class Runner:
def __init__(self, config: Settings, wait_initial_replication: bool, databases: str):
self.config = config
Expand All @@ -43,6 +50,32 @@ def __init__(self, config: Settings, wait_initial_replication: bool, databases:
self.runners: dict[str: DbReplicatorRunner] = {}
self.binlog_runner = None
self.db_optimizer = None
self.http_server = None
self.router = None
self.need_restart_replication = False
self.replication_restarted = False

def run_server(self):
if not self.config.http_host or not self.config.http_port:
logger.info('http server disabled')
return
logger.info('starting http server')

config = Config(app=app, host=self.config.http_host, port=self.config.http_port)
self.router = APIRouter()
self.router.add_api_route("/restart_replication", self.restart_replication, methods=["GET"])
app.include_router(self.router)

self.http_server = Server(config)
self.http_server.run()

def restart_replication(self):
self.replication_restarted = False
self.need_restart_replication = True
while not self.replication_restarted:
logger.info('waiting replication restarted..')
time.sleep(1)
return {"restarted": True}

def is_initial_replication_finished(self, db_name):
state_path = os.path.join(
Expand All @@ -61,6 +94,23 @@ def restart_dead_processes(self):
if self.db_optimizer is not None:
self.db_optimizer.restart_dead_process_if_required()

def restart_replication_if_required(self):
if not self.need_restart_replication:
return
logger.info('\n\n\n ====== restarting replication =====')
for db_name, runner in self.runners.items():
logger.info(f'stopping runner {db_name}')
runner.stop()
path = os.path.join(self.config.binlog_replicator.data_dir, db_name, 'state.pckl')
if os.path.exists(path):
logger.debug(f'removing {path}')
os.remove(path)

logger.info('starting replication')
self.restart_dead_processes()
self.need_restart_replication = False
self.replication_restarted = True

def check_databases_updated(self, mysql_api: MySQLApi):
logger.debug('check if databases were created / removed in mysql')
databases = mysql_api.get_databases()
Expand Down Expand Up @@ -96,6 +146,9 @@ def run(self):
self.db_optimizer = DbOptimizerRunner(self.config.settings_file)
self.db_optimizer.run()

server_thread = threading.Thread(target=self.run_server, daemon=True)
server_thread.start()

# First - continue replication for DBs that already finished initial replication
for db in databases:
if not self.is_initial_replication_finished(db_name=db):
Expand Down Expand Up @@ -124,6 +177,7 @@ def run(self):
last_check_db_updated = time.time()
while not killer.kill_now:
time.sleep(1)
self.restart_replication_if_required()
self.restart_dead_processes()
if time.time() - last_check_db_updated > self.config.check_db_updated_interval:
self.check_databases_updated(mysql_api=mysql_api)
Expand All @@ -143,4 +197,9 @@ def run(self):
logger.info(f'stopping replication for {db_name}')
db_replication_runner.stop()

if self.http_server:
self.http_server.should_exit = True

server_thread.join()

logger.info('stopped')
4 changes: 4 additions & 0 deletions mysql_ch_replicator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ def run(self):
self.process = subprocess.Popen(cmd)

def restart_dead_process_if_required(self):
if self.process is None:
logger.warning(f'Restarting stopped process: < {self.cmd} >')
self.run()
return
res = self.process.poll()
if res is None:
# still running
Expand Down
Loading
Loading