Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,21 @@ For realtime data sync from MySQL to ClickHouse:
# ... other settings ...
gtid_mode = on
enforce_gtid_consistency = 1
default_authentication_plugin = mysql_native_password
binlog_expire_logs_seconds = 864000
max_binlog_size = 500M
binlog_format = ROW
```
- For MariaDB, use the following settings:
```ini
[mysqld]
# ... other settings ...
gtid_strict_mode = ON
gtid_domain_id = 0
server_id = 1
log_bin = /var/log/mysql/mysql-bin.log
binlog_expire_logs_seconds = 864000
max_binlog_size = 500M
binlog_format = ROW

```

For `AWS RDS`, you need to set the following settings in `Parameter groups`:
Expand Down
13 changes: 13 additions & 0 deletions docker-compose-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ services:
volumes:
- ./test_mysql.cnf:/etc/my.cnf:ro

mariadb_db:
image: mariadb:11.5.2
environment:
- MARIADB_DATABASE=admin
- MARIADB_ROOT_HOST=%
- MARIADB_ROOT_PASSWORD=admin
networks:
default:
ports:
- 9307:3306
volumes:
- ./test_mariadb.cnf:/etc/mysql/my.cnf:ro # Adjust path to MariaDB config location if needed

replicator:
image: python:3.12.4-slim-bookworm
command: bash -c "pip install -r /app/requirements.txt && pip install -r /app/requirements-dev.txt && touch /tmp/ready && tail -f /dev/null"
Expand Down
2 changes: 0 additions & 2 deletions mysql_ch_replicator/binlog_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,6 @@ def run(self):
if type(event) not in (DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent, QueryEvent):
continue

assert event.packet.log_pos == self.stream.log_pos

log_event = LogEvent()
if hasattr(event, 'table'):
log_event.table_name = event.table
Expand Down
12 changes: 10 additions & 2 deletions mysql_ch_replicator/mysql_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,21 @@ def reconnect_if_required(self):
curr_time = time.time()
if curr_time - self.last_connect_time < MySQLApi.RECONNECT_INTERVAL:
return
#print('(re)connecting to mysql')
self.db = mysql.connector.connect(
conn_settings = dict(
host=self.mysql_settings.host,
port=self.mysql_settings.port,
user=self.mysql_settings.user,
passwd=self.mysql_settings.password,
)
try:
self.db = mysql.connector.connect(**conn_settings)
except mysql.connector.errors.DatabaseError as e:
if 'Unknown collation' in str(e):
conn_settings['charset'] = 'utf8mb4'
conn_settings['collation'] = 'utf8mb4_general_ci'
self.db = mysql.connector.connect(**conn_settings)
else:
raise
self.cursor = self.db.cursor()
if self.database is not None:
self.cursor.execute(f'USE {self.database}')
Expand Down
27 changes: 27 additions & 0 deletions test_mariadb.cnf
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[client]
default-character-set = utf8mb4

[mysql]
default-character-set = utf8mb4

[mysqld]
# The defaults from /etc/my.cnf
user = mysql

# Custom settings
collation-server = utf8mb4_unicode_ci # Changed to a collation supported by MariaDB
character-set-server = utf8mb4
default_authentication_plugin = mysql_native_password
init_connect = 'SET NAMES utf8mb4'
skip-host-cache
skip-name-resolve
# information_schema_stats_expiry is not available in MariaDB and has been removed.

# Replication settings for MariaDB
gtid_strict_mode = ON
gtid_domain_id = 0
server_id = 1
log_bin = /var/log/mysql/mysql-bin.log
binlog_expire_logs_seconds = 864000
max_binlog_size = 500M
binlog_format = ROW
3 changes: 3 additions & 0 deletions test_mysql.cnf
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ information_schema_stats_expiry = 0
# replication
gtid_mode = on
enforce_gtid_consistency = 1
binlog_expire_logs_seconds = 864000
max_binlog_size = 500M
binlog_format = ROW # Very important if you want to receive write, update, and delete row events
14 changes: 10 additions & 4 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time
import subprocess
import json
import pytest

from mysql_ch_replicator import config
from mysql_ch_replicator import mysql_api
Expand All @@ -14,6 +15,7 @@


CONFIG_FILE = 'tests_config.yaml'
CONFIG_FILE_MARIADB = 'tests_config_mariadb.yaml'
TEST_DB_NAME = 'replication_test_db'
TEST_TABLE_NAME = 'test_table'
TEST_TABLE_NAME_2 = 'test_table_2'
Expand Down Expand Up @@ -70,9 +72,13 @@ def prepare_env(
assert_wait(lambda: db_name not in ch.get_databases())


def test_e2e_regular():
@pytest.mark.parametrize('config_file', [
CONFIG_FILE,
CONFIG_FILE_MARIADB,
])
def test_e2e_regular(config_file):
cfg = config.Settings()
cfg.load(CONFIG_FILE)
cfg.load(config_file)

mysql = mysql_api.MySQLApi(
database=None,
Expand Down Expand Up @@ -103,9 +109,9 @@ def test_e2e_regular():
)
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Peter', 33);", commit=True)

binlog_replicator_runner = BinlogReplicatorRunner()
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
binlog_replicator_runner.run()
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
db_replicator_runner.run()

assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
Expand Down
19 changes: 19 additions & 0 deletions tests_config_mariadb.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

mysql:
host: 'localhost'
port: 9307
user: 'root'
password: 'admin'

clickhouse:
host: 'localhost'
port: 9123
user: 'default'
password: 'admin'

binlog_replicator:
data_dir: '/app/binlog/'
records_per_file: 100000

databases: '*test*'
log_level: 'debug'
Loading