Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions mysql_ch_replicator/clickhouse_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import clickhouse_connect

from logging import getLogger
from dataclasses import dataclass, field
from collections import defaultdict

from .config import ClickhouseSettings
from .table_structure import TableStructure, TableField
Expand All @@ -28,6 +30,51 @@
'''


@dataclass
class SingleStats:
    """Accumulated counters for one kind of operation.

    All three fields start at zero and are incremented in place by the
    code that records events.
    """

    # Sum of the durations reported for each event (callers in this file
    # pass time.time() differences, i.e. seconds).
    duration: float = 0.0
    # Number of events recorded.
    events: int = 0
    # Total number of records across all recorded events.
    records: int = 0

    def to_dict(self):
        """Return the counters as a new ``dict``.

        A copy is returned rather than the instance's own ``__dict__`` so
        that mutating the result cannot corrupt the live counters.
        """
        return dict(self.__dict__)


@dataclass
class InsertEraseStats:
    """Pair of ``SingleStats`` counters: one for inserts, one for erases."""

    inserts: SingleStats = field(default_factory=SingleStats)
    erases: SingleStats = field(default_factory=SingleStats)

    def to_dict(self):
        """Return ``{'inserts': ..., 'erases': ...}``, each side serialised by its own ``to_dict()``."""
        return {
            side: getattr(self, side).to_dict()
            for side in ('inserts', 'erases')
        }


@dataclass
class GeneralStats:
    """Insert/erase counters kept both overall and per table.

    ``general`` aggregates every event; ``table_stats`` holds one
    ``InsertEraseStats`` per table name, created on first use.
    """

    general: InsertEraseStats = field(default_factory=InsertEraseStats)
    table_stats: dict[str, InsertEraseStats] = field(default_factory=lambda: defaultdict(InsertEraseStats))

    def on_event(self, table_name: str, is_insert: bool, duration: float, records: int):
        """Record one insert or erase event.

        Args:
            table_name: Table the event belongs to; its per-table entry is
                created automatically if missing.
            is_insert: ``True`` to count the event as an insert, ``False``
                to count it as an erase.
            duration: Time spent on the event, added to the running total.
            records: Number of records touched by the event.
        """
        # Previously a non-insert event matched no branch and was dropped,
        # so the ``erases`` counters could never change.
        kind = 'inserts' if is_insert else 'erases'
        targets = (
            getattr(self.general, kind),
            getattr(self.table_stats[table_name], kind),
        )

        for target in targets:
            target.duration += duration
            target.events += 1
            target.records += records

    def to_dict(self):
        """Return the overall stats under ``'total'`` plus one entry per table name."""
        results = {'total': self.general.to_dict()}
        for table_name, table_stats in self.table_stats.items():
            results[table_name] = table_stats.to_dict()
        return results


class ClickhouseApi:
MAX_RETRIES = 5
RETRY_INTERVAL = 30
Expand All @@ -44,8 +91,14 @@ def __init__(self, database: str | None, clickhouse_settings: ClickhouseSettings
send_receive_timeout=clickhouse_settings.send_receive_timeout,
)
self.tables_last_record_version = {} # table_name => last used row version
self.stats = GeneralStats()
self.execute_command('SET final = 1;')

def get_stats(self):
    """Return the stats gathered so far as a dict and start a fresh ``GeneralStats``."""
    # The right-hand side is evaluated left to right, so the snapshot is
    # taken before the replacement collector is created and installed.
    snapshot, self.stats = self.stats.to_dict(), GeneralStats()
    return snapshot

def get_tables(self):
result = self.client.query('SHOW TABLES')
tables = result.result_rows
Expand Down Expand Up @@ -160,16 +213,27 @@ def insert(self, table_name, records, table_structure: TableStructure = None):
if '.' not in full_table_name:
full_table_name = f'{self.database}.{table_name}'

duration = 0.0
for attempt in range(ClickhouseApi.MAX_RETRIES):
try:
t1 = time.time()
self.client.insert(table=full_table_name, data=records_to_insert)
t2 = time.time()
duration += (t2 - t1)
break
except clickhouse_connect.driver.exceptions.OperationalError as e:
logger.error(f'error inserting data: {e}', exc_info=e)
if attempt == ClickhouseApi.MAX_RETRIES - 1:
raise e
time.sleep(ClickhouseApi.RETRY_INTERVAL)

self.stats.on_event(
table_name=table_name,
duration=duration,
is_insert=True,
records=len(records_to_insert),
)

self.set_last_used_version(table_name, current_version)

def erase(self, table_name, field_name, field_values):
Expand All @@ -181,7 +245,16 @@ def erase(self, table_name, field_name, field_values):
'field_name': field_name,
'field_values': field_values,
})
t1 = time.time()
self.execute_command(query)
t2 = time.time()
duration = t2 - t1
self.stats.on_event(
table_name=table_name,
duration=duration,
is_insert=True,
records=len(field_values),
)

def drop_database(self, db_name):
self.execute_command(f'DROP DATABASE IF EXISTS {db_name}')
Expand Down
7 changes: 3 additions & 4 deletions mysql_ch_replicator/db_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from .config import Settings
from .mysql_api import MySQLApi
from .clickhouse_api import ClickhouseApi
from .utils import GracefulKiller
from .utils import RegularKiller


logger = getLogger(__name__)
Expand Down Expand Up @@ -94,9 +94,9 @@ def optimize_database(self, db_name):

def run(self):
logger.info('running optimizer')
killer = GracefulKiller()
RegularKiller('optimizer')
try:
while not killer.kill_now:
while True:
db_to_optimize = self.select_db_to_optimize()
self.mysql_api.close()
if db_to_optimize is None:
Expand All @@ -105,4 +105,3 @@ def run(self):
self.optimize_database(db_name=db_to_optimize)
except Exception as e:
logger.error(f'error {e}', exc_info=True)
logger.info('optimizer stopped')
5 changes: 3 additions & 2 deletions mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .converter import MysqlToClickhouseConverter, strip_sql_name, strip_sql_comments
from .table_structure import TableStructure, TableField
from .binlog_replicator import DataReader, LogEvent, EventType
from .utils import GracefulKiller, touch_all_files
from .utils import GracefulKiller, touch_all_files, format_floats


logger = getLogger(__name__)
Expand Down Expand Up @@ -526,7 +526,8 @@ def log_stats_if_required(self):

self.last_dump_stats_time = curr_time
self.last_dump_stats_process_time = curr_process_time
logger.info(f'stats: {json.dumps(self.stats.__dict__)}')
logger.info(f'stats: {json.dumps(format_floats(self.stats.__dict__))}')
logger.info(f'ch_stats: {json.dumps(format_floats(self.clickhouse_api.get_stats()))}')
self.stats = Statistics()

def upload_records_if_required(self, table_name):
Expand Down
22 changes: 22 additions & 0 deletions mysql_ch_replicator/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import signal
import subprocess
import os
import sys
import time

from pathlib import Path
Expand All @@ -19,6 +20,17 @@ def exit_gracefully(self, signum, frame):
self.kill_now = True


class RegularKiller:
    """Install SIGINT/SIGTERM handlers that log a message and exit with status 0."""

    def __init__(self, proc_name):
        # proc_name is only used in the log line written on shutdown.
        self.proc_name = proc_name
        for signum in (signal.SIGINT, signal.SIGTERM):
            signal.signal(signum, self.exit_gracefully)

    def exit_gracefully(self, signum, frame):
        """Signal handler: log that the process stopped, then exit via ``sys.exit(0)``."""
        message = f'{self.proc_name} stopped'
        logger.info(message)
        sys.exit(0)


class ProcessRunner:
def __init__(self, cmd):
self.cmd = cmd
Expand Down Expand Up @@ -68,3 +80,13 @@ def touch_all_files(directory_path):
os.utime(item, times=(current_time, current_time))
except Exception as e:
logger.warning(f"Failed to touch {item}: {e}")


def format_floats(data):
    """Return a copy of ``data`` with every float rounded to 3 decimal places.

    Dicts and lists are walked recursively (dict keys are left untouched);
    any other value, including non-float numbers, is returned unchanged.
    """
    if isinstance(data, float):
        return round(data, 3)
    if isinstance(data, dict):
        return {key: format_floats(value) for key, value in data.items()}
    if isinstance(data, list):
        return list(map(format_floats, data))
    return data
Loading