From dc1a45c8a50c6388f0f24f6536c34ee059bd8ad4 Mon Sep 17 00:00:00 2001
From: Filipp Ozinov
Date: Mon, 23 Dec 2024 02:56:28 +0400
Subject: [PATCH] Better insert statistics

---
 mysql_ch_replicator/clickhouse_api.py | 75 +++++++++++++++++++++++++++
 mysql_ch_replicator/db_optimizer.py   |  7 ++-
 mysql_ch_replicator/db_replicator.py  |  5 +-
 mysql_ch_replicator/utils.py          | 22 ++++++++
 4 files changed, 103 insertions(+), 6 deletions(-)

diff --git a/mysql_ch_replicator/clickhouse_api.py b/mysql_ch_replicator/clickhouse_api.py
index 9f7c475..8426093 100644
--- a/mysql_ch_replicator/clickhouse_api.py
+++ b/mysql_ch_replicator/clickhouse_api.py
@@ -3,6 +3,8 @@
 import clickhouse_connect
 
 from logging import getLogger
+from dataclasses import dataclass, field
+from collections import defaultdict
 
 from .config import ClickhouseSettings
 from .table_structure import TableStructure, TableField
@@ -28,6 +30,53 @@
 '''
 
 
+@dataclass
+class SingleStats:
+    duration: float = 0.0
+    events: int = 0
+    records: int = 0
+
+    def to_dict(self):
+        return self.__dict__
+
+
+@dataclass
+class InsertEraseStats:
+    inserts: SingleStats = field(default_factory=SingleStats)
+    erases: SingleStats = field(default_factory=SingleStats)
+
+    def to_dict(self):
+        return {
+            'inserts': self.inserts.to_dict(),
+            'erases': self.erases.to_dict(),
+        }
+
+
+@dataclass
+class GeneralStats:
+    general: InsertEraseStats = field(default_factory=InsertEraseStats)
+    table_stats: dict[str, InsertEraseStats] = field(default_factory=lambda: defaultdict(InsertEraseStats))
+
+    def on_event(self, table_name: str, is_insert: bool, duration: float, records: int):
+        targets = []
+        if is_insert:
+            targets.append(self.general.inserts)
+            targets.append(self.table_stats[table_name].inserts)
+        else:
+            targets.append(self.general.erases)
+            targets.append(self.table_stats[table_name].erases)
+
+        for target in targets:
+            target.duration += duration
+            target.events += 1
+            target.records += records
+
+    def to_dict(self):
+        results = {'total': self.general.to_dict()}
+        for table_name, table_stats in self.table_stats.items():
+            results[table_name] = table_stats.to_dict()
+        return results
+
+
 class ClickhouseApi:
     MAX_RETRIES = 5
     RETRY_INTERVAL = 30
@@ -44,8 +93,14 @@ def __init__(self, database: str | None, clickhouse_settings: ClickhouseSettings
             send_receive_timeout=clickhouse_settings.send_receive_timeout,
         )
         self.tables_last_record_version = {}  # table_name => last used row version
+        self.stats = GeneralStats()
         self.execute_command('SET final = 1;')
 
+    def get_stats(self):
+        stats = self.stats.to_dict()
+        self.stats = GeneralStats()
+        return stats
+
     def get_tables(self):
         result = self.client.query('SHOW TABLES')
         tables = result.result_rows
@@ -160,9 +215,13 @@ def insert(self, table_name, records, table_structure: TableStructure = None):
         if '.' not in full_table_name:
             full_table_name = f'{self.database}.{table_name}'
 
+        duration = 0.0
         for attempt in range(ClickhouseApi.MAX_RETRIES):
             try:
+                t1 = time.time()
                 self.client.insert(table=full_table_name, data=records_to_insert)
+                t2 = time.time()
+                duration += (t2 - t1)
                 break
             except clickhouse_connect.driver.exceptions.OperationalError as e:
                 logger.error(f'error inserting data: {e}', exc_info=e)
@@ -170,6 +229,13 @@ def insert(self, table_name, records, table_structure: TableStructure = None):
                     raise e
                 time.sleep(ClickhouseApi.RETRY_INTERVAL)
 
+        self.stats.on_event(
+            table_name=table_name,
+            duration=duration,
+            is_insert=True,
+            records=len(records_to_insert),
+        )
+
         self.set_last_used_version(table_name, current_version)
 
     def erase(self, table_name, field_name, field_values):
@@ -181,7 +247,16 @@ def erase(self, table_name, field_name, field_values):
             'field_name': field_name,
             'field_values': field_values,
         })
+        t1 = time.time()
         self.execute_command(query)
+        t2 = time.time()
+        duration = t2 - t1
+        self.stats.on_event(
+            table_name=table_name,
+            duration=duration,
+            is_insert=False,
+            records=len(field_values),
+        )
 
     def drop_database(self, db_name):
         self.execute_command(f'DROP DATABASE IF EXISTS {db_name}')
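To make the accumulator above easier to follow, here is a minimal standalone sketch of how GeneralStats collects and serializes events once this patch is applied. The table name 'users' and the sample numbers are made up for illustration; only the classes defined in the hunk above are used.

from mysql_ch_replicator.clickhouse_api import GeneralStats

stats = GeneralStats()
# 'users' is a hypothetical table name; durations and record counts are invented
stats.on_event(table_name='users', is_insert=True, duration=0.0421, records=100)
stats.on_event(table_name='users', is_insert=False, duration=0.0037, records=5)

# to_dict() nests the per-table numbers next to the 'total' aggregate:
# {'total': {'inserts': {'duration': 0.0421, 'events': 1, 'records': 100},
#            'erases': {'duration': 0.0037, 'events': 1, 'records': 5}},
#  'users': {'inserts': {...}, 'erases': {...}}}
print(stats.to_dict())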
diff --git a/mysql_ch_replicator/db_optimizer.py b/mysql_ch_replicator/db_optimizer.py
index 80a4782..78de82f 100644
--- a/mysql_ch_replicator/db_optimizer.py
+++ b/mysql_ch_replicator/db_optimizer.py
@@ -6,7 +6,7 @@
 from .config import Settings
 from .mysql_api import MySQLApi
 from .clickhouse_api import ClickhouseApi
-from .utils import GracefulKiller
+from .utils import RegularKiller
 
 
 logger = getLogger(__name__)
@@ -94,9 +94,9 @@ def optimize_database(self, db_name):
 
     def run(self):
         logger.info('running optimizer')
-        killer = GracefulKiller()
+        RegularKiller('optimizer')
         try:
-            while not killer.kill_now:
+            while True:
                 db_to_optimize = self.select_db_to_optimize()
                 self.mysql_api.close()
                 if db_to_optimize is None:
@@ -105,4 +105,3 @@ def run(self):
                 self.optimize_database(db_name=db_to_optimize)
         except Exception as e:
             logger.error(f'error {e}', exc_info=True)
-        logger.info('optimizer stopped')
diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py
index 81d065d..64c6754 100644
--- a/mysql_ch_replicator/db_replicator.py
+++ b/mysql_ch_replicator/db_replicator.py
@@ -13,7 +13,7 @@
 from .converter import MysqlToClickhouseConverter, strip_sql_name, strip_sql_comments
 from .table_structure import TableStructure, TableField
 from .binlog_replicator import DataReader, LogEvent, EventType
-from .utils import GracefulKiller, touch_all_files
+from .utils import GracefulKiller, touch_all_files, format_floats
 
 
 logger = getLogger(__name__)
@@ -526,7 +526,8 @@ def log_stats_if_required(self):
         self.last_dump_stats_time = curr_time
         self.last_dump_stats_process_time = curr_process_time
 
-        logger.info(f'stats: {json.dumps(self.stats.__dict__)}')
+        logger.info(f'stats: {json.dumps(format_floats(self.stats.__dict__))}')
+        logger.info(f'ch_stats: {json.dumps(format_floats(self.clickhouse_api.get_stats()))}')
         self.stats = Statistics()
 
     def upload_records_if_required(self, table_name):
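The reworked log lines above rely on format_floats(), added to utils.py in the next hunk, to keep the dumped JSON readable. A small sketch with invented sample numbers shows the effect:

import json

from mysql_ch_replicator.utils import format_floats

# invented sample resembling the dict returned by ClickhouseApi.get_stats()
ch_stats = {'total': {'inserts': {'duration': 0.042137951, 'events': 3, 'records': 1500}}}
print(f'ch_stats: {json.dumps(format_floats(ch_stats))}')
# ch_stats: {"total": {"inserts": {"duration": 0.042, "events": 3, "records": 1500}}}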
diff --git a/mysql_ch_replicator/utils.py b/mysql_ch_replicator/utils.py
index 37433e7..2d90a77 100644
--- a/mysql_ch_replicator/utils.py
+++ b/mysql_ch_replicator/utils.py
@@ -1,6 +1,7 @@
 import signal
 import subprocess
 import os
+import sys
 import time
 
 from pathlib import Path
@@ -19,6 +20,17 @@ def exit_gracefully(self, signum, frame):
         self.kill_now = True
 
 
+class RegularKiller:
+    def __init__(self, proc_name):
+        self.proc_name = proc_name
+        signal.signal(signal.SIGINT, self.exit_gracefully)
+        signal.signal(signal.SIGTERM, self.exit_gracefully)
+
+    def exit_gracefully(self, signum, frame):
+        logger.info(f'{self.proc_name} stopped')
+        sys.exit(0)
+
+
 class ProcessRunner:
     def __init__(self, cmd):
         self.cmd = cmd
@@ -68,3 +80,13 @@ def touch_all_files(directory_path):
             os.utime(item, times=(current_time, current_time))
         except Exception as e:
             logger.warning(f"Failed to touch {item}: {e}")
+
+
+def format_floats(data):
+    if isinstance(data, dict):
+        return {k: format_floats(v) for k, v in data.items()}
+    elif isinstance(data, list):
+        return [format_floats(v) for v in data]
+    elif isinstance(data, float):
+        return round(data, 3)
+    return data
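For completeness, a short sketch (assuming the patch is applied, on a POSIX system) of how the new RegularKiller differs from GracefulKiller: instead of setting a flag for the caller to poll, it logs '<proc_name> stopped' and exits the process as soon as SIGINT or SIGTERM arrives, which is why db_optimizer.py no longer needs the kill_now loop or its own shutdown log line.

import os
import signal

from mysql_ch_replicator.utils import RegularKiller

RegularKiller('optimizer')            # installs SIGINT/SIGTERM handlers
os.kill(os.getpid(), signal.SIGTERM)  # handler logs 'optimizer stopped' and calls sys.exit(0)
print('never reached')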