Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-601 Snapshots fail if launching multiple connectors at once #524

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -29,10 +29,12 @@ public class BlockingReader implements Reader {
private final Metronome metronome;

private final String name;
private final String runningLogMessage;

public BlockingReader(String name) {
public BlockingReader(String name, String runningLogMessage) {
this.name = name;
this.metronome = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM);
this.runningLogMessage = runningLogMessage;

}

Expand Down Expand Up @@ -65,7 +67,7 @@ public void uponCompletion(Runnable handler) {
@Override
public void start() {
state.set(State.RUNNING);
logger.info("Connector has completed all of its work but will continue in the running state. It can be shut down at any time.");
logger.info(runningLogMessage);
}

@Override
Expand Down
Expand Up @@ -882,6 +882,15 @@ public static EventProcessingFailureHandlingMode parse(String value) {
"The value of those properties is the select statement to use when retrieving data from the specific table during snapshotting. " +
"A possible use case for large append-only tables is setting a specific point where to start (resume) snapshotting, in case a previous snapshotting was interrupted.");

public static final Field SNAPSHOT_DELAY_MS = Field.createInternal("snapshot.delay.ms")
.withDisplayName("Snapshot Delay (milliseconds)")
.withType(Type.LONG)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("The number of milliseconds to delay before a snapshot will begin.")
.withDefault(0L)
.withValidation(Field::isNonNegativeLong);

/**
* Method that generates a Field for specifying that string columns whose names match a set of regular expressions should
* have their values truncated to be no longer than the specified number of characters.
Expand Down Expand Up @@ -935,6 +944,7 @@ public static final Field MASK_COLUMN(int length) {
BIGINT_UNSIGNED_HANDLING_MODE,
EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE,
INCONSISTENT_SCHEMA_HANDLING_MODE,
SNAPSHOT_DELAY_MS,
CommonConnectorConfig.TOMBSTONES_ON_DELETE);

/**
Expand Down Expand Up @@ -991,7 +1001,7 @@ protected static ConfigDef configDef() {
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, KEEP_ALIVE_INTERVAL_MS, CommonConnectorConfig.MAX_QUEUE_SIZE,
CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.POLL_INTERVAL_MS,
SNAPSHOT_MODE, SNAPSHOT_LOCKING_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
BIGINT_UNSIGNED_HANDLING_MODE);
BIGINT_UNSIGNED_HANDLING_MODE, SNAPSHOT_DELAY_MS);
return config;
}

Expand Down
Expand Up @@ -165,11 +165,16 @@ public synchronized void start(Configuration config) {
// We're supposed to start with a snapshot, so set that up ...
SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext);
if (snapshotEventsAreInserts) snapshotReader.generateInsertEvents();

if (!taskContext.snapshotDelay().isZero()) {
// Adding a timed blocking reader to delay the snapshot, can help to avoid initial rebalancing interruptions
chainedReaderBuilder.addReader(new TimedBlockingReader("timed-blocker", taskContext.snapshotDelay()));
}
chainedReaderBuilder.addReader(snapshotReader);

if (taskContext.isInitialSnapshotOnly()) {
logger.warn("This connector will only perform a snapshot, and will stop after that completes.");
chainedReaderBuilder.addReader(new BlockingReader("blocker"));
chainedReaderBuilder.addReader(new BlockingReader("blocker", "Connector has completed all of its work but will continue in the running state. It can be shut down at any time."));
chainedReaderBuilder.completionMessage("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
} else {
if (!rowBinlogEnabled) {
Expand Down
Expand Up @@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;

import java.time.Duration;
import java.util.Map;
import java.util.function.Predicate;

Expand Down Expand Up @@ -249,6 +250,10 @@ public String getHeartbeatTopicsPrefix() {
return config.getString(Heartbeat.HEARTBEAT_TOPICS_PREFIX);
}

public Duration snapshotDelay() {
return Duration.ofMillis(config.getLong(MySqlConnectorConfig.SNAPSHOT_DELAY_MS));
}

public void start() {
connectionContext.start();
// Start the MySQL database history, which simply starts up resources but does not recover the history to a specific point
Expand Down
@@ -0,0 +1,59 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;

import java.time.Duration;
import java.util.List;

import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.util.Clock;
import io.debezium.util.Threads;
import io.debezium.util.Threads.Timer;

/**
* A component that blocks doing nothing for a specified period of time or until the connector task is stopped
*
* @author Peter Goransson
*/
public class TimedBlockingReader extends BlockingReader {

protected final Logger logger = LoggerFactory.getLogger(getClass());

private final Duration timeout;
private Timer timer;

/**
* @param name Name of the reader
* @param timeout Duration of time until this TimedBlockingReader should stop
*/
public TimedBlockingReader(String name, Duration timeout) {
super(name, "The connector will wait for " + timeout.toMillis() + " ms before proceeding");
this.timeout = timeout;
}

@Override
public void start() {
super.start();
this.timer = Threads.timer(Clock.SYSTEM, timeout);
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
super.poll();

// Stop when we've reached the timeout threshold
if (timer != null && timer.expired()) {
stop();
}

return null;
}


}