Skip to content

Commit

Permalink
DBZ-7272 Guarding against implicit offset invalidation cause by the c…
Browse files Browse the repository at this point in the history
…hange of default connection mode
  • Loading branch information
jcechace committed Dec 18, 2023
1 parent e3bb119 commit d206389
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,14 @@ public static OversizeHandlingMode parse(String value, String defaultValue) {
.withDescription("Internal use only")
.withType(Type.LIST);

/**
* The {@link ReplicaSets#SEPARATOR}-separated list of connection strings
*/
public static final Field ALLOW_OFFSET_INVALIDATION = Field.createInternal("mongodb.internal.allow.offset.invalidation")
.withDescription("Allows offset invalidation when required by change of connection mode")
.withDefault(false)
.withType(Type.BOOLEAN);

// MongoDb fields in Connection Group start from 1 (topic.prefix is 0)
public static final Field CONNECTION_STRING = Field.create("mongodb.connection.string")
.withDisplayName("Connection String")
Expand Down Expand Up @@ -931,6 +939,7 @@ public static OversizeHandlingMode parse(String value, String defaultValue) {
TOPIC_PREFIX,
CONNECTION_STRING,
CONNECTION_MODE,
ALLOW_OFFSET_INVALIDATION,
USER,
PASSWORD,
AUTH_SOURCE,
Expand Down Expand Up @@ -973,6 +982,7 @@ public static ConfigDef configDef() {
private final CaptureScope captureScope;
private final String captureTarget;
private final ConnectionMode connectionMode;
private final boolean offsetInvalidationAllowed;
private final int snapshotMaxThreads;
private final int cursorMaxAwaitTimeMs;
private final ReplicaSets replicaSets;
Expand All @@ -993,6 +1003,7 @@ public MongoDbConnectorConfig(Configuration config) {
String connectionModeValue = config.getString(MongoDbConnectorConfig.CONNECTION_MODE);
this.connectionMode = ConnectionMode.parse(connectionModeValue, MongoDbConnectorConfig.CONNECTION_MODE.defaultValueAsString());
this.shardConnectionParameters = config.getString(SHARD_CONNECTION_PARAMS);
this.offsetInvalidationAllowed = config.getBoolean(ALLOW_OFFSET_INVALIDATION);

String captureScopeValue = config.getString(MongoDbConnectorConfig.CAPTURE_SCOPE);
this.captureScope = CaptureScope.parse(captureScopeValue, MongoDbConnectorConfig.CAPTURE_SCOPE.defaultValueAsString());
Expand Down Expand Up @@ -1192,6 +1203,10 @@ public ConnectionMode getConnectionMode() {
return connectionMode;
}

public boolean isOffsetInvalidationAllowed() {
return offsetInvalidationAllowed;
}

public String getShardConnectionParameters() {
return shardConnectionParameters;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -24,6 +25,7 @@
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.mongodb.connection.ReplicaSet;
import io.debezium.connector.mongodb.metrics.MongoDbChangeEventSourceMetricsFactory;
import io.debezium.document.DocumentReader;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
Expand Down Expand Up @@ -196,10 +198,49 @@ private MongoDbOffsetContext getPreviousOffset(MongoDbConnectorConfig connectorC
return offsetContext;
}
else {
checkShardSpecificOffsetsIfNeeded(connectorConfig, replicaSets);
return null;
}
}

private void checkShardSpecificOffsetsIfNeeded(MongoDbConnectorConfig connectorConfig, ReplicaSets currentReplicaSets) {
if (currentReplicaSets.size() != 1 || !currentReplicaSets.getSnapshotReplicaSet().isClusterRs()) {
// We are not running in sharded connection mode, so no check is needed
return;
}

logger.info("Previous offset not found, checking shard specific offsets from replica_set connection mode.");
var discovery = new ReplicaSetDiscovery(taskContext);
var replicaSetSpecs = new HashSet<ReplicaSet>();

try (var client = taskContext.getConnectionContext().connect()) {
discovery.readReplicaSetsFromShardedCluster(replicaSetSpecs, client);
}
catch (Throwable t) {
logger.warn("Unable to read shard topology.");
return;
}

var replicaSets = new ReplicaSets(replicaSetSpecs);
var loader = new MongoDbOffsetContext.Loader(connectorConfig, replicaSets);
Collection<Map<String, String>> partitions = loader.getPartitions();
Map<Map<String, String>, Map<String, Object>> offsets = context.offsetStorageReader().offsets(partitions);

if (offsets != null && offsets.values().stream().anyMatch(Objects::nonNull)) {
logger.warn("Found at least one shard specific offset from previous run");
if (connectorConfig.isOffsetInvalidationAllowed()) {
logger.warn("Offset invalidation is allowed, previous shard specific offsets will be ignored");
return;
}

throw new DebeziumException("Found at least one shard specific offset from previous run." +
"The default connection mode for sharded has changed to 'sharded' and previous offsets would be invalidated." +
"Either explicitly set 'mongodb.connection.mode=replica_set' to postpone the migration " +
"or set 'mongodb.internal.allow.offset.invalidation=true'. " +
"In next release the 'replica_set' connection mode will be removed.");
}
}

private ReplicaSets getReplicaSets(MongoDbConnectorConfig connectorConfig) {
final ReplicaSets replicaSets = connectorConfig.getReplicaSets();
if (replicaSets.size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public ReplicaSets getReplicaSets(MongoClient client) {
}
else if (ConnectionMode.REPLICA_SET.equals(connectionMode)) {
LOGGER.info("ConnectionMode set to '{}, individual shard connections will be used", connectionMode.getValue());
readReplicaSetsFromShardedCluster(replicaSetSpecs, client, connectionContext);
readReplicaSetsFromShardedCluster(replicaSetSpecs, client);
}
else {
LOGGER.warn("Incompatible connection mode '{}' specified", connectionMode.getValue());
Expand Down Expand Up @@ -113,7 +113,7 @@ private void readReplicaSetsFromCluster(Set<ReplicaSet> replicaSetSpecs, Cluster
replicaSetSpecs.add(new ReplicaSet(connectionString));
}

private void readReplicaSetsFromShardedCluster(Set<ReplicaSet> replicaSetSpecs, MongoClient client, ConnectionContext connectionContext) {
public void readReplicaSetsFromShardedCluster(Set<ReplicaSet> replicaSetSpecs, MongoClient client) {
try {
var csParams = context.getConnectorConfig().getShardConnectionParameters();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public final class ReplicaSet implements Comparable<ReplicaSet> {
private final String replicaSetName;
private final ConnectionString connectionString;
private final int hc;
private final boolean isClusterRs;

public ReplicaSet(String connectionString) {
this(new ConnectionString(connectionString));
Expand All @@ -31,6 +32,7 @@ public ReplicaSet(ConnectionString connectionString) {
private ReplicaSet(String replicaSetName, ConnectionString connectionString) {
this.connectionString = Objects.requireNonNull(connectionString, "Connection string cannot be null");
this.replicaSetName = Objects.requireNonNullElse(replicaSetName, CLUSTER_RS_NAME);
this.isClusterRs = replicaSetName == null;
this.hc = Objects.hash(connectionString);
}

Expand All @@ -43,6 +45,10 @@ public String replicaSetName() {
return replicaSetName;
}

public boolean isClusterRs() {
return isClusterRs;
}

/**
* Get connection string
*
Expand Down

0 comments on commit d206389

Please sign in to comment.