MaterializedMySQL: Introduce MySQL Binlog Client
One binlog connection for many databases.

1. Introduced `IBinlog` and its implementations, which read binlog events from a socket (`BinlogFromSocket`) or from a file (`BinlogFromFile`). They are based on the previous implementation of `EventBase` and reuse the existing binlog parsers, so backward compatibility with the old version is fully preserved. Fixed `./check-mysql-binlog` to test the new implementation.
2. Introduced `BinlogEventsDispatcher`, which reads events from the source `IBinlog` and sends them to the currently attached `IBinlog` instances.
3. Introduced `BinlogClient`, which groups a list of `BinlogEventsDispatcher` instances by MySQL binlog connection, defined by `user:password@host:port`. All dispatchers with the same binlog position should be merged into one.
4. Introduced `BinlogClientCatalog`, a singleton that tracks all binlogs created on the instance.
5. Introduced the `use_binlog_client` setting for `MaterializedMySQL`, which forces reuse of a `BinlogClient` if one already exists in `BinlogClientCatalog`, or creates a new one otherwise. It is disabled by default.
6. Introduced the `max_bytes_in_binlog_queue` setting to define the limit of bytes in a binlog's queue of events. If the number of bytes in the queue exceeds this limit, `BinlogEventsDispatcher` stops reading new events from the source `IBinlog` until space for new events is freed.
7. Introduced the `max_milliseconds_to_wait_in_binlog_queue` setting to define the maximum number of milliseconds to wait when the byte limit is exceeded.
8. Introduced the `max_bytes_in_binlog_dispatcher_buffer` setting to define the maximum number of bytes in the binlog dispatcher's buffer before it is flushed to the attached binlogs.
9. Introduced the `max_flush_milliseconds_in_binlog_dispatcher` setting to define the maximum number of milliseconds that events wait in the binlog dispatcher's buffer before it is flushed to the attached binlogs.
10. Introduced the `system.mysql_binlogs` system table, which shows a list of active binlogs.
11. Introduced `UnparsedRowsEvent` and `MYSQL_UNPARSED_ROWS_EVENT`, which mark an event as not yet parsed, so that it must be explicitly parsed later.

@larspars is the author of the following:
`GTIDSets::contains()`
`ReplicationHelper`
`shouldReconnectOnException()`
valbok committed Nov 30, 2023
1 parent d503eec commit 8cc2f46
Showing 25 changed files with 4,859 additions and 182 deletions.
176 changes: 176 additions & 0 deletions docs/en/operations/settings/mysql-binlog-client.md
@@ -0,0 +1,176 @@
# The MySQL Binlog Client

The MySQL Binlog Client provides a mechanism in ClickHouse to share the binlog from a MySQL instance among multiple [MaterializedMySQL](../../engines/database-engines/materialized-mysql.md) databases. This avoids consuming unnecessary bandwidth and CPU when replicating more than one schema/database.

The implementation is resilient against crashes and disk issues. The executed GTID sets of the binlog itself and of the consuming databases are persisted only after the data they describe has been safely persisted as well. The implementation also tolerates re-doing aborted operations (at-least-once delivery).

# Settings

## use_binlog_client

Forces reuse of an existing MySQL binlog connection, or creates a new one if none exists. The connection is defined by `user:pass@host:port`.

Default value: 0

**Example**

```sql
-- create MaterializedMySQL databases that read the events from the binlog client
CREATE DATABASE db1 ENGINE = MaterializedMySQL('host:port', 'db1', 'user', 'password') SETTINGS use_binlog_client=1;
CREATE DATABASE db2 ENGINE = MaterializedMySQL('host:port', 'db2', 'user', 'password') SETTINGS use_binlog_client=1;
CREATE DATABASE db3 ENGINE = MaterializedMySQL('host:port', 'db3', 'user2', 'password2') SETTINGS use_binlog_client=1;
```

Databases `db1` and `db2` will use the same binlog connection, since they use the same `user:pass@host:port`. Database `db3` will use a separate binlog connection.
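
As a rough illustration of the grouping rule above, the following C++ sketch keys connections by `user:pass@host:port` and counts how many databases share each one. `makeConnectionKey` and `ConnectionRegistry` are hypothetical names used only for this example; they are not the actual `BinlogClientCatalog` API.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical helper: builds the key that identifies one binlog connection.
std::string makeConnectionKey(const std::string & user, const std::string & password,
                              const std::string & host, std::uint16_t port)
{
    return user + ":" + password + "@" + host + ":" + std::to_string(port);
}

// Hypothetical registry: maps a connection key to the databases sharing it.
class ConnectionRegistry
{
public:
    void attach(const std::string & database, const std::string & key)
    {
        assert(!database.empty());
        databases_by_key[key].push_back(database);
    }

    // Number of distinct binlog connections currently in use.
    std::size_t connectionCount() const { return databases_by_key.size(); }

    // Number of databases attached to the connection with the given key.
    std::size_t databasesOn(const std::string & key) const
    {
        auto it = databases_by_key.find(key);
        return it == databases_by_key.end() ? 0 : it->second.size();
    }

private:
    std::map<std::string, std::vector<std::string>> databases_by_key;
};
```

Attaching `db1` and `db2` with the same key and `db3` with a different user yields two connections in this sketch, matching the example above.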

## max_bytes_in_binlog_queue

Defines the limit of bytes in the binlog event queue. If the number of bytes in the queue exceeds this limit, reading new events from MySQL stops until space for new events is freed. This bounds memory usage: a very high value could consume all available memory, while a very low value could leave the databases waiting for new events.

Default value: 67108864

**Example**

```sql
CREATE DATABASE db1 ENGINE = MaterializedMySQL('host:port', 'db1', 'user', 'password') SETTINGS use_binlog_client=1, max_bytes_in_binlog_queue=33554432;
CREATE DATABASE db2 ENGINE = MaterializedMySQL('host:port', 'db2', 'user', 'password') SETTINGS use_binlog_client=1;
```

If database `db1` is unable to consume binlog events fast enough and the size of the event queue exceeds `33554432` bytes, reading new events from MySQL is postponed until `db1` consumes the events and releases some space.

NOTE: This also affects `db2`, which will be waiting for new events too, since the two databases share the same connection.

## max_milliseconds_to_wait_in_binlog_queue

Defines the maximum number of milliseconds to wait when `max_bytes_in_binlog_queue` is exceeded. After that, the database is detached from the current binlog connection and tries to establish a new one, so that other databases do not have to wait for it.

Default value: 10000

**Example**

```sql
CREATE DATABASE db1 ENGINE = MaterializedMySQL('host:port', 'db1', 'user', 'password') SETTINGS use_binlog_client=1, max_bytes_in_binlog_queue=33554432, max_milliseconds_to_wait_in_binlog_queue=1000;
CREATE DATABASE db2 ENGINE = MaterializedMySQL('host:port', 'db2', 'user', 'password') SETTINGS use_binlog_client=1;
```

If the event queue of database `db1` is full, the binlog connection waits for `1000` ms; if the database still cannot consume the events, it is detached from the connection and a new connection is created for it.

NOTE: If database `db1` has been detached from the shared connection and has created a new one, then once the binlog connections for `db1` and `db2` reach the same position they are merged into one, and `db1` and `db2` use the same connection again.
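
The interplay between `max_bytes_in_binlog_queue` and `max_milliseconds_to_wait_in_binlog_queue` can be sketched roughly as follows. This is a single-threaded model with an injected clock, not the actual queue implementation: `BinlogQueueSketch` and `PushResult` are hypothetical names, and the real code presumably blocks on a condition variable rather than returning `Wait`.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical outcome of trying to enqueue one event.
enum class PushResult { Accepted, Wait, Detach };

class BinlogQueueSketch
{
public:
    using Clock = std::chrono::steady_clock;

    BinlogQueueSketch(std::size_t max_bytes_, std::chrono::milliseconds max_wait_)
        : max_bytes(max_bytes_), max_wait(max_wait_) {}

    // Accepts the event if it fits into the byte limit. Otherwise asks the
    // caller to wait, or to detach once the queue has been full for max_wait.
    PushResult tryPush(const std::string & event, Clock::time_point now)
    {
        if (bytes + event.size() <= max_bytes)
        {
            events.push_back(event);
            bytes += event.size();
            is_full = false;
            return PushResult::Accepted;
        }
        if (!is_full)
        {
            is_full = true;
            full_since = now; // remember when the queue first became full
        }
        return (now - full_since >= max_wait) ? PushResult::Detach : PushResult::Wait;
    }

    // Consumes the oldest event and releases its bytes.
    bool pop(std::string & event)
    {
        if (events.empty())
            return false;
        event = std::move(events.front());
        events.pop_front();
        bytes -= event.size();
        return true;
    }

    std::size_t sizeInBytes() const { return bytes; }

private:
    const std::size_t max_bytes;
    const std::chrono::milliseconds max_wait;
    std::deque<std::string> events;
    std::size_t bytes = 0;
    bool is_full = false;
    Clock::time_point full_since{};
};
```

In this model, a `Detach` result corresponds to the point where the slow database would be moved to its own connection so that the other databases can continue.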

## max_bytes_in_binlog_dispatcher_buffer

Defines the maximum number of bytes in the binlog dispatcher's buffer before it is flushed to the attached binlogs. Events from the MySQL binlog connection are buffered before being sent to the attached databases, which increases event throughput from the binlog to the databases.

Default value: 1048576

## max_flush_milliseconds_in_binlog_dispatcher

Defines the maximum number of milliseconds that events wait in the binlog dispatcher's buffer before it is flushed to the attached binlogs. If no events are received from the MySQL binlog connection for a while, the buffered events are sent to the attached databases once this time elapses.

Default value: 1000
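
The two dispatcher buffer settings describe a flush-on-size-or-time policy. A minimal sketch of such a policy, assuming a hypothetical `DispatcherBufferSketch` class with an injected clock (the exact comparison operators and internals of the real dispatcher are assumptions here):

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <string>
#include <vector>

class DispatcherBufferSketch
{
public:
    using Clock = std::chrono::steady_clock;

    DispatcherBufferSketch(std::size_t max_bytes_, std::chrono::milliseconds max_flush_interval_,
                           Clock::time_point now)
        : max_bytes(max_bytes_), max_flush_interval(max_flush_interval_), last_flush(now) {}

    // Buffers one event received from the binlog connection.
    void add(const std::string & event)
    {
        buffer.push_back(event);
        bytes += event.size();
    }

    // A non-empty buffer is flushed when it is large enough or old enough.
    bool shouldFlush(Clock::time_point now) const
    {
        if (buffer.empty())
            return false;
        return bytes >= max_bytes || now - last_flush >= max_flush_interval;
    }

    // Hands the buffered events over (to the attached binlogs) and resets the buffer.
    std::vector<std::string> flush(Clock::time_point now)
    {
        std::vector<std::string> flushed;
        flushed.swap(buffer);
        bytes = 0;
        last_flush = now;
        return flushed;
    }

private:
    const std::size_t max_bytes;
    const std::chrono::milliseconds max_flush_interval;
    Clock::time_point last_flush;
    std::vector<std::string> buffer;
    std::size_t bytes = 0;
};
```

Larger byte limits favor throughput, while the time limit bounds how long a quiet connection can hold back already received events.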

# Design

## The Binlog Events Dispatcher

Currently each MaterializedMySQL database opens its own connection to MySQL to subscribe to binlog events. There is a need to have only one connection and _dispatch_ the binlog events to all databases that replicate from the same MySQL instance.

## Each MaterializedMySQL Database Has Its Own Event Queue

To prevent slowing down other instances there should be an _event queue_ per MaterializedMySQL database to handle the events independently of the speed of other instances. The dispatcher reads an event from the binlog, and sends it to every MaterializedMySQL database that needs it. Each database handles its events in separate threads.

## Catching up

If several databases have the same binlog position, they can use the same dispatcher. If a newly created database (or one that has been detached for some time) requests events that have been already processed, we need to create another communication _channel_ to the binlog. We do this by creating another temporary dispatcher for such databases. When the new dispatcher _catches up with_ the old one, the new/temporary dispatcher is not needed anymore and all databases getting events from this dispatcher can be moved to the old one.
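
The catch-up rule can be illustrated with a small sketch. Here a single integer stands in for the binlog position (the real implementation compares binlog positions and executed GTID sets), and `DispatcherSketch` and `mergeCaughtUp` are hypothetical names for this example only.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

struct DispatcherSketch
{
    std::uint64_t position;             // simplified stand-in for the binlog position
    std::vector<std::string> databases; // databases attached to this dispatcher
};

// Merges dispatchers that have reached the same position. The dispatcher that
// appears first in the list (the "old" one) is kept; databases of later
// dispatchers at the same position are moved to it.
void mergeCaughtUp(std::vector<DispatcherSketch> & dispatchers)
{
    std::vector<DispatcherSketch> merged;
    for (auto & dispatcher : dispatchers)
    {
        auto it = std::find_if(merged.begin(), merged.end(),
            [&](const DispatcherSketch & kept) { return kept.position == dispatcher.position; });
        if (it == merged.end())
            merged.push_back(std::move(dispatcher));
        else
            it->databases.insert(it->databases.end(),
                                 dispatcher.databases.begin(), dispatcher.databases.end());
    }
    dispatchers = std::move(merged);
}
```

While the temporary dispatcher lags behind, both dispatchers are kept; as soon as the positions match, only one remains and serves all databases.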

## Memory Limit

There is a _memory limit_ to control event queue memory consumption per MySQL Client. If a database is not able to handle events fast enough, and the event queue is getting full, we have the following options:

1. The dispatcher is blocked until the slowest database frees up space for new events. All other databases are waiting for the slowest one. (Preferred)
2. The dispatcher is _never_ blocked, but suspends incremental sync for the slow database and continues dispatching events to the remaining databases.

## Performance

A lot of CPU can be saved by not processing every event in every database. The binlog contains events for all databases, and it is wasteful to distribute row events to a database that will not process them, especially if there are a lot of databases. This requires some sort of per-database binlog filtering and buffering.

Currently, all events are sent to all MaterializedMySQL databases, but parsing an event, which is what consumes CPU, is left to each database.
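
A rough sketch of this deferred-parsing idea follows. It assumes, for illustration only, that the target database name is cheaply available alongside the raw payload; `RowsEventSketch` and `DatabaseConsumerSketch` are hypothetical and do not mirror the actual `UnparsedRowsEvent` class.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>

// Hypothetical unparsed event: the payload stays raw until a consumer needs it.
struct RowsEventSketch
{
    std::string database;    // database the rows belong to (assumed cheap to obtain)
    std::string raw_payload; // row data, not parsed by the dispatcher
};

class DatabaseConsumerSketch
{
public:
    explicit DatabaseConsumerSketch(std::string database_) : database(std::move(database_)) {}

    // Returns true if the event was parsed. Events for other databases are
    // skipped without paying the parsing cost.
    bool handle(const RowsEventSketch & event)
    {
        if (event.database != database)
            return false;
        ++parsed_events;
        parsed_bytes += event.raw_payload.size(); // stand-in for real row parsing
        return true;
    }

    std::size_t parsedEvents() const { return parsed_events; }
    std::size_t parsedBytes() const { return parsed_bytes; }

private:
    std::string database;
    std::size_t parsed_events = 0;
    std::size_t parsed_bytes = 0;
};
```

Every consumer still receives every event in this model, but only the consumer that owns the rows spends CPU on them.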

# Detailed Design

1. If a client (e.g. a database) wants to read a stream of events from the MySQL binlog, it creates a connection to the remote binlog using the host, user, password and _executed GTID set_ parameters.
2. If another client wants to read events from the binlog but for a different _executed GTID set_, it is **not** possible to reuse the existing connection to MySQL, so another connection to the same remote binlog must be created. (_This is how it is implemented today_).
3. When these two connections reach the same binlog position, they read the same events. It is logical to drop the duplicate connection and move all of its users to the remaining one, so that a single connection dispatches binlog events to several clients. Only connections to the same binlog should be merged.

## Classes

1. One connection can send (or dispatch) events to several clients; this is the `BinlogEventsDispatcher`.
2. Several dispatchers are grouped by _user:password@host:port_ in a `BinlogClient`, since they point to the same binlog.
3. Clients should communicate only through the public API of `BinlogClient`. The result of using `BinlogClient` is an object that implements `IBinlog`, from which events are read. This implementation of `IBinlog` must be compatible with the old implementation, `MySQLFlavor`: replacing the old implementation with the new one must not change behavior.

## SQL

```sql
-- create MaterializedMySQL databases that read the events from the binlog client
CREATE DATABASE db1_client1 ENGINE = MaterializedMySQL('host:port', 'db', 'user', 'password') SETTINGS use_binlog_client=1, max_bytes_in_binlog_queue=1024;
CREATE DATABASE db2_client1 ENGINE = MaterializedMySQL('host:port', 'db', 'user', 'password') SETTINGS use_binlog_client=1;
CREATE DATABASE db3_client1 ENGINE = MaterializedMySQL('host:port', 'db2', 'user', 'password') SETTINGS use_binlog_client=1;
CREATE DATABASE db4_client2 ENGINE = MaterializedMySQL('host2:port', 'db', 'user', 'password') SETTINGS use_binlog_client=1;
CREATE DATABASE db5_client3 ENGINE = MaterializedMySQL('host:port', 'db', 'user1', 'password') SETTINGS use_binlog_client=1;
CREATE DATABASE db6_old ENGINE = MaterializedMySQL('host:port', 'db', 'user1', 'password') SETTINGS use_binlog_client=0;
```

Databases `db1_client1`, `db2_client1` and `db3_client1` share one instance of `BinlogClient`, since they have the same connection parameters. `BinlogClient` creates 3 connections to the MySQL server, and thus 3 instances of `BinlogEventsDispatcher`; once these connections reach the same binlog position, they should be merged into one connection. This means that all clients are moved to one dispatcher and the other dispatchers are closed.

Databases `db4_client2` and `db5_client3` use 2 different, independent `BinlogClient` instances. Database `db6_old` uses the old implementation.

NOTE: By default, `use_binlog_client` is disabled. The `max_bytes_in_binlog_queue` setting defines the maximum number of bytes allowed in the binlog queue; by default it is `67108864` bytes. If the number of bytes exceeds this limit, dispatching stops until space is freed for new events.

## Binlog Table Structure

To see the status of all `BinlogClient` instances, there is the `system.mysql_binlogs` system table. It shows the list of all created and _alive_ `IBinlog` instances, with information about their `BinlogEventsDispatcher` and `BinlogClient`.

Example:

```
SELECT * FROM system.mysql_binlogs FORMAT Vertical
Row 1:
──────
binlog_client_name: root@127.0.0.1:3306
name: test_Clickhouse1
mysql_binlog_name: binlog.001154
mysql_binlog_pos: 7142294
mysql_binlog_timestamp: 1660082447
mysql_binlog_executed_gtid_set: a9d88f83-c14e-11ec-bb36-244bfedf7766:1-30523304
dispatcher_name: Applier
dispatcher_mysql_binlog_name: binlog.001154
dispatcher_mysql_binlog_pos: 7142294
dispatcher_mysql_binlog_timestamp: 1660082447
dispatcher_mysql_binlog_executed_gtid_set: a9d88f83-c14e-11ec-bb36-244bfedf7766:1-30523304
size: 0
bytes: 0
max_bytes: 0
```

## Tests

Unit tests:

```
$ ./unit_tests_dbms --gtest_filter=MySQLBinlog.*
```

Integration tests:

```
$ pytest -s -vv test_materialized_mysql_database/test.py::test_binlog_client
```

Dump events from a binlog file:

```
$ ./utils/check-mysql-binlog/check-mysql-binlog --binlog binlog.001392
```

Dump events from the server:

```
$ ./utils/check-mysql-binlog/check-mysql-binlog --host 127.0.0.1 --port 3306 --user root --password pass --gtid a9d88f83-c14e-11ec-bb36-244bfedf7766:1-30462856
```
4 changes: 2 additions & 2 deletions src/Core/MySQL/MySQLGtid.cpp
@@ -214,7 +214,7 @@ bool GTIDSet::contains(const GTIDSet & gtid_set) const

bool GTIDSets::contains(const GTIDSet & gtid_set) const
{
- for (auto & my_gtid_set : sets)
+ for (const auto & my_gtid_set : sets)
{
if (my_gtid_set.contains(gtid_set)) { return true; }
}
@@ -223,7 +223,7 @@ bool GTIDSets::contains(const GTIDSet & gtid_set) const

bool GTIDSets::contains(const GTIDSets & gtid_sets) const
{
- for (auto & gtid_set : gtid_sets.sets)
+ for (const auto & gtid_set : gtid_sets.sets)
{
if (!this->contains(gtid_set)) { return false; }
}
2 changes: 2 additions & 0 deletions src/Core/MySQL/MySQLReplication.h
@@ -275,6 +275,8 @@ namespace MySQLReplication
String status;
String schema;
String query;
+ String query_database_name;
+ String query_table_name;
QueryType typ = QUERY_EVENT_DDL;
bool transaction_complete = true;

10 changes: 9 additions & 1 deletion src/Databases/DatabaseFactory.cpp
@@ -23,6 +23,7 @@

#if USE_MYSQL
# include <Core/MySQL/MySQLClient.h>
+ # include <Databases/MySQL/MySQLBinlogClientCatalog.h>
# include <Databases/MySQL/DatabaseMySQL.h>
# include <Databases/MySQL/MaterializedMySQLSettings.h>
# include <Storages/MySQL/MySQLHelpers.h>
@@ -187,6 +188,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);

+ MySQLReplication::BinlogClientPtr binlog_client;
StorageMySQL::Configuration configuration;
ASTs & arguments = engine->arguments->children;
auto mysql_settings = std::make_unique<MySQLSettings>();
@@ -245,6 +247,12 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (engine_define->settings)
materialize_mode_settings->loadFromQuery(*engine_define);

+ if (materialize_mode_settings->use_binlog_client)
+ binlog_client = DB::MySQLReplication::BinlogClientCatalog::instance().getClient(
+ configuration.host, configuration.port, configuration.username, configuration.password,
+ materialize_mode_settings->max_bytes_in_binlog_dispatcher_buffer,
+ materialize_mode_settings->max_flush_milliseconds_in_binlog_dispatcher);

if (uuid == UUIDHelpers::Nil)
{
auto print_create_ast = create.clone();
@@ -258,7 +266,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String

return std::make_shared<DatabaseMaterializedMySQL>(
context, database_name, metadata_path, uuid, configuration.database, std::move(mysql_pool),
- std::move(client), std::move(materialize_mode_settings));
+ std::move(client), binlog_client, std::move(materialize_mode_settings));
}
catch (...)
{
3 changes: 2 additions & 1 deletion src/Databases/MySQL/DatabaseMaterializedMySQL.cpp
@@ -30,10 +30,11 @@ DatabaseMaterializedMySQL::DatabaseMaterializedMySQL(
const String & mysql_database_name_,
mysqlxx::Pool && pool_,
MySQLClient && client_,
+ const MySQLReplication::BinlogClientPtr & binlog_client_,
std::unique_ptr<MaterializedMySQLSettings> settings_)
: DatabaseAtomic(database_name_, metadata_path_, uuid, "DatabaseMaterializedMySQL(" + database_name_ + ")", context_)
, settings(std::move(settings_))
- , materialize_thread(context_, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get())
+ , materialize_thread(context_, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), binlog_client_, settings.get())
{
}

2 changes: 2 additions & 0 deletions src/Databases/MySQL/DatabaseMaterializedMySQL.h
@@ -9,6 +9,7 @@
#include <base/UUID.h>
#include <Databases/IDatabase.h>
#include <Databases/DatabaseAtomic.h>
+ #include <Databases/MySQL/MySQLBinlogClient.h>
#include <Databases/MySQL/MaterializedMySQLSettings.h>
#include <Databases/MySQL/MaterializedMySQLSyncThread.h>
#include <Common/logger_useful.h>
@@ -31,6 +32,7 @@ class DatabaseMaterializedMySQL : public DatabaseAtomic
const String & mysql_database_name_,
mysqlxx::Pool && pool_,
MySQLClient && client_,
+ const MySQLReplication::BinlogClientPtr & binlog_client_,
std::unique_ptr<MaterializedMySQLSettings> settings_);

void rethrowExceptionIfNeeded() const;
5 changes: 5 additions & 0 deletions src/Databases/MySQL/MaterializedMySQLSettings.h
@@ -17,6 +17,11 @@ class ASTStorage;
M(Int64, max_wait_time_when_mysql_unavailable, 1000, "Retry interval when MySQL is not available (milliseconds). Negative value disable retry.", 0) \
M(Bool, allows_query_when_mysql_lost, false, "Allow query materialized table when mysql is lost.", 0) \
M(String, materialized_mysql_tables_list, "", "a comma-separated list of mysql database tables, which will be replicated by MaterializedMySQL database engine. Default value: empty list — means whole tables will be replicated.", 0) \
+ M(Bool, use_binlog_client, false, "Use MySQL Binlog Client.", 0) \
+ M(UInt64, max_bytes_in_binlog_queue, 64 * 1024 * 1024, "Max bytes in binlog's queue created from MySQL Binlog Client.", 0) \
+ M(UInt64, max_milliseconds_to_wait_in_binlog_queue, 10000, "Max milliseconds to wait when max bytes exceeded in a binlog queue.", 0) \
+ M(UInt64, max_bytes_in_binlog_dispatcher_buffer, DBMS_DEFAULT_BUFFER_SIZE, "Max bytes in the binlog dispatcher's buffer before it is flushed to attached binlogs.", 0) \
+ M(UInt64, max_flush_milliseconds_in_binlog_dispatcher, 1000, "Max milliseconds in the binlog dispatcher's buffer to wait before it is flushed to attached binlogs.", 0) \

DECLARE_SETTINGS_TRAITS(MaterializedMySQLSettingsTraits, LIST_OF_MATERIALIZE_MODE_SETTINGS)
