DBZ-146 Improved error handling of MySQL Connector #130

Merged · 1 commit · Nov 3, 2016

Changes from all commits

@@ -264,7 +264,8 @@ protected void handleEvent(Event event) {
// Update the source offset info. Note that the client returns the value in *milliseconds*, even though the binlog
// contains only *seconds* precision ...
EventHeader eventHeader = event.getHeader();
source.setBinlogTimestampSeconds(eventHeader.getTimestamp() / 1000L); // client returns milliseconds, but only second
// precision
source.setBinlogServerId(eventHeader.getServerId());
EventType eventType = eventHeader.getEventType();
if (eventType == EventType.ROTATE) {
@@ -288,6 +289,14 @@ protected void handleEvent(Event event) {

// And after that event has been processed, always set the starting row number to 0 ...
startingRowNumber = 0;
} catch (RuntimeException e) {
// There was an error in the event handler, so propagate the failure to Kafka Connect ...
failed(e, "Error processing binlog event");
// Do not stop the client, since Kafka Connect should stop the connector on its own
// (and doing it here may cause problems the second time it is stopped).
// We can clear the listeners though so that we ignore all future events ...
eventHandlers.clear();
logger.info("Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored.");
} catch (InterruptedException e) {
// Most likely because this reader was stopped and our thread was interrupted ...
Thread.interrupted();
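
The failed(e, ...) call above is what hands the problem to Kafka Connect; a common way to implement that hand-off is to record the exception on the reader and rethrow it from the task's poll(), which makes the framework fail the task and then call stop(). A minimal sketch of that pattern (hypothetical FailureAwareReader, not Debezium's actual AbstractReader):

import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;

// Hypothetical illustration of the failure-propagation pattern: the binlog client thread
// records the error, and the Kafka Connect task thread rethrows it from poll(), which makes
// the framework fail the task and invoke stop() on it.
final class FailureAwareReader {
    private final AtomicReference<RuntimeException> failure = new AtomicReference<>();

    // Called from the event-handling thread when processing an event fails.
    protected void failed(Throwable error, String message) {
        failure.compareAndSet(null, new ConnectException(message, error));
    }

    // Called from the Kafka Connect task thread; surfaces any recorded failure.
    public List<SourceRecord> poll() throws InterruptedException {
        RuntimeException error = failure.get();
        if (error != null) {
            throw error; // Kafka Connect marks the task FAILED and stops it
        }
        return Collections.emptyList(); // a real reader would return buffered records here
    }
}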
@@ -379,6 +388,15 @@ protected void handleGtidEvent(Event event) {
protected void handleQueryEvent(Event event) {
QueryEventData command = unwrapData(event);
logger.debug("Received update table command: {}", event);
String sql = command.getSql().trim();
if (sql.equalsIgnoreCase("BEGIN")) {
// ignore these altogether ...
return;
}
if (sql.equalsIgnoreCase("COMMIT")) {
// ignore these altogether ...
return;
}
context.dbSchema().applyDdl(context.source(), command.getDatabase(), command.getSql(), (dbName, statements) -> {
if (recordSchemaChangesInSourceRecords && recordMakers.schemaChanges(dbName, statements, super::enqueueRecord) > 0) {
logger.debug("Recorded DDL statements for database '{}': {}", dbName, statements);
@@ -7,8 +7,10 @@

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.connect.errors.ConnectException;
@@ -31,9 +33,10 @@
public final class MySqlConnectorTask extends SourceTask {

private final Logger logger = LoggerFactory.getLogger(getClass());
private final AtomicBoolean runningReader = new AtomicBoolean(false);
private volatile MySqlTaskContext taskContext;
private volatile SnapshotReader snapshotReader;
private volatile BinlogReader binlogReader;
private volatile AbstractReader currentReader;

/**
@@ -51,7 +54,7 @@ public String version() {
}

@Override
public synchronized void start(Map<String, String> props) {
if (context == null) {
throw new ConnectException("Unexpected null context");
}
@@ -171,10 +174,21 @@ public void start(Map<String, String> props) {
}

// And start our first reader ...
this.runningReader.set(true);
this.currentReader.start();
} catch (Throwable e) {
// If we don't complete startup, then Kafka Connect will not attempt to stop the connector. So if we
// run into a problem, we have to stop ourselves ...
try {
stop();
} catch (Throwable s) {
// Log, but don't propagate ...
logger.error("Failed to start the connector (see other exception), but got this error while cleaning up", s);
}
if (e instanceof ConnectException) {
throw e;
}
throw new ConnectException(e);
} finally {
prevLoggingContext.restore();
}
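
The new catch block leans on the task lifecycle spelled out in its comment: Kafka Connect only calls stop() on a task whose start() completed, so a task that fails partway through startup has to release its own resources before rethrowing. A simplified, hypothetical sketch of a worker loop that shows why (not the actual Connect runtime code):

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Simplified, hypothetical sketch of a worker driving a source task; the real Connect runtime
// is more involved, but the point the catch block relies on stands: stop() is reached only
// when start() returned normally, so a task that fails during start() must clean up itself.
final class WorkerLoopSketch {
    static void run(SourceTask task, Map<String, String> config, AtomicBoolean shutdown)
            throws InterruptedException {
        task.start(config); // if this throws, the try/finally below is never entered
        try {
            while (!shutdown.get()) {
                List<SourceRecord> records = task.poll();
                // a real worker would hand the records to a Kafka producer here
            }
        } finally {
            task.stop(); // invoked only for tasks that started successfully
        }
    }
}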
@@ -185,28 +199,47 @@ public List<SourceRecord> poll() throws InterruptedException {
PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
try {
logger.trace("Polling for events");
AbstractReader reader = currentReader;
return reader != null ? reader.poll() : Collections.emptyList();
} finally {
prevLoggingContext.restore();
}
}

@Override
public synchronized void stop() {
if (context != null) {
PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
// We need to explicitly stop both readers, in this order. If we were to instead call 'currentReader.stop()', there
// is a chance without synchronization that we'd miss the transition and stop only the snapshot reader. And stopping
// both is far simpler and more efficient than synchronizing ...
try {
logger.info("Stopping MySQL connector task");
if (this.snapshotReader != null) {
try {
this.snapshotReader.stop();
} catch (Throwable e) {
logger.error("Unexpected error stopping the snapshot reader", e);
} finally {
this.snapshotReader = null;
}
}
} finally {
try {
if (this.binlogReader != null) {
try {
this.binlogReader.stop();
} catch (Throwable e) {
logger.error("Unexpected error stopping the binary log reader", e);
} finally {
this.binlogReader = null;
}
}
} finally {
this.currentReader = null;
try {
// Capture that our reader is no longer running; used in "transitionToReadBinlog()" ...
this.runningReader.set(false);
// Flush and stop database history, close all JDBC connections ...
if (this.taskContext != null) taskContext.shutdown();
} catch (Throwable e) {
@@ -221,7 +254,15 @@ public void stop() {
}
}

/**
* Transition from the snapshot reader to the binlog reader. This method is synchronized (along with {@link #start(Map)}
* and {@link #stop()}) to ensure that we don't transition while we've already begun to stop.
*/
protected synchronized void transitionToReadBinlog() {
if (this.binlogReader == null || !this.runningReader.get()) {
// We are no longer running, so don't start the binlog reader ...
return;
}
logger.debug("Transitioning from snapshot reader to binlog reader");
this.binlogReader.start();
this.currentReader = this.binlogReader;
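
Making transitionToReadBinlog() synchronized alongside start(Map) and stop(), and checking the runningReader flag, closes a race: the snapshot reader completes on its own thread and requests the transition at roughly the same time the framework stops the task. A stripped-down, hypothetical sketch of that guard:

import java.util.concurrent.atomic.AtomicBoolean;

// Stripped-down, hypothetical illustration of the guard above: the transition and the shutdown
// serialize on the same lock, and the flag records whether we are still running, so a transition
// that loses the race to stop() becomes a no-op instead of starting a new reader.
final class ReaderLifecycleSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private volatile Runnable binlogReader; // stands in for the real BinlogReader

    public synchronized void start(Runnable binlogReader) {
        this.binlogReader = binlogReader;
        this.running.set(true);
    }

    public synchronized void stop() {
        this.running.set(false);
        this.binlogReader = null;
    }

    // Called from the snapshot reader's completion handler, possibly concurrently with stop().
    public synchronized void transitionToReadBinlog() {
        Runnable reader = this.binlogReader;
        if (reader == null || !running.get()) {
            return; // stop() won the race; do not start the binlog reader
        }
        reader.run(); // in the real task this is binlogReader.start()
    }
}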
@@ -117,14 +117,14 @@ public MySqlSchema(Configuration config, String serverName) {
/**
* Start by acquiring resources needed to persist the database history
*/
public synchronized void start() {
this.dbHistory.start();
}

/**
* Stop recording history and release any resources acquired since {@link #start()}.
*/
public synchronized void shutdown() {
this.dbHistory.stop();
}

@@ -96,7 +96,7 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
private String topicName;
private Configuration consumerConfig;
private Configuration producerConfig;
private volatile KafkaProducer<String, String> producer;
private int recoveryAttempts = -1;
private int pollIntervalMs = -1;

@@ -151,6 +151,9 @@ public void start() {

@Override
protected void storeRecord(HistoryRecord record) {
if (this.producer == null) {
throw new IllegalStateException("No producer is available. Ensure that 'start()' is called before storing database history records.");
}
logger.trace("Storing record into database history: {}", record);
try {
ProducerRecord<String, String> produced = new ProducerRecord<>(topicName, partition, null, record.toString());