From 664f56da84523f4f21b6f4a70d08f11c7caad2c7 Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Tue, 1 Nov 2016 18:42:54 -0500 Subject: [PATCH] DBZ-146 Improved error handling of MySQL Connector MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Improved the error handling of the MySQL connector to ensure that we’re always stopping the connector when we have a problem handling a binlog event or if we have problems starting. --- .../connector/mysql/BinlogReader.java | 20 +++++- .../connector/mysql/MySqlConnectorTask.java | 69 +++++++++++++++---- .../debezium/connector/mysql/MySqlSchema.java | 4 +- .../history/KafkaDatabaseHistory.java | 5 +- 4 files changed, 80 insertions(+), 18 deletions(-) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java index e8e87602b5c..4e0529dba4f 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java @@ -264,7 +264,8 @@ protected void handleEvent(Event event) { // Update the source offset info. Note that the client returns the value in *milliseconds*, even though the binlog // contains only *seconds* precision ... EventHeader eventHeader = event.getHeader(); - source.setBinlogTimestampSeconds(eventHeader.getTimestamp() / 1000L); // client returns milliseconds, but only second precision + source.setBinlogTimestampSeconds(eventHeader.getTimestamp() / 1000L); // client returns milliseconds, but only second + // precision source.setBinlogServerId(eventHeader.getServerId()); EventType eventType = eventHeader.getEventType(); if (eventType == EventType.ROTATE) { @@ -288,6 +289,14 @@ protected void handleEvent(Event event) { // And after that event has been processed, always set the starting row number to 0 ... 
startingRowNumber = 0; + } catch (RuntimeException e) { + // There was an error in the event handler, so propagate the failure to Kafka Connect ... + failed(e, "Error processing binlog event"); + // Do not stop the client, since Kafka Connect should stop the connector on its own + // (and doing it here may cause problems the second time it is stopped). + // We can clear the listeners though so that we ignore all future events ... + eventHandlers.clear(); + logger.info("Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored."); } catch (InterruptedException e) { // Most likely because this reader was stopped and our thread was interrupted ... Thread.interrupted(); @@ -379,6 +388,15 @@ protected void handleGtidEvent(Event event) { protected void handleQueryEvent(Event event) { QueryEventData command = unwrapData(event); logger.debug("Received update table command: {}", event); + String sql = command.getSql().trim(); + if (sql.equalsIgnoreCase("BEGIN")) { + // ignore these altogether ... + return; + } + if (sql.equalsIgnoreCase("COMMIT")) { + // ignore these altogether ... 
+ return; + } context.dbSchema().applyDdl(context.source(), command.getDatabase(), command.getSql(), (dbName, statements) -> { if (recordSchemaChangesInSourceRecords && recordMakers.schemaChanges(dbName, statements, super::enqueueRecord) > 0) { logger.debug("Recorded DDL statements for database '{}': {}", dbName, statements); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java index 2cc6b03690a..30f07a039a7 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java @@ -7,8 +7,10 @@ import java.sql.SQLException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.connect.errors.ConnectException; @@ -31,9 +33,10 @@ public final class MySqlConnectorTask extends SourceTask { private final Logger logger = LoggerFactory.getLogger(getClass()); - private MySqlTaskContext taskContext; - private SnapshotReader snapshotReader; - private BinlogReader binlogReader; + private final AtomicBoolean runningReader = new AtomicBoolean(false); + private volatile MySqlTaskContext taskContext; + private volatile SnapshotReader snapshotReader; + private volatile BinlogReader binlogReader; private volatile AbstractReader currentReader; /** @@ -51,7 +54,7 @@ public String version() { } @Override - public void start(Map props) { + public synchronized void start(Map props) { if (context == null) { throw new ConnectException("Unexpected null context"); } @@ -171,10 +174,21 @@ public void start(Map props) { } // And start our first reader ... 
+ this.runningReader.set(true); this.currentReader.start(); - } catch (RuntimeException e) { - this.taskContext.shutdown(); - throw e; + } catch (Throwable e) { + // If we don't complete startup, then Kafka Connect will not attempt to stop the connector. So if we + // run into a problem, we have to stop ourselves ... + try { + stop(); + } catch (Throwable s) { + // Log, but don't propagate ... + logger.error("Failed to start the connector (see other exception), but got this error while cleaning up", s); + } + if (e instanceof ConnectException) { + throw e; + } + throw new ConnectException(e); } finally { prevLoggingContext.restore(); } @@ -185,28 +199,47 @@ public List poll() throws InterruptedException { PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task"); try { logger.trace("Polling for events"); - return currentReader.poll(); + AbstractReader reader = currentReader; + return reader != null ? reader.poll() : Collections.emptyList(); } finally { prevLoggingContext.restore(); } } @Override - public void stop() { + public synchronized void stop() { if (context != null) { PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task"); // We need to explicitly stop both readers, in this order. If we were to instead call 'currentReader.stop()', there // is a chance without synchronization that we'd miss the transition and stop only the snapshot reader. And stopping - // both - // is far simpler and more efficient than synchronizing ... + // both is far simpler and more efficient than synchronizing ... 
try { logger.info("Stopping MySQL connector task"); - if (this.snapshotReader != null) this.snapshotReader.stop(); + if (this.snapshotReader != null) { + try { + this.snapshotReader.stop(); + } catch (Throwable e) { + logger.error("Unexpected error stopping the snapshot reader", e); + } finally { + this.snapshotReader = null; + } + } } finally { try { - if (this.binlogReader != null) this.binlogReader.stop(); + if (this.binlogReader != null) { + try { + this.binlogReader.stop(); + } catch (Throwable e) { + logger.error("Unexpected error stopping the binary log reader", e); + } finally { + this.binlogReader = null; + } + } } finally { + this.currentReader = null; try { + // Capture that our reader is no longer running; used in "transitionToReadBinlog()" ... + this.runningReader.set(false); // Flush and stop database history, close all JDBC connections ... if (this.taskContext != null) taskContext.shutdown(); } catch (Throwable e) { @@ -221,7 +254,15 @@ public void stop() { } } - protected void transitionToReadBinlog() { + /** + * Transition from the snapshot reader to the binlog reader. This method is synchronized (along with {@link #start(Map)} + * and {@link #stop()}) to ensure that we don't transition while we've already begun to stop. + */ + protected synchronized void transitionToReadBinlog() { + if (this.binlogReader == null || !this.runningReader.get()) { + // We are no longer running, so don't start the binlog reader ... 
+ return; + } logger.debug("Transitioning from snapshot reader to binlog reader"); this.binlogReader.start(); this.currentReader = this.binlogReader; diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java index 00751bd91cc..e1ac4bf4dc2 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java @@ -117,14 +117,14 @@ public MySqlSchema(Configuration config, String serverName) { /** * Start by acquiring resources needed to persist the database history */ - public void start() { + public synchronized void start() { this.dbHistory.start(); } /** * Stop recording history and release any resources acquired since {@link #start()}. */ - public void shutdown() { + public synchronized void shutdown() { this.dbHistory.stop(); } diff --git a/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java index 9a4319efece..8c216d1acc0 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java @@ -96,7 +96,7 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory { private String topicName; private Configuration consumerConfig; private Configuration producerConfig; - private KafkaProducer producer; + private volatile KafkaProducer producer; private int recoveryAttempts = -1; private int pollIntervalMs = -1; @@ -151,6 +151,9 @@ public void start() { @Override protected void storeRecord(HistoryRecord record) { + if (this.producer == null) { + throw new IllegalStateException("No producer is available. 
Ensure that 'start()' is called before storing database history records."); + } logger.trace("Storing record into database history: {}", record); try { ProducerRecord produced = new ProducerRecord<>(topicName, partition, null, record.toString());