Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mysql-cdc] Optimize the error msg when binlog expire or server id conflict #2024

Merged
merged 2 commits into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,24 @@ public void run(SourceContext<T> sourceContext) throws Exception {
"sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());

// start the real debezium consumer
debeziumChangeFetcher.runFetchLoop();
try {
debeziumChangeFetcher.runFetchLoop();
} catch (Throwable t) {
if (t.getMessage() != null
&& t.getMessage()
.contains(
"A slave with the same server_uuid/server_id as this slave has connected to the master")) {
throw new RuntimeException(
"The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.\n"
+ "The server id conflict may happen in the following situations: \n"
+ "1. The server id has been used by other mysql cdc table in the current job.\n"
+ "2. The server id has been used by the mysql cdc table in other jobs.\n"
+ "3. The server id has been used by other sync tools like canal, debezium and so on.\n",
t);
} else {
throw t;
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private void validateAndLoadDatabaseHistory(
}

/** Loads the connector's persistent offset (if present) via the given loader. */
private MySqlOffsetContext loadStartingOffsetState(
protected MySqlOffsetContext loadStartingOffsetState(
OffsetContext.Loader<MySqlOffsetContext> loader, MySqlSplit mySqlSplit) {
BinlogOffset offset =
mySqlSplit.isSnapshotSplit()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.util.ErrorMessageUtils;
import io.debezium.data.Envelope.Operation;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.ErrorHandler;
Expand Down Expand Up @@ -88,6 +89,8 @@
* <p>Line 268 ~ 270: Clean cache on rotate event to prevent it from growing indefinitely. We should
* remove this class after we bumped a higher debezium version where the
* https://issues.redhat.com/browse/DBZ-5126 has been fixed.
*
* <p>Line 1386 : Add more error details for some exceptions.
*/
public class MySqlStreamingChangeEventSource
implements StreamingChangeEventSource<MySqlOffsetContext> {
Expand Down Expand Up @@ -1380,6 +1383,7 @@ protected DebeziumException wrap(Throwable error) {
+ e.getSQLState()
+ ".";
}
msg = ErrorMessageUtils.optimizeErrorMessage(msg);
return new DebeziumException(msg, error);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.debezium.connector.mysql.util;

import java.util.regex.Pattern;

/** This util tries to optimize error message for some exceptions. */
public class ErrorMessageUtils {
private static final Pattern SERVER_ID_CONFLICT =
Pattern.compile(
".*A slave with the same server_uuid/server_id as this slave has connected to the master.*");
private static final Pattern MISSING_BINLOG_POSITION_WHEN_BINLOG_EXPIRE =
Pattern.compile(
".*The connector is trying to read binlog.*but this is no longer available on the server.*");
private static final Pattern MISSING_TRANSACTION_WHEN_BINLOG_EXPIRE =
Pattern.compile(
".*Cannot replicate because the (master|source) purged required binary logs.*");

/** Add more error details for some exceptions. */
public static String optimizeErrorMessage(String msg) {
if (msg == null) {
return null;
}
if (SERVER_ID_CONFLICT.matcher(msg).matches()) {
// Optimize the error msg when server id conflict
msg +=
"\nThe 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.\n"
+ "The server id conflict may happen in the following situations: \n"
+ "1. The server id has been used by other mysql cdc table in the current job.\n"
+ "2. The server id has been used by the mysql cdc table in other jobs.\n"
+ "3. The server id has been used by other sync tools like canal, debezium and so on.\n";
} else if (MISSING_BINLOG_POSITION_WHEN_BINLOG_EXPIRE.matcher(msg).matches()
|| MISSING_TRANSACTION_WHEN_BINLOG_EXPIRE.matcher(msg).matches()) {
// Optimize the error msg when binlog is unavailable
msg +=
"\nThe required binary logs are no longer available on the server. This may happen in following situations:\n"
+ "1. The speed of CDC source reading is too slow to exceed the binlog expired period. You can consider increasing the binary log expiration period, you can also to check whether there is back pressure in the job and optimize your job.\n"
+ "2. The job runs normally, but something happens in the database and lead to the binlog cleanup. You can try to check why this cleanup happens from MySQL side.";
}
return msg;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.Assert.assertTrue;

/** Utils to help test. */
public class MySqlTestUtils {

Expand Down Expand Up @@ -173,6 +175,20 @@ private static Properties createDebeziumProperties(boolean useLegacyImplementati
return debeziumProps;
}

public static void assertContainsErrorMsg(Throwable t, String errorMsg) {
Throwable temp = t;
boolean findFixMsg = false;
while (temp != null) {
findFixMsg = findFixMsg || temp.getMessage().contains(errorMsg);
if (findFixMsg) {
break;
} else {
temp = temp.getCause();
}
}
assertTrue(findFixMsg);
}

// ---------------------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,28 @@
import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
import com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.testutils.MySqlVersion;
import com.ververica.cdc.connectors.mysql.testutils.RecordsFormatter;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import io.debezium.relational.history.TableChanges.TableChange;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.lifecycle.Startables;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -64,7 +73,10 @@
import java.util.Properties;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.ververica.cdc.connectors.mysql.MySqlTestUtils.assertContainsErrorMsg;
import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetUtils.initializeEffectiveOffset;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
import static org.junit.Assert.assertEquals;
Expand All @@ -74,15 +86,35 @@

/** Tests for {@link BinlogSplitReader}. */
public class BinlogSplitReaderTest extends MySqlSourceTestBase {

private static final String TEST_USER = "mysqluser";
private static final String TEST_PASSWORD = "mysqlpw";
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);

private final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
new UniqueDatabase(MYSQL_CONTAINER, "customer", TEST_USER, TEST_PASSWORD);

private static final MySqlContainer MYSQL8_CONTAINER =
createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");
private final UniqueDatabase inventoryDatabase8 =
new UniqueDatabase(MYSQL8_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD);

private BinaryLogClient binaryLogClient;
private MySqlConnection mySqlConnection;

@BeforeClass
public static void beforeClass() {
LOG.info("Starting MySql8 containers...");
Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
LOG.info("Container MySql8 is started.");
}

@AfterClass
public static void afterClass() {
LOG.info("Stopping MySql8 containers...");
MYSQL8_CONTAINER.stop();
LOG.info("Container MySql8 is stopped.");
}

@Test
public void testReadSingleBinlogSplit() throws Exception {
customerDatabase.createAndInitialize();
Expand Down Expand Up @@ -689,7 +721,7 @@ public void testHeartbeatEvent() throws Exception {

// Create config and initializer client and connections
MySqlSourceConfig sourceConfig =
getConfigFactory(new String[] {"customers"})
getConfigFactory(MYSQL_CONTAINER, customerDatabase, new String[] {"customers"})
.startupOptions(StartupOptions.latest())
.heartbeatInterval(heartbeatInterval)
.debeziumProperties(dbzProps)
Expand Down Expand Up @@ -721,9 +753,75 @@ public void testHeartbeatEvent() throws Exception {
"Timeout waiting for heartbeat event");
}

@Test
public void testReadBinlogFromUnavailableBinlog() throws Exception {
// Preparations
inventoryDatabase8.createAndInitialize();
MySqlSourceConfig connectionConfig =
getConfig(MYSQL8_CONTAINER, inventoryDatabase8, new String[] {"products"});
binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig);

// Capture the current binlog offset, and we will start the reader from here
BinlogOffset startingOffset = DebeziumUtils.currentBinlogOffset(mySqlConnection);

// Create a new config to start reading from the offset captured above
MySqlSourceConfig sourceConfig =
getConfig(
MYSQL8_CONTAINER,
inventoryDatabase8,
StartupOptions.specificOffset(startingOffset.getGtidSet()),
new String[] {"products"});

// Create some binlog events and expire the binlog
try (Connection connection = inventoryDatabase8.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
statement.execute(
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute(
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM products WHERE id=111;");
statement.execute("FLUSH LOGS;");
Thread.sleep(3000);
statement.execute(
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
statement.execute(
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute("FLUSH LOGS;");
Thread.sleep(3000);
}

// Create reader and submit splits
MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
BinlogSplitReader reader = createBinlogReader(sourceConfig, true);

try {
reader.submitSplit(split);
reader.pollSplitRecords();
} catch (Throwable t) {
assertContainsErrorMsg(
t,
"The required binary logs are no longer available on the server. This may happen in following situations:\n"
+ "1. The speed of CDC source reading is too slow to exceed the binlog expired period. You can consider increasing the binary log expiration period, you can also to check whether there is back pressure in the job and optimize your job.\n"
+ "2. The job runs normally, but something happens in the database and lead to the binlog cleanup. You can try to check why this cleanup happens from MySQL side.");
}
}

private BinlogSplitReader createBinlogReader(MySqlSourceConfig sourceConfig) {
return createBinlogReader(sourceConfig, false);
}

private BinlogSplitReader createBinlogReader(
MySqlSourceConfig sourceConfig, boolean skipValidStartingOffset) {
return new BinlogSplitReader(
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection), 0);
skipValidStartingOffset
? new TestStatefulTaskContext(
sourceConfig, binaryLogClient, mySqlConnection)
: new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection),
0);
}

private MySqlBinlogSplit createBinlogSplit(MySqlSourceConfig sourceConfig) throws Exception {
Expand Down Expand Up @@ -1016,33 +1114,76 @@ private List<MySqlSnapshotSplit> getMySqlSplits(
}

private MySqlSourceConfig getConfig(StartupOptions startupOptions, String[] captureTables) {
return getConfigFactory(captureTables).startupOptions(startupOptions).createConfig(0);
return getConfig(MYSQL_CONTAINER, customerDatabase, startupOptions, captureTables);
}

private MySqlSourceConfig getConfig(
MySqlContainer container,
UniqueDatabase database,
StartupOptions startupOptions,
String[] captureTables) {
return getConfigFactory(container, database, captureTables)
.startupOptions(startupOptions)
.createConfig(0);
}

private MySqlSourceConfig getConfig(String[] captureTables) {
return getConfigFactory(captureTables).createConfig(0);
return getConfig(MYSQL_CONTAINER, customerDatabase, captureTables);
}

private MySqlSourceConfigFactory getConfigFactory(String[] captureTables) {
private MySqlSourceConfig getConfig(
MySqlContainer container, UniqueDatabase database, String[] captureTables) {
return getConfigFactory(container, database, captureTables).createConfig(0);
}

private MySqlSourceConfigFactory getConfigFactory(
MySqlContainer container, UniqueDatabase database, String[] captureTables) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
.map(tableName -> database.getDatabaseName() + "." + tableName)
.toArray(String[]::new);

return new MySqlSourceConfigFactory()
.databaseList(customerDatabase.getDatabaseName())
.databaseList(database.getDatabaseName())
.tableList(captureTableIds)
.hostname(MYSQL_CONTAINER.getHost())
.port(MYSQL_CONTAINER.getDatabasePort())
.username(customerDatabase.getUsername())
.hostname(container.getHost())
.port(container.getDatabasePort())
.username(database.getUsername())
.splitSize(4)
.fetchSize(2)
.password(customerDatabase.getPassword());
.password(database.getPassword());
}

private void addColumnToTable(JdbcConnection connection, String tableId) throws Exception {
connection.execute(
"ALTER TABLE " + tableId + " ADD COLUMN new_int_column INT DEFAULT 15213");
connection.commit();
}

/** This stateful task context will skip valid the starting offset. */
private static class TestStatefulTaskContext extends StatefulTaskContext {

public TestStatefulTaskContext(
MySqlSourceConfig sourceConfig,
BinaryLogClient binaryLogClient,
MySqlConnection connection) {
super(sourceConfig, binaryLogClient, connection);
}

@Override
protected MySqlOffsetContext loadStartingOffsetState(
OffsetContext.Loader<MySqlOffsetContext> loader, MySqlSplit mySqlSplit) {
BinlogOffset offset =
mySqlSplit.isSnapshotSplit()
? BinlogOffset.ofEarliest()
: initializeEffectiveOffset(
mySqlSplit.asBinlogSplit().getStartingOffset(),
getConnection());

LOG.info("Starting offset is initialized to {}", offset);

MySqlOffsetContext mySqlOffsetContext = loader.load(offset.getOffset());
return mySqlOffsetContext;
}
}
}
Loading
Loading