diff --git a/README.md b/README.md
index 75db2c6..a020ce0 100644
--- a/README.md
+++ b/README.md
@@ -112,6 +112,8 @@ clickhouse:
     port: 8323
     user: 'default'
     password: 'default'
+    connection_timeout: 30        # optional
+    send_receive_timeout: 300     # optional
 
 binlog_replicator:
     data_dir: '/home/user/binlog/'
@@ -119,6 +121,8 @@ binlog_replicator:
 
 databases: 'database_name_pattern_*'
 tables: '*'
+
+log_level: 'info'  # optional
 ```
@@ -127,6 +131,7 @@ tables: '*'
 
 - `binlog_replicator.data_dir` Create a new empty directory; it will be used by the script to store its state
 - `databases` Databases name pattern to replicate, e.g. `db_*` will match `db_1` `db_2` `db_test`, list is also supported
 - `tables` (__optional__) - tables to filter, list is also supported
+- `log_level` (__optional__) - logging verbosity; default is `info`, set to `debug` for maximum detail (allowed values are `debug`, `info`, `warning`, `error`, `critical`)
 
 Few more tables / dbs examples:
diff --git a/mysql_ch_replicator/binlog_replicator.py b/mysql_ch_replicator/binlog_replicator.py
index b71b500..e7b366e 100644
--- a/mysql_ch_replicator/binlog_replicator.py
+++ b/mysql_ch_replicator/binlog_replicator.py
@@ -406,6 +406,8 @@ def run(self):
 
                 self.update_state_if_required(transaction_id)
 
+                logger.debug(f'received event {type(event)}, {transaction_id}')
+
                 if type(event) not in (DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent, QueryEvent):
                     continue
@@ -428,6 +430,8 @@ def run(self):
                 if not self.settings.is_database_matches(log_event.db_name):
                     continue
 
+                logger.debug(f'event matched {transaction_id}, {log_event.db_name}, {log_event.table_name}')
+
                 log_event.transaction_id = transaction_id
                 if isinstance(event, UpdateRowsEvent) or isinstance(event, WriteRowsEvent):
                     log_event.event_type = EventType.ADD_EVENT.value
@@ -459,6 +463,16 @@ def run(self):
                         vals = list(vals.values())
                     log_event.records.append(vals)
 
+                if self.settings.debug_log_level:
+                    # records serialization is heavy, only do it with debug log enabled
+                    logger.debug(
+                        f'store event {transaction_id}, '
+                        f'event type: {log_event.event_type}, '
+                        f'database: {log_event.db_name} '
+                        f'table: {log_event.table_name} '
+                        f'records: {log_event.records}',
+                    )
+
                 self.data_writer.store_event(log_event)
 
                 self.update_state_if_required(last_transaction_id)
diff --git a/mysql_ch_replicator/config.py b/mysql_ch_replicator/config.py
index 9d15829..cc810e8 100644
--- a/mysql_ch_replicator/config.py
+++ b/mysql_ch_replicator/config.py
@@ -89,6 +89,8 @@ def __init__(self):
         self.databases = ''
         self.tables = '*'
         self.settings_file = ''
+        self.log_level = 'info'
+        self.debug_log_level = False
 
     def load(self, settings_file):
         data = open(settings_file, 'r').read()
@@ -99,6 +101,7 @@ def load(self, settings_file):
         self.clickhouse = ClickhouseSettings(**data['clickhouse'])
         self.databases = data['databases']
         self.tables = data.get('tables', '*')
+        self.log_level = data.get('log_level', 'info')
         assert isinstance(self.databases, str) or isinstance(self.databases, list)
         assert isinstance(self.tables, str) or isinstance(self.tables, list)
         self.binlog_replicator = BinlogReplicatorSettings(**data['binlog_replicator'])
@@ -123,7 +126,14 @@ def is_database_matches(self, db_name):
     def is_table_matches(self, table_name):
         return self.is_pattern_matches(table_name, self.tables)
 
+    def validate_log_level(self):
+        if self.log_level not in ['critical', 'error', 'warning', 'info', 'debug']:
+            raise ValueError(f'wrong log level {self.log_level}')
+        if self.log_level == 'debug':
+            self.debug_log_level = True
+
     def validate(self):
         self.mysql.validate()
         self.clickhouse.validate()
         self.binlog_replicator.validate()
+        self.validate_log_level()
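
The `validate_log_level()` hook above precomputes a boolean so hot code paths can test a flag instead of comparing strings on every event. A minimal, self-contained sketch of that behaviour (`SettingsSketch` and `VALID_LOG_LEVELS` are illustrative names; only `log_level` / `debug_log_level` mirror the patch):

```python
VALID_LOG_LEVELS = ['critical', 'error', 'warning', 'info', 'debug']

class SettingsSketch:
    def __init__(self, log_level='info'):
        self.log_level = log_level
        self.debug_log_level = False

    def validate_log_level(self):
        if self.log_level not in VALID_LOG_LEVELS:
            raise ValueError(f'wrong log level {self.log_level}')
        # precompute a flag so per-event code avoids string comparisons
        if self.log_level == 'debug':
            self.debug_log_level = True

settings = SettingsSketch('debug')
settings.validate_log_level()
assert settings.debug_log_level
```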
diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py
index d2fa6df..4407a3f 100644
--- a/mysql_ch_replicator/db_replicator.py
+++ b/mysql_ch_replicator/db_replicator.py
@@ -255,6 +255,8 @@ def perform_initial_replication_table(self, table_name):
         primary_key_index = field_names.index(primary_key)
         primary_key_type = field_types[primary_key_index]
 
+        logger.debug(f'primary key name: {primary_key}, type: {primary_key_type}')
+
         stats_number_of_records = 0
         last_stats_dump_time = time.time()
 
@@ -270,11 +272,12 @@ def perform_initial_replication_table(self, table_name):
                 limit=DbReplicator.INITIAL_REPLICATION_BATCH_SIZE,
                 start_value=query_start_value,
             )
+            logger.debug(f'extracted {len(records)} records from mysql')
 
             records = self.converter.convert_records(records, mysql_table_structure, clickhouse_table_structure)
 
-            # for record in records:
-            #     print(dict(zip(field_names, record)))
+            if self.config.debug_log_level:
+                logger.debug(f'records: {records}')
 
             if not records:
                 break
@@ -295,9 +298,17 @@ def perform_initial_replication_table(self, table_name):
             if curr_time - last_stats_dump_time >= 60.0:
                 last_stats_dump_time = curr_time
                 logger.info(
-                    f'replicating {table_name}, replicated {stats_number_of_records}, primary key: {max_primary_key}',
+                    f'replicating {table_name}, '
+                    f'replicated {stats_number_of_records} records, '
+                    f'primary key: {max_primary_key}',
                 )
 
+        logger.info(
+            f'finished replicating {table_name}, '
+            f'replicated {stats_number_of_records} records, '
+            f'primary key: {max_primary_key}',
+        )
+
     def run_realtime_replication(self):
         if self.initial_only:
             logger.info('skip running realtime replication, only initial replication was requested')
@@ -337,7 +348,7 @@ def handle_event(self, event: LogEvent):
         if event.transaction_id <= self.state.last_processed_transaction_non_uploaded:
             return
 
-        logger.debug(f'processing event {event.transaction_id}')
+        logger.debug(f'processing event {event.transaction_id}, {event.event_type}, {event.table_name}')
 
         event_handlers = {
             EventType.ADD_EVENT.value: self.handle_insert_event,
@@ -366,6 +377,12 @@ def save_state_if_required(self, force=False):
         self.state.save()
 
     def handle_insert_event(self, event: LogEvent):
+        if self.config.debug_log_level:
+            logger.debug(
+                f'processing insert event: {event.transaction_id}, '
+                f'table: {event.table_name}, '
+                f'records: {event.records}',
+            )
         self.stats.insert_events_count += 1
         self.stats.insert_records_count += len(event.records)
 
@@ -383,6 +400,12 @@ def handle_insert_event(self, event: LogEvent):
             current_table_records_to_delete.discard(record_id)
 
     def handle_erase_event(self, event: LogEvent):
+        if self.config.debug_log_level:
+            logger.debug(
+                f'processing erase event: {event.transaction_id}, '
+                f'table: {event.table_name}, '
+                f'records: {event.records}',
+            )
         self.stats.erase_events_count += 1
         self.stats.erase_records_count += len(event.records)
 
@@ -404,7 +427,8 @@ def handle_erase_event(self, event: LogEvent):
             current_table_records_to_insert.pop(record_id, None)
 
     def handle_query_event(self, event: LogEvent):
-        #print(" === handle_query_event", event.records)
+        if self.config.debug_log_level:
+            logger.debug(f'processing query event: {event.transaction_id}, query: {event.records}')
         query = strip_sql_comments(event.records)
         if query.lower().startswith('alter'):
             self.handle_alter_query(query, event.db_name)
@@ -476,6 +500,9 @@ def upload_records_if_required(self, table_name):
             self.upload_records()
 
     def upload_records(self):
+        logger.debug(
+            f'upload records, to insert: {len(self.records_to_insert)}, to delete: {len(self.records_to_delete)}',
+        )
         self.last_records_upload_time = time.time()
 
         for table_name, id_to_records in self.records_to_insert.items():
@@ -483,6 +510,8 @@ def upload_records(self):
             if not records:
                 continue
             _, ch_table_structure = self.state.tables_structure[table_name]
+            if self.config.debug_log_level:
+                logger.debug(f'inserting into {table_name}, records: {records}')
             self.clickhouse_api.insert(table_name, records, table_structure=ch_table_structure)
 
         for table_name, keys_to_remove in self.records_to_delete.items():
@@ -490,6 +519,8 @@ def upload_records(self):
                 continue
             table_structure: TableStructure = self.state.tables_structure[table_name][0]
             primary_key_name = table_structure.primary_key
+            if self.config.debug_log_level:
+                logger.debug(f'erasing from {table_name}, primary key: {primary_key_name}, values: {keys_to_remove}')
             self.clickhouse_api.erase(
                 table_name=table_name,
                 field_name=primary_key_name,
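
The `if self.config.debug_log_level:` guards above exist because an f-string renders its arguments eagerly, even when the logger then discards the message, so serializing whole record batches at `info` level would be pure overhead. A small sketch of the effect (`FakeSettings` and `big_records` are made-up stand-ins, not repo names):

```python
import logging

logger = logging.getLogger(__name__)

class FakeSettings:
    # stand-in for the replicator config; False mimics log_level != 'debug'
    debug_log_level = False

settings = FakeSettings()
big_records = [list(range(100)) for _ in range(1000)]

# Unguarded, the f-string below would serialize all rows to text on every
# call, even though DEBUG is disabled and the message would be thrown away:
#   logger.debug(f'records: {big_records}')
# Guarded on the precomputed flag, the serialization is skipped entirely:
if settings.debug_log_level:
    logger.debug(f'records: {big_records}')
```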
diff --git a/mysql_ch_replicator/main.py b/mysql_ch_replicator/main.py
index 48bc4f7..966dd27 100755
--- a/mysql_ch_replicator/main.py
+++ b/mysql_ch_replicator/main.py
@@ -13,7 +13,7 @@ from .runner import Runner
 
 
-def set_logging_config(tags, log_file=None):
+def set_logging_config(tags, log_file=None, log_level_str=None):
     handlers = []
     handlers.append(logging.StreamHandler(sys.stderr))
 
@@ -28,8 +28,21 @@ def set_logging_config(tags, log_file=None):
         )
     )
 
+    log_levels = {
+        'critical': logging.CRITICAL,
+        'error': logging.ERROR,
+        'warning': logging.WARNING,
+        'info': logging.INFO,
+        'debug': logging.DEBUG,
+    }
+
+    log_level = log_levels.get(log_level_str)
+    if log_level is None:
+        print(f'[warning] unknown log level {log_level_str}, setting info')
+        log_level = logging.INFO
+
     logging.basicConfig(
-        level=logging.INFO,
+        level=log_level,
         format=f'[{tags} %(asctime)s %(levelname)8s] %(message)s',
         handlers=handlers,
     )
@@ -44,7 +57,7 @@ def run_binlog_replicator(args, config: Settings):
         'binlog_replicator.log',
     )
 
-    set_logging_config('binlogrepl', log_file=log_file)
+    set_logging_config('binlogrepl', log_file=log_file, log_level_str=config.log_level)
     binlog_replicator = BinlogReplicator(
         settings=config,
     )
@@ -73,7 +86,7 @@ def run_db_replicator(args, config: Settings):
         'db_replicator.log',
     )
 
-    set_logging_config(f'dbrepl {args.db}', log_file=log_file)
+    set_logging_config(f'dbrepl {args.db}', log_file=log_file, log_level_str=config.log_level)
 
     db_replicator = DbReplicator(
         config=config,
@@ -85,13 +98,13 @@ def run_db_replicator(args, config: Settings):
 
 
 def run_monitoring(args, config: Settings):
-    set_logging_config('monitor')
+    set_logging_config('monitor', log_level_str=config.log_level)
     monitoring = Monitoring(args.db or '', config)
     monitoring.run()
 
 
 def run_all(args, config: Settings):
-    set_logging_config('runner')
+    set_logging_config('runner', log_level_str=config.log_level)
     runner = Runner(config, args.wait_initial_replication, args.db)
     runner.run()
diff --git a/tests_config.yaml b/tests_config.yaml
index 0fc7a18..8a722fb 100644
--- a/tests_config.yaml
+++ b/tests_config.yaml
@@ -16,3 +16,4 @@ binlog_replicator:
     records_per_file: 100000
 
 databases: '*test*'
+log_level: 'debug'
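
`set_logging_config()` above maps the config string onto the `logging` module's constants and falls back to `INFO` for unknown names. The same resolution step in isolation (the function name `resolve_log_level` is ours, not the repo's):

```python
import logging

LOG_LEVELS = {
    'critical': logging.CRITICAL,
    'error': logging.ERROR,
    'warning': logging.WARNING,
    'info': logging.INFO,
    'debug': logging.DEBUG,
}

def resolve_log_level(name):
    # unknown or missing names degrade to INFO instead of crashing,
    # matching the warn-and-continue behaviour of set_logging_config()
    level = LOG_LEVELS.get(name)
    if level is None:
        print(f'[warning] unknown log level {name}, setting info')
        level = logging.INFO
    return level

logging.basicConfig(level=resolve_log_level('debug'))
```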
diff --git a/tests_config_databases_tables.yaml b/tests_config_databases_tables.yaml
index ee1498c..423e917 100644
--- a/tests_config_databases_tables.yaml
+++ b/tests_config_databases_tables.yaml
@@ -17,3 +17,5 @@ binlog_replicator:
 databases: ['test_db_1*', 'test_db_2']
 tables: ['test_table_1*', 'test_table_2']
+
+log_level: 'debug'
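
Putting it together, enabling debug logging end to end should look roughly like this — a sketch assuming the package is importable from the repo root and using a config path from the test files above as an example:

```python
# Settings, load() and validate() all appear in the diff above;
# the config path is just an example.
from mysql_ch_replicator.config import Settings

settings = Settings()
settings.load('tests_config.yaml')  # picks up log_level: 'debug'
settings.validate()                 # raises ValueError for an unknown level
print(settings.log_level, settings.debug_log_level)  # -> debug True
```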