diff --git a/README.md b/README.md
index 22e62e0..4ec6436 100644
--- a/README.md
+++ b/README.md
@@ -137,7 +137,8 @@ tables: '*'
 exclude_databases: ['database_10', 'database_*_42'] # optional
 exclude_tables: ['meta_table_*'] # optional
-log_level: 'info' # optional
+log_level: 'info' # optional
+optimize_interval: 86400 # optional
 ```
 
 #### Required settings
@@ -152,6 +153,7 @@ log_level: 'info' # optional
 - `exclude_databases` - databases to __exclude__, string or list, eg `'table1*'` or `['table2', 'table3*']`. If same database matches `databases` and `exclude_databases`, exclude has higher priority.
 - `exclude_tables` - tables to __exclude__, string or list. If same table matches `tables` and `exclude_tables`, exclude has higher priority.
 - `log_level` - log level, default is `info`, you can set to `debug` to get maximum information (allowed values are `debug`, `info`, `warning`, `error`, `critical`)
+- `optimize_interval` - interval (seconds) between automatic `OPTIMIZE TABLE ... FINAL` runs. Default is 86400 (1 day). This guarantees that all pending merges are eventually performed, which prevents storage usage from growing and query performance from degrading.
 
 Few more tables / dbs examples:
diff --git a/mysql_ch_replicator/clickhouse_api.py b/mysql_ch_replicator/clickhouse_api.py
index 0622ad5..3e5eac1 100644
--- a/mysql_ch_replicator/clickhouse_api.py
+++ b/mysql_ch_replicator/clickhouse_api.py
@@ -32,7 +32,8 @@ class ClickhouseApi:
     MAX_RETRIES = 5
     RETRY_INTERVAL = 30
 
-    def __init__(self, database: str, clickhouse_settings: ClickhouseSettings):
+
+    def __init__(self, database: str | None, clickhouse_settings: ClickhouseSettings):
         self.database = database
         self.clickhouse_settings = clickhouse_settings
         self.client = clickhouse_connect.get_client(
@@ -175,10 +176,12 @@ def drop_database(self, db_name):
     def create_database(self, db_name):
         self.cursor.execute(f'CREATE DATABASE {db_name}')
 
-    def select(self, table_name, where=None):
+    def select(self, table_name, where=None, final=None):
        query = f'SELECT * FROM {table_name}'
         if where:
             query += f' WHERE {where}'
+        if final is not None:
+            query += f' SETTINGS final = {int(final)};'
         result = self.client.query(query)
         rows = result.result_rows
         columns = result.column_names
diff --git a/mysql_ch_replicator/config.py b/mysql_ch_replicator/config.py
index b9767bb..b5c54f4 100644
--- a/mysql_ch_replicator/config.py
+++ b/mysql_ch_replicator/config.py
@@ -81,6 +81,8 @@ def validate(self):
 
 
 class Settings:
+    DEFAULT_LOG_LEVEL = 'info'
+    DEFAULT_OPTIMIZE_INTERVAL = 86400
 
     def __init__(self):
         self.mysql = MysqlSettings()
@@ -93,6 +95,7 @@ def __init__(self):
         self.settings_file = ''
         self.log_level = 'info'
         self.debug_log_level = False
+        self.optimize_interval = 0
 
     def load(self, settings_file):
         data = open(settings_file, 'r').read()
@@ -105,7 +108,8 @@ def load(self, settings_file):
         self.tables = data.pop('tables', '*')
         self.exclude_databases = data.pop('exclude_databases', '')
         self.exclude_tables = data.pop('exclude_tables', '')
-        self.log_level = data.pop('log_level', 'info')
+        self.log_level = data.pop('log_level', Settings.DEFAULT_LOG_LEVEL)
+        self.optimize_interval = data.pop('optimize_interval', Settings.DEFAULT_OPTIMIZE_INTERVAL)
         assert isinstance(self.databases, str) or isinstance(self.databases, list)
         assert isinstance(self.tables, str) or isinstance(self.tables, list)
         self.binlog_replicator = BinlogReplicatorSettings(**data.pop('binlog_replicator'))
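For reference, a minimal sketch of the query string that the extended `select()` above now builds. The `build_select_query` helper is hypothetical (the real logic lives inside `ClickhouseApi.select`), but the concatenation mirrors the hunk exactly:

```python
# Hypothetical standalone helper mirroring ClickhouseApi.select() above.
def build_select_query(table_name: str, where: str | None = None,
                       final: bool | None = None) -> str:
    query = f'SELECT * FROM {table_name}'
    if where:
        query += f' WHERE {where}'
    if final is not None:
        # SETTINGS final = 1 tells ClickHouse to collapse duplicate rows of
        # *MergeTree tables at read time, before background merges happen;
        # final = 0 exposes the physically stored rows (used by the test below).
        query += f' SETTINGS final = {int(final)};'
    return query

print(build_select_query('users', where="name='Ivan'", final=True))
# SELECT * FROM users WHERE name='Ivan' SETTINGS final = 1;
```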
diff --git a/mysql_ch_replicator/db_optimizer.py b/mysql_ch_replicator/db_optimizer.py
new file mode 100644
index 0000000..3f0c70a
--- /dev/null
+++ b/mysql_ch_replicator/db_optimizer.py
@@ -0,0 +1,104 @@
+import pickle
+import os
+import time
+from logging import getLogger
+
+from .config import Settings
+from .mysql_api import MySQLApi
+from .clickhouse_api import ClickhouseApi
+from .utils import GracefulKiller
+
+
+logger = getLogger(__name__)
+
+
+class State:
+
+    def __init__(self, file_name):
+        self.file_name = file_name
+        self.last_process_time = {}
+        self.load()
+
+    def load(self):
+        file_name = self.file_name
+        if not os.path.exists(file_name):
+            return
+        data = open(file_name, 'rb').read()
+        data = pickle.loads(data)
+        self.last_process_time = data['last_process_time']
+
+    def save(self):
+        file_name = self.file_name
+        data = pickle.dumps({
+            'last_process_time': self.last_process_time,
+        })
+        with open(file_name + '.tmp', 'wb') as f:
+            f.write(data)
+        os.rename(file_name + '.tmp', file_name)
+
+
+class DbOptimizer:
+    def __init__(self, config: Settings):
+        self.state = State(os.path.join(
+            config.binlog_replicator.data_dir,
+            'db_optimizer.bin',
+        ))
+        self.config = config
+        self.mysql_api = MySQLApi(
+            database=None,
+            mysql_settings=config.mysql,
+        )
+        self.clickhouse_api = ClickhouseApi(
+            database=None,
+            clickhouse_settings=config.clickhouse,
+        )
+
+    def select_db_to_optimize(self):
+        databases = self.mysql_api.get_databases()
+        databases = [db for db in databases if self.config.is_database_matches(db)]
+        ch_databases = set(self.clickhouse_api.get_databases())
+
+        for db in databases:
+            if db not in ch_databases:
+                continue
+            last_process_time = self.state.last_process_time.get(db, 0.0)
+            if time.time() - last_process_time < self.config.optimize_interval:
+                continue
+            return db
+        return None
+
+    def optimize_table(self, db_name, table_name):
+        logger.info(f'Optimizing table {db_name}.{table_name}')
+        self.clickhouse_api.execute_command(
+            f'OPTIMIZE TABLE {db_name}.{table_name} FINAL SETTINGS mutations_sync = 2'
+        )
+        logger.info('Optimize finished')
+        self.state.last_process_time[db_name] = time.time()
+
+    def optimize_database(self, db_name):
+        self.mysql_api.set_database(db_name)
+        tables = self.mysql_api.get_tables()
+        tables = [table for table in tables if self.config.is_table_matches(table)]
+
+        self.clickhouse_api.execute_command(f'USE {db_name}')
+        ch_tables = set(self.clickhouse_api.get_tables())
+
+        for table in tables:
+            if table not in ch_tables:
+                continue
+            self.optimize_table(db_name, table)
+        self.state.save()
+
+    def run(self):
+        logger.info('running optimizer')
+        killer = GracefulKiller()
+        try:
+            while not killer.kill_now:
+                db_to_optimize = self.select_db_to_optimize()
+                if db_to_optimize is None:
+                    time.sleep(min(120, self.config.optimize_interval))
+                    continue
+                self.optimize_database(db_name=db_to_optimize)
+        except Exception as e:
+            logger.error(f'error {e}', exc_info=True)
+        logger.info('optimizer stopped')
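Two details of `DbOptimizer` worth calling out. First, scheduling: a database becomes eligible for another pass once `optimize_interval` seconds have elapsed since its previous one. A condensed, runnable sketch of that rule (the `is_due` helper is illustrative, not part of the module):

```python
import time

# Mirrors the check in select_db_to_optimize(): databases never processed
# before default to last_process_time = 0.0 and are therefore due at once.
def is_due(last_process_time: float, optimize_interval: float) -> bool:
    return time.time() - last_process_time >= optimize_interval

assert is_due(0.0, 86400)              # never optimized -> due immediately
assert not is_due(time.time(), 86400)  # just optimized -> wait a day
```

Second, persistence: `State.save()` writes to a `.tmp` file and then `os.rename()`s it over `db_optimizer.bin`. Since rename is atomic on POSIX filesystems, a crash mid-save cannot leave behind a truncated state file.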
diff --git a/mysql_ch_replicator/main.py b/mysql_ch_replicator/main.py
index 966dd27..27c9031 100755
--- a/mysql_ch_replicator/main.py
+++ b/mysql_ch_replicator/main.py
@@ -9,6 +9,7 @@ from .config import Settings
 from .db_replicator import DbReplicator
 from .binlog_replicator import BinlogReplicator
+from .db_optimizer import DbOptimizer
 from .monitoring import Monitoring
 from .runner import Runner
 
@@ -97,6 +98,24 @@ def run_db_replicator(args, config: Settings):
     db_replicator.run()
 
 
+def run_db_optimizer(args, config: Settings):
+    data_dir = config.binlog_replicator.data_dir
+    if not os.path.exists(data_dir):
+        os.mkdir(data_dir)
+
+    log_file = os.path.join(
+        data_dir,
+        'db_optimizer.log',
+    )
+
+    set_logging_config(f'dbopt {args.db}', log_file=log_file, log_level_str=config.log_level)
+
+    db_optimizer = DbOptimizer(
+        config=config,
+    )
+    db_optimizer.run()
+
+
 def run_monitoring(args, config: Settings):
     set_logging_config('monitor', log_level_str=config.log_level)
     monitoring = Monitoring(args.db or '', config)
@@ -114,7 +133,7 @@ def main():
     parser.add_argument(
         "mode", help="run mode", type=str,
-        choices=["run_all", "binlog_replicator", "db_replicator", "monitoring"])
+        choices=["run_all", "binlog_replicator", "db_replicator", "monitoring", "db_optimizer"])
     parser.add_argument("--config", help="config file path", default='config.yaml', type=str)
     parser.add_argument("--db", help="source database(s) name", type=str)
     parser.add_argument("--target_db", help="target database(s) name, if not set will be same as source", type=str)
@@ -131,6 +150,8 @@ def main():
         run_binlog_replicator(args, config)
     if args.mode == 'db_replicator':
         run_db_replicator(args, config)
+    if args.mode == 'db_optimizer':
+        run_db_optimizer(args, config)
     if args.mode == 'monitoring':
         run_monitoring(args, config)
     if args.mode == 'run_all':
diff --git a/mysql_ch_replicator/runner.py b/mysql_ch_replicator/runner.py
index e1f7085..1a64465 100644
--- a/mysql_ch_replicator/runner.py
+++ b/mysql_ch_replicator/runner.py
@@ -25,6 +25,11 @@ def __init__(self, db_name, config_file):
         super().__init__(f'{sys.argv[0]} --config {config_file} --db {db_name} db_replicator')
 
 
+class DbOptimizerRunner(ProcessRunner):
+    def __init__(self, config_file):
+        super().__init__(f'{sys.argv[0]} --config {config_file} db_optimizer')
+
+
 class RunAllRunner(ProcessRunner):
     def __init__(self, db_name, config_file):
         super().__init__(f'{sys.argv[0]} --config {config_file} run_all --db {db_name}')
 
@@ -37,6 +42,7 @@ def __init__(self, config: Settings, wait_initial_replication: bool, databases:
         self.wait_initial_replication = wait_initial_replication
         self.runners: dict = {}
         self.binlog_runner = None
+        self.db_optimizer = None
 
     def is_initial_replication_finished(self, db_name):
         state_path = os.path.join(
@@ -65,6 +71,9 @@ def run(self):
         self.binlog_runner = BinlogReplicatorRunner(self.config.settings_file)
         self.binlog_runner.run()
 
+        self.db_optimizer = DbOptimizerRunner(self.config.settings_file)
+        self.db_optimizer.run()
+
         # First - continue replication for DBs that already finished initial replication
         for db in databases:
             if not self.is_initial_replication_finished(db_name=db):
@@ -100,6 +109,10 @@ def run(self):
             logger.info('stopping binlog replication')
             self.binlog_runner.stop()
 
+        if self.db_optimizer is not None:
+            logger.info('stopping db_optimizer')
+            self.db_optimizer.stop()
+
         for db_name, db_replication_runner in self.runners.items():
             logger.info(f'stopping replication for {db_name}')
             db_replication_runner.stop()
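With the wiring above, `run_all` now supervises the optimizer as one more child process, and the new `db_optimizer` mode lets it run standalone. A self-contained sketch of the extended CLI surface (argument names copied from `main.py`; the `parse_args` call is only a demonstration):

```python
import argparse

# Mirrors the parser in main.py, including the new "db_optimizer" mode.
parser = argparse.ArgumentParser()
parser.add_argument(
    "mode", help="run mode", type=str,
    choices=["run_all", "binlog_replicator", "db_replicator",
             "monitoring", "db_optimizer"])
parser.add_argument("--config", help="config file path",
                    default='config.yaml', type=str)

# Launching the optimizer standalone (same argument order that
# DbOptimizerRunner uses when it spawns the subprocess):
args = parser.parse_args(['--config', 'config.yaml', 'db_optimizer'])
assert args.mode == 'db_optimizer'
```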
diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py
index d3cb13f..5ed455d 100644
--- a/test_mysql_ch_replicator.py
+++ b/test_mysql_ch_replicator.py
@@ -344,8 +344,19 @@ def test_runner():
     assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
 
     mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET age=66 WHERE name='Ivan'", commit=True)
-    time.sleep(4)
-    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
+    assert_wait(lambda: ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['age'] == 66)
+
+    mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET age=77 WHERE name='Ivan'", commit=True)
+    assert_wait(lambda: ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['age'] == 77)
+
+    mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET age=88 WHERE name='Ivan'", commit=True)
+    assert_wait(lambda: ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['age'] == 88)
+
+    mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Vlad', 99);", commit=True)
+
+    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4)
+
+    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, final=False)) == 4)
 
     run_all_runner.stop()
diff --git a/tests_config.yaml b/tests_config.yaml
index 8a722fb..e095d8d 100644
--- a/tests_config.yaml
+++ b/tests_config.yaml
@@ -17,3 +17,4 @@ binlog_replicator:
 
 databases: '*test*'
 log_level: 'debug'
+optimize_interval: 3
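The test changes rely on `optimize_interval: 3` in `tests_config.yaml`: with a 3-second interval the optimizer fires during the test run, so the final `ch.select(TEST_TABLE_NAME, final=False)` assertion checks that the duplicate rows produced by the three UPDATEs were physically merged away, not merely hidden by read-time `FINAL`. `assert_wait` itself is defined elsewhere in the test module; a minimal polling equivalent, for reference (timeout and interval values are assumptions):

```python
import time

def assert_wait(condition, timeout=20.0, interval=0.5):
    # Poll until condition() is truthy; fail if the timeout expires.
    # Minimal stand-in for the helper used by test_mysql_ch_replicator.py.
    deadline = time.time() + timeout
    while time.time() < deadline:
        if condition():
            return
        time.sleep(interval)
    assert condition(), 'condition not satisfied within timeout'

assert_wait(lambda: True)  # returns immediately
```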