Merged
4 changes: 3 additions & 1 deletion README.md
@@ -137,7 +137,8 @@ tables: '*'
exclude_databases: ['database_10', 'database_*_42'] # optional
exclude_tables: ['meta_table_*'] # optional

-log_level: 'info' # optional
+log_level: 'info' # optional
+optimize_interval: 86400 # optional
```

#### Required settings
@@ -152,6 +153,7 @@ log_level: 'info' # optional
- `exclude_databases` - databases to __exclude__, string or list, e.g. `'db1*'` or `['db2', 'db3*']`. If the same database matches both `databases` and `exclude_databases`, exclude has higher priority.
- `exclude_tables` - tables to __exclude__, string or list. If the same table matches both `tables` and `exclude_tables`, exclude has higher priority.
- `log_level` - log level, default is `info`; set it to `debug` for maximum detail (allowed values are `debug`, `info`, `warning`, `error`, `critical`)
+- `optimize_interval` - interval in seconds between automatic `OPTIMIZE TABLE ... FINAL` calls, default `86400` (1 day). Periodic `OPTIMIZE ... FINAL` guarantees that all pending merges are actually performed, preventing gradual storage growth and degraded query performance (see the launch sketch after this diff).

Few more tables / dbs examples:

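With `optimize_interval` set, the optimizer process is started automatically by `run_all` (see `runner.py` below). It can also be launched on its own through the new `db_optimizer` mode added to `main.py` — a sketch, assuming the package's usual console entry point name:

```bash
mysql_ch_replicator --config config.yaml db_optimizer
```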
7 changes: 5 additions & 2 deletions mysql_ch_replicator/clickhouse_api.py
@@ -32,7 +32,8 @@
class ClickhouseApi:
    MAX_RETRIES = 5
    RETRY_INTERVAL = 30
-    def __init__(self, database: str, clickhouse_settings: ClickhouseSettings):
+
+    def __init__(self, database: str | None, clickhouse_settings: ClickhouseSettings):
        self.database = database
        self.clickhouse_settings = clickhouse_settings
        self.client = clickhouse_connect.get_client(
@@ -175,10 +176,12 @@ def drop_database(self, db_name):
    def create_database(self, db_name):
        self.cursor.execute(f'CREATE DATABASE {db_name}')

-    def select(self, table_name, where=None):
+    def select(self, table_name, where=None, final=None):
        query = f'SELECT * FROM {table_name}'
        if where:
            query += f' WHERE {where}'
+        if final is not None:
+            query += f' SETTINGS final = {int(final)};'
        result = self.client.query(query)
        rows = result.result_rows
        columns = result.column_names
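A brief usage sketch of the extended `select` (not part of the diff; the config file name and table names are assumptions based on the surrounding code):

```python
from mysql_ch_replicator.config import Settings
from mysql_ch_replicator.clickhouse_api import ClickhouseApi

settings = Settings()
settings.load('config.yaml')  # assumed config, as in the README example

ch = ClickhouseApi(database='mydb', clickhouse_settings=settings.clickhouse)

# final=True appends "SETTINGS final = 1", which makes ReplacingMergeTree
# collapse duplicate row versions at query time, even before parts are merged.
rows = ch.select('my_table', where="name='Ivan'", final=True)
# Executes: SELECT * FROM my_table WHERE name='Ivan' SETTINGS final = 1;
```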
6 changes: 5 additions & 1 deletion mysql_ch_replicator/config.py
@@ -81,6 +81,8 @@ def validate(self):


class Settings:
+    DEFAULT_LOG_LEVEL = 'info'
+    DEFAULT_OPTIMIZE_INTERVAL = 86400

    def __init__(self):
        self.mysql = MysqlSettings()
@@ -93,6 +95,7 @@ def __init__(self):
        self.settings_file = ''
        self.log_level = 'info'
        self.debug_log_level = False
+        self.optimize_interval = 0

    def load(self, settings_file):
        data = open(settings_file, 'r').read()
@@ -105,7 +108,8 @@ def load(self, settings_file):
        self.tables = data.pop('tables', '*')
        self.exclude_databases = data.pop('exclude_databases', '')
        self.exclude_tables = data.pop('exclude_tables', '')
-        self.log_level = data.pop('log_level', 'info')
+        self.log_level = data.pop('log_level', Settings.DEFAULT_LOG_LEVEL)
+        self.optimize_interval = data.pop('optimize_interval', Settings.DEFAULT_OPTIMIZE_INTERVAL)
        assert isinstance(self.databases, str) or isinstance(self.databases, list)
        assert isinstance(self.tables, str) or isinstance(self.tables, list)
        self.binlog_replicator = BinlogReplicatorSettings(**data.pop('binlog_replicator'))
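A minimal sketch of how the new option surfaces at runtime (not part of the diff), assuming a YAML config file like the ones used elsewhere in this repo:

```python
from mysql_ch_replicator.config import Settings

settings = Settings()
settings.load('config.yaml')       # pops known keys, falls back to defaults
print(settings.optimize_interval)  # 86400 unless the config overrides it
```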
104 changes: 104 additions & 0 deletions mysql_ch_replicator/db_optimizer.py
@@ -0,0 +1,104 @@
import pickle
import os
import time
from logging import getLogger

from .config import Settings
from .mysql_api import MySQLApi
from .clickhouse_api import ClickhouseApi
from .utils import GracefulKiller


logger = getLogger(__name__)


class State:

    def __init__(self, file_name):
        self.file_name = file_name
        self.last_process_time = {}
        self.load()

    def load(self):
        file_name = self.file_name
        if not os.path.exists(file_name):
            return
        data = open(file_name, 'rb').read()
        data = pickle.loads(data)
        self.last_process_time = data['last_process_time']

    def save(self):
        file_name = self.file_name
        data = pickle.dumps({
            'last_process_time': self.last_process_time,
        })
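        # write to a temp file first, then rename it over the old state
        # (atomic on POSIX), so a crash mid-write can't corrupt saved state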
        with open(file_name + '.tmp', 'wb') as f:
            f.write(data)
        os.rename(file_name + '.tmp', file_name)


class DbOptimizer:
    def __init__(self, config: Settings):
        self.state = State(os.path.join(
            config.binlog_replicator.data_dir,
            'db_optimizer.bin',
        ))
        self.config = config
        self.mysql_api = MySQLApi(
            database=None,
            mysql_settings=config.mysql,
        )
        self.clickhouse_api = ClickhouseApi(
            database=None,
            clickhouse_settings=config.clickhouse,
        )

    def select_db_to_optimize(self):
        databases = self.mysql_api.get_databases()
        databases = [db for db in databases if self.config.is_database_matches(db)]
        ch_databases = set(self.clickhouse_api.get_databases())

        for db in databases:
            if db not in ch_databases:
                continue
            last_process_time = self.state.last_process_time.get(db, 0.0)
            if time.time() - last_process_time < self.config.optimize_interval:
                continue
            return db
        return None

    def optimize_table(self, db_name, table_name):
        logger.info(f'Optimizing table {db_name}.{table_name}')
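        # FINAL forces ClickHouse to merge all parts of the table now,
        # collapsing row versions that background merges haven't processed yet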
        self.clickhouse_api.execute_command(
            f'OPTIMIZE TABLE {db_name}.{table_name} FINAL SETTINGS mutations_sync = 2'
        )
        logger.info('Optimize finished')
        self.state.last_process_time[db_name] = time.time()

    def optimize_database(self, db_name):
        self.mysql_api.set_database(db_name)
        tables = self.mysql_api.get_tables()
        tables = [table for table in tables if self.config.is_table_matches(table)]

        self.clickhouse_api.execute_command(f'USE {db_name}')
        ch_tables = set(self.clickhouse_api.get_tables())

        for table in tables:
            if table not in ch_tables:
                continue
            self.optimize_table(db_name, table)
            self.state.save()

    def run(self):
        logger.info('running optimizer')
        killer = GracefulKiller()
        try:
            while not killer.kill_now:
                db_to_optimize = self.select_db_to_optimize()
                if db_to_optimize is None:
                    time.sleep(min(120, self.config.optimize_interval))
                    continue
                self.optimize_database(db_name=db_to_optimize)
        except Exception as e:
            logger.error(f'error {e}', exc_info=True)
        logger.info('optimizer stopped')
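`GracefulKiller` is imported from `.utils` and is not part of this diff. A typical implementation of the pattern (an assumption, not necessarily the project's code) flips `kill_now` on SIGINT/SIGTERM so the loop above can finish its current iteration and exit:

```python
import signal

class GracefulKiller:
    def __init__(self):
        self.kill_now = False
        # intercept Ctrl+C and `kill` so an in-flight OPTIMIZE can complete
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

    def exit_gracefully(self, signum, frame):
        self.kill_now = True
```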
23 changes: 22 additions & 1 deletion mysql_ch_replicator/main.py
@@ -9,6 +9,7 @@
from .config import Settings
from .db_replicator import DbReplicator
from .binlog_replicator import BinlogReplicator
+from .db_optimizer import DbOptimizer
from .monitoring import Monitoring
from .runner import Runner

@@ -97,6 +98,24 @@ def run_db_replicator(args, config: Settings):
    db_replicator.run()


+def run_db_optimizer(args, config: Settings):
+    data_dir = config.binlog_replicator.data_dir
+    if not os.path.exists(data_dir):
+        os.mkdir(data_dir)
+
+    log_file = os.path.join(
+        data_dir,
+        'db_optimizer.log',
+    )
+
+    set_logging_config(f'dbopt {args.db}', log_file=log_file, log_level_str=config.log_level)
+
+    db_optimizer = DbOptimizer(
+        config=config,
+    )
+    db_optimizer.run()
+
+
def run_monitoring(args, config: Settings):
    set_logging_config('monitor', log_level_str=config.log_level)
    monitoring = Monitoring(args.db or '', config)
@@ -114,7 +133,7 @@ def main():
    parser.add_argument(
        "mode", help="run mode",
        type=str,
-        choices=["run_all", "binlog_replicator", "db_replicator", "monitoring"])
+        choices=["run_all", "binlog_replicator", "db_replicator", "monitoring", "db_optimizer"])
    parser.add_argument("--config", help="config file path", default='config.yaml', type=str)
    parser.add_argument("--db", help="source database(s) name", type=str)
    parser.add_argument("--target_db", help="target database(s) name, if not set will be same as source", type=str)
@@ -131,6 +150,8 @@
        run_binlog_replicator(args, config)
    if args.mode == 'db_replicator':
        run_db_replicator(args, config)
+    if args.mode == 'db_optimizer':
+        run_db_optimizer(args, config)
    if args.mode == 'monitoring':
        run_monitoring(args, config)
    if args.mode == 'run_all':
13 changes: 13 additions & 0 deletions mysql_ch_replicator/runner.py
@@ -25,6 +25,11 @@ def __init__(self, db_name, config_file):
        super().__init__(f'{sys.argv[0]} --config {config_file} --db {db_name} db_replicator')


+class DbOptimizerRunner(ProcessRunner):
+    def __init__(self, config_file):
+        super().__init__(f'{sys.argv[0]} --config {config_file} db_optimizer')
+
+
class RunAllRunner(ProcessRunner):
    def __init__(self, db_name, config_file):
        super().__init__(f'{sys.argv[0]} --config {config_file} run_all --db {db_name}')
@@ -37,6 +42,7 @@ def __init__(self, config: Settings, wait_initial_replication: bool, databases:
        self.wait_initial_replication = wait_initial_replication
        self.runners: dict = {}
        self.binlog_runner = None
+        self.db_optimizer = None

    def is_initial_replication_finished(self, db_name):
        state_path = os.path.join(
@@ -65,6 +71,9 @@ def run(self):
        self.binlog_runner = BinlogReplicatorRunner(self.config.settings_file)
        self.binlog_runner.run()

+        self.db_optimizer = DbOptimizerRunner(self.config.settings_file)
+        self.db_optimizer.run()
+
        # First - continue replication for DBs that already finished initial replication
        for db in databases:
            if not self.is_initial_replication_finished(db_name=db):
@@ -100,6 +109,10 @@ def run(self):
            logger.info('stopping binlog replication')
            self.binlog_runner.stop()

+        if self.db_optimizer is not None:
+            logger.info('stopping db_optimizer')
+            self.db_optimizer.stop()
+
        for db_name, db_replication_runner in self.runners.items():
            logger.info(f'stopping replication for {db_name}')
            db_replication_runner.stop()
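`ProcessRunner` is defined earlier in `runner.py`, outside this diff. Judging from the subclasses above, it takes a full command line and exposes `run()`/`stop()`; a minimal sketch under that assumption:

```python
import shlex
import subprocess

class ProcessRunner:
    def __init__(self, cmd: str):
        self.cmd = cmd
        self.process = None

    def run(self):
        # spawn the command line as a child process
        self.process = subprocess.Popen(shlex.split(self.cmd))

    def stop(self):
        if self.process is not None:
            self.process.terminate()
            self.process.wait()
            self.process = None
```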
15 changes: 13 additions & 2 deletions test_mysql_ch_replicator.py
@@ -344,8 +344,19 @@ def test_runner():
    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)

    mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET age=66 WHERE name='Ivan'", commit=True)
-    time.sleep(4)
-    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
+    assert_wait(lambda: ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['age'] == 66)
+
+    mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET age=77 WHERE name='Ivan'", commit=True)
+    assert_wait(lambda: ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['age'] == 77)
+
+    mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET age=88 WHERE name='Ivan'", commit=True)
+    assert_wait(lambda: ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['age'] == 88)
+
+    mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Vlad', 99);", commit=True)
+
+    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4)
+
+    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, final=False)) == 4)

    run_all_runner.stop()

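`assert_wait` is a helper defined earlier in the test module, outside this diff. Its assumed behavior — poll a predicate until it holds or a timeout expires — would look roughly like:

```python
import time

def assert_wait(condition, max_wait_time=20.0, retry_interval=0.5):
    deadline = time.time() + max_wait_time
    while time.time() < deadline:
        if condition():
            return
        time.sleep(retry_interval)
    # final check so a failure raises AssertionError on the real condition
    assert condition()
```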
1 change: 1 addition & 0 deletions tests_config.yaml
@@ -17,3 +17,4 @@ binlog_replicator:

databases: '*test*'
log_level: 'debug'
+optimize_interval: 3