diff --git a/README.md b/README.md index b3240d8..850d241 100644 --- a/README.md +++ b/README.md @@ -49,9 +49,21 @@ For realtime data sync from MySQL to ClickHouse: # ... other settings ... gtid_mode = on enforce_gtid_consistency = 1 -default_authentication_plugin = mysql_native_password +binlog_expire_logs_seconds = 864000 +max_binlog_size = 500M +binlog_format = ROW +``` + - For MariaDB use following settings: +```ini +[mysqld] +# ... other settings ... +gtid_strict_mode = ON +gtid_domain_id = 0 +server_id = 1 +log_bin = /var/log/mysql/mysql-bin.log +binlog_expire_logs_seconds = 864000 +max_binlog_size = 500M binlog_format = ROW - ``` For `AWS RDS` you need to set following settings in `Parameter groups`: diff --git a/docker-compose-tests.yaml b/docker-compose-tests.yaml index 2b97773..761dda4 100644 --- a/docker-compose-tests.yaml +++ b/docker-compose-tests.yaml @@ -31,6 +31,19 @@ services: volumes: - ./test_mysql.cnf:/etc/my.cnf:ro + mariadb_db: + image: mariadb:11.5.2 + environment: + - MARIADB_DATABASE=admin + - MARIADB_ROOT_HOST=% + - MARIADB_ROOT_PASSWORD=admin + networks: + default: + ports: + - 9307:3306 + volumes: + - ./test_mariadb.cnf:/etc/mysql/my.cnf:ro # Adjust path to MariaDB config location if needed + replicator: image: python:3.12.4-slim-bookworm command: bash -c "pip install -r /app/requirements.txt && pip install -r /app/requirements-dev.txt && touch /tmp/ready && tail -f /dev/null" diff --git a/mysql_ch_replicator/binlog_replicator.py b/mysql_ch_replicator/binlog_replicator.py index e7b366e..a3ab723 100644 --- a/mysql_ch_replicator/binlog_replicator.py +++ b/mysql_ch_replicator/binlog_replicator.py @@ -411,8 +411,6 @@ def run(self): if type(event) not in (DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent, QueryEvent): continue - assert event.packet.log_pos == self.stream.log_pos - log_event = LogEvent() if hasattr(event, 'table'): log_event.table_name = event.table diff --git a/mysql_ch_replicator/mysql_api.py b/mysql_ch_replicator/mysql_api.py index 226d8c7..f18fe2d 100644 --- a/mysql_ch_replicator/mysql_api.py +++ b/mysql_ch_replicator/mysql_api.py @@ -21,13 +21,21 @@ def reconnect_if_required(self): curr_time = time.time() if curr_time - self.last_connect_time < MySQLApi.RECONNECT_INTERVAL: return - #print('(re)connecting to mysql') - self.db = mysql.connector.connect( + conn_settings = dict( host=self.mysql_settings.host, port=self.mysql_settings.port, user=self.mysql_settings.user, passwd=self.mysql_settings.password, ) + try: + self.db = mysql.connector.connect(**conn_settings) + except mysql.connector.errors.DatabaseError as e: + if 'Unknown collation' in str(e): + conn_settings['charset'] = 'utf8mb4' + conn_settings['collation'] = 'utf8mb4_general_ci' + self.db = mysql.connector.connect(**conn_settings) + else: + raise self.cursor = self.db.cursor() if self.database is not None: self.cursor.execute(f'USE {self.database}') diff --git a/test_mariadb.cnf b/test_mariadb.cnf new file mode 100644 index 0000000..28bee8a --- /dev/null +++ b/test_mariadb.cnf @@ -0,0 +1,27 @@ +[client] +default-character-set = utf8mb4 + +[mysql] +default-character-set = utf8mb4 + +[mysqld] +# The defaults from /etc/my.cnf +user = mysql + +# Custom settings +collation-server = utf8mb4_unicode_ci # Changed to a collation supported by MariaDB +character-set-server = utf8mb4 +default_authentication_plugin = mysql_native_password +init_connect = 'SET NAMES utf8mb4' +skip-host-cache +skip-name-resolve +# information_schema_stats_expiry is not available in MariaDB and has been removed. + +# Replication settings for MariaDB +gtid_strict_mode = ON +gtid_domain_id = 0 +server_id = 1 +log_bin = /var/log/mysql/mysql-bin.log +binlog_expire_logs_seconds = 864000 +max_binlog_size = 500M +binlog_format = ROW diff --git a/test_mysql.cnf b/test_mysql.cnf index c4b9fa4..c2ea982 100644 --- a/test_mysql.cnf +++ b/test_mysql.cnf @@ -24,3 +24,6 @@ information_schema_stats_expiry = 0 # replication gtid_mode = on enforce_gtid_consistency = 1 +binlog_expire_logs_seconds = 864000 +max_binlog_size = 500M +binlog_format = ROW #Very important if you want to receive write, update and delete row events diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 309d9cf..ac871af 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -3,6 +3,7 @@ import time import subprocess import json +import pytest from mysql_ch_replicator import config from mysql_ch_replicator import mysql_api @@ -14,6 +15,7 @@ CONFIG_FILE = 'tests_config.yaml' +CONFIG_FILE_MARIADB = 'tests_config_mariadb.yaml' TEST_DB_NAME = 'replication_test_db' TEST_TABLE_NAME = 'test_table' TEST_TABLE_NAME_2 = 'test_table_2' @@ -70,9 +72,13 @@ def prepare_env( assert_wait(lambda: db_name not in ch.get_databases()) -def test_e2e_regular(): +@pytest.mark.parametrize('config_file', [ + CONFIG_FILE, + CONFIG_FILE_MARIADB, +]) +def test_e2e_regular(config_file): cfg = config.Settings() - cfg.load(CONFIG_FILE) + cfg.load(config_file) mysql = mysql_api.MySQLApi( database=None, @@ -103,9 +109,9 @@ def test_e2e_regular(): ) mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Peter', 33);", commit=True) - binlog_replicator_runner = BinlogReplicatorRunner() + binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file) binlog_replicator_runner.run() - db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME) + db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file) db_replicator_runner.run() assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) diff --git a/tests_config_mariadb.yaml b/tests_config_mariadb.yaml new file mode 100644 index 0000000..7907ed8 --- /dev/null +++ b/tests_config_mariadb.yaml @@ -0,0 +1,19 @@ + +mysql: + host: 'localhost' + port: 9307 + user: 'root' + password: 'admin' + +clickhouse: + host: 'localhost' + port: 9123 + user: 'default' + password: 'admin' + +binlog_replicator: + data_dir: '/app/binlog/' + records_per_file: 100000 + +databases: '*test*' +log_level: 'debug'