5 changes: 5 additions & 0 deletions README.md
@@ -112,13 +112,17 @@ clickhouse:
port: 8323
user: 'default'
password: 'default'
connection_timeout: 30 # optional
send_receive_timeout: 300 # optional

binlog_replicator:
data_dir: '/home/user/binlog/'
records_per_file: 100000

databases: 'database_name_pattern_*'
tables: '*'

log_level: 'info' # optional
```


@@ -127,6 +131,7 @@ tables: '*'
- `binlog_replicator.data_dir` Create a new empty directory; it will be used by the script to store its state
- `databases` Database name pattern to replicate, e.g. `db_*` will match `db_1`, `db_2` and `db_test`; a list is also supported
- `tables` (__optional__) - tables to filter; a list is also supported
- `log_level` (__optional__) - log level, defaults to `info`; set it to `debug` for maximum detail (allowed values are `debug`, `info`, `warning`, `error`, `critical`)

A few more tables / dbs examples:

14 changes: 14 additions & 0 deletions mysql_ch_replicator/binlog_replicator.py
@@ -406,6 +406,8 @@ def run(self):

self.update_state_if_required(transaction_id)

logger.debug(f'received event {type(event)}, {transaction_id}')

if type(event) not in (DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent, QueryEvent):
continue

@@ -428,6 +430,8 @@ def run(self):
if not self.settings.is_database_matches(log_event.db_name):
continue

logger.debug(f'event matched {transaction_id}, {log_event.db_name}, {log_event.table_name}')

log_event.transaction_id = transaction_id
if isinstance(event, UpdateRowsEvent) or isinstance(event, WriteRowsEvent):
log_event.event_type = EventType.ADD_EVENT.value
@@ -459,6 +463,16 @@ def run(self):
vals = list(vals.values())
log_event.records.append(vals)

if self.settings.debug_log_level:
# records serialization is heavy, only do it with debug log enabled
logger.debug(
f'store event {transaction_id}, '
f'event type: {log_event.event_type}, '
f'database: {log_event.db_name} '
f'table: {log_event.table_name} '
f'records: {log_event.records}',
)

self.data_writer.store_event(log_event)

self.update_state_if_required(last_transaction_id)
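Note the guard around the `store event` message above: serializing `log_event.records` into the f-string is expensive, so it is skipped entirely unless debug logging was requested in the config. A minimal sketch of the same pattern, with a hypothetical `settings` object and event; `logger.isEnabledFor(logging.DEBUG)` is the standard-library equivalent of the flag check:

```python
import logging

logger = logging.getLogger(__name__)

def log_stored_event(settings, transaction_id, log_event):
    # Only build the costly debug message when debug output is enabled,
    # so the hot path does no record serialization at 'info' and above.
    if settings.debug_log_level:  # or: logger.isEnabledFor(logging.DEBUG)
        logger.debug(
            f'store event {transaction_id}, '
            f'event type: {log_event.event_type}, '
            f'records: {log_event.records}',
        )
```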
10 changes: 10 additions & 0 deletions mysql_ch_replicator/config.py
@@ -89,6 +89,8 @@ def __init__(self):
self.databases = ''
self.tables = '*'
self.settings_file = ''
self.log_level = 'info'
self.debug_log_level = False

def load(self, settings_file):
data = open(settings_file, 'r').read()
@@ -99,6 +101,7 @@ def load(self, settings_file):
self.clickhouse = ClickhouseSettings(**data['clickhouse'])
self.databases = data['databases']
self.tables = data.get('tables', '*')
self.log_level = data.get('log_level', 'info')
assert isinstance(self.databases, str) or isinstance(self.databases, list)
assert isinstance(self.tables, str) or isinstance(self.tables, list)
self.binlog_replicator = BinlogReplicatorSettings(**data['binlog_replicator'])
@@ -123,7 +126,14 @@ def is_database_matches(self, db_name):
def is_table_matches(self, table_name):
return self.is_pattern_matches(table_name, self.tables)

def validate_log_level(self):
if self.log_level not in ['critical', 'error', 'warning', 'info', 'debug']:
raise ValueError(f'wrong log level {self.log_level}')
if self.log_level == 'debug':
self.debug_log_level = True

def validate(self):
self.mysql.validate()
self.clickhouse.validate()
self.binlog_replicator.validate()
self.validate_log_level()
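Taken together, `load()` picks up `log_level` from the YAML file and `validate()` rejects unknown values while raising the `debug_log_level` flag. A rough usage sketch, assuming the settings class shown here is the `Settings` that `main.py` imports (the exact import path is an assumption):

```python
from mysql_ch_replicator.config import Settings  # assumed module path

config = Settings()
config.load('config.yaml')   # reads log_level, defaulting to 'info'
config.validate()            # raises ValueError on an unsupported log_level
if config.debug_log_level:   # True only when log_level == 'debug'
    print('record-level debug logging is enabled')
```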
41 changes: 36 additions & 5 deletions mysql_ch_replicator/db_replicator.py
@@ -255,6 +255,8 @@ def perform_initial_replication_table(self, table_name):
primary_key_index = field_names.index(primary_key)
primary_key_type = field_types[primary_key_index]

logger.debug(f'primary key name: {primary_key}, type: {primary_key_type}')

stats_number_of_records = 0
last_stats_dump_time = time.time()

@@ -270,11 +272,12 @@ def perform_initial_replication_table(self, table_name):
limit=DbReplicator.INITIAL_REPLICATION_BATCH_SIZE,
start_value=query_start_value,
)
logger.debug(f'extracted {len(records)} records from mysql')

records = self.converter.convert_records(records, mysql_table_structure, clickhouse_table_structure)

# for record in records:
# print(dict(zip(field_names, record)))
if self.config.debug_log_level:
logger.debug(f'records: {records}')

if not records:
break
@@ -295,9 +298,17 @@ def perform_initial_replication_table(self, table_name):
if curr_time - last_stats_dump_time >= 60.0:
last_stats_dump_time = curr_time
logger.info(
f'replicating {table_name}, replicated {stats_number_of_records}, primary key: {max_primary_key}',
f'replicating {table_name}, '
f'replicated {stats_number_of_records} records, '
f'primary key: {max_primary_key}',
)

logger.info(
f'finish replicating {table_name}, '
f'replicated {stats_number_of_records} records, '
f'primary key: {max_primary_key}',
)

def run_realtime_replication(self):
if self.initial_only:
logger.info('skip running realtime replication, only initial replication was requested')
@@ -337,7 +348,7 @@ def handle_event(self, event: LogEvent):
if event.transaction_id <= self.state.last_processed_transaction_non_uploaded:
return

logger.debug(f'processing event {event.transaction_id}')
logger.debug(f'processing event {event.transaction_id}, {event.event_type}, {event.table_name}')

event_handlers = {
EventType.ADD_EVENT.value: self.handle_insert_event,
@@ -366,6 +377,12 @@ def save_state_if_required(self, force=False):
self.state.save()

def handle_insert_event(self, event: LogEvent):
if self.config.debug_log_level:
logger.debug(
f'processing insert event: {event.transaction_id}, '
f'table: {event.table_name}, '
f'records: {event.records}',
)
self.stats.insert_events_count += 1
self.stats.insert_records_count += len(event.records)

@@ -383,6 +400,12 @@ def handle_insert_event(self, event: LogEvent):
current_table_records_to_delete.discard(record_id)

def handle_erase_event(self, event: LogEvent):
if self.config.debug_log_level:
logger.debug(
f'processing erase event: {event.transaction_id}, '
f'table: {event.table_name}, '
f'records: {event.records}',
)
self.stats.erase_events_count += 1
self.stats.erase_records_count += len(event.records)

@@ -404,7 +427,8 @@ def handle_erase_event(self, event: LogEvent):
current_table_records_to_insert.pop(record_id, None)

def handle_query_event(self, event: LogEvent):
#print(" === handle_query_event", event.records)
if self.config.debug_log_level:
logger.debug(f'processing query event: {event.transaction_id}, query: {event.records}')
query = strip_sql_comments(event.records)
if query.lower().startswith('alter'):
self.handle_alter_query(query, event.db_name)
@@ -476,20 +500,27 @@ def upload_records_if_required(self, table_name):
self.upload_records()

def upload_records(self):
logger.debug(
f'upload records, to insert: {len(self.records_to_insert)}, to delete: {len(self.records_to_delete)}',
)
self.last_records_upload_time = time.time()

for table_name, id_to_records in self.records_to_insert.items():
records = id_to_records.values()
if not records:
continue
_, ch_table_structure = self.state.tables_structure[table_name]
if self.config.debug_log_level:
logger.debug(f'inserting into {table_name}, records: {records}')
self.clickhouse_api.insert(table_name, records, table_structure=ch_table_structure)

for table_name, keys_to_remove in self.records_to_delete.items():
if not keys_to_remove:
continue
table_structure: TableStructure = self.state.tables_structure[table_name][0]
primary_key_name = table_structure.primary_key
if self.config.debug_log_level:
logger.debug(f'erasing from {table_name}, primary key: {primary_key_name}, values: {keys_to_remove}')
self.clickhouse_api.erase(
table_name=table_name,
field_name=primary_key_name,
25 changes: 19 additions & 6 deletions mysql_ch_replicator/main.py
@@ -13,7 +13,7 @@
from .runner import Runner


def set_logging_config(tags, log_file=None):
def set_logging_config(tags, log_file=None, log_level_str=None):

handlers = []
handlers.append(logging.StreamHandler(sys.stderr))
@@ -28,8 +28,21 @@ def set_logging_config(tags, log_file=None):
)
)

log_levels = {
'critical': logging.CRITICAL,
'error': logging.ERROR,
'warning': logging.WARNING,
'info': logging.INFO,
'debug': logging.DEBUG,
}

log_level = log_levels.get(log_level_str)
if log_level is None:
print(f'[warning] unknown log level {log_level_str}, setting info')
log_level = logging.INFO

logging.basicConfig(
level=logging.INFO,
level=log_level,
format=f'[{tags} %(asctime)s %(levelname)8s] %(message)s',
handlers=handlers,
)
@@ -44,7 +57,7 @@ def run_binlog_replicator(args, config: Settings):
'binlog_replicator.log',
)

set_logging_config('binlogrepl', log_file=log_file)
set_logging_config('binlogrepl', log_file=log_file, log_level_str=config.log_level)
binlog_replicator = BinlogReplicator(
settings=config,
)
@@ -73,7 +86,7 @@ def run_db_replicator(args, config: Settings):
'db_replicator.log',
)

set_logging_config(f'dbrepl {args.db}', log_file=log_file)
set_logging_config(f'dbrepl {args.db}', log_file=log_file, log_level_str=config.log_level)

db_replicator = DbReplicator(
config=config,
@@ -85,13 +98,13 @@ def run_db_replicator(args, config: Settings):


def run_monitoring(args, config: Settings):
set_logging_config('monitor')
set_logging_config('monitor', log_level_str=config.log_level)
monitoring = Monitoring(args.db or '', config)
monitoring.run()


def run_all(args, config: Settings):
set_logging_config('runner')
set_logging_config('runner', log_level_str=config.log_level)
runner = Runner(config, args.wait_initial_replication, args.db)
runner.run()

1 change: 1 addition & 0 deletions tests_config.yaml
@@ -16,3 +16,4 @@ binlog_replicator:
records_per_file: 100000

databases: '*test*'
log_level: 'debug'
2 changes: 2 additions & 0 deletions tests_config_databases_tables.yaml
Original file line number Diff line number Diff line change
@@ -17,3 +17,5 @@

databases: ['test_db_1*', 'test_db_2']
tables: ['test_table_1*', 'test_table_2']

log_level: 'debug'