DBZ-5677: Apply fix from DBZ-5371 for MongoDB Snapshot Source to Streaming Source #3939

Merged: 3 commits, Oct 6, 2022
1 change: 1 addition & 0 deletions COPYRIGHT.txt
@@ -332,6 +332,7 @@ Robert Coup
 Robert Roldan
 Ruslan Gibaiev
 Russell Ballard
+Sage Pierce
 Sairam Polavarapu
 Sanjay Kr Singh
 Sanne Grinovero
MongoDbSnapshotChangeEventSource.java
@@ -26,8 +26,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.mongodb.MongoQueryException;
-import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoDatabase;
@@ -256,7 +254,7 @@ private boolean isSnapshotExpected(MongoPrimary primaryClient, ReplicaSetOffsetC
         // There is no ongoing snapshot, so look to see if our last recorded offset still exists in the oplog.
         BsonTimestamp lastRecordedTs = offsetContext.lastOffsetTimestamp();
         BsonTimestamp firstAvailableTs = primaryClient.execute("get oplog position", primary -> {
-            return SourceInfo.extractEventTimestamp(getOplogEntry(primary, 1));
+            return SourceInfo.extractEventTimestamp(MongoUtil.getOplogEntry(primary, 1, LOGGER));
         });
 
         if (firstAvailableTs == null) {
@@ -293,7 +291,7 @@ protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets
         if (primaryClient != null) {
             try {
                 primaryClient.execute("get oplog position", primary -> {
-                    positions.put(replicaSet, getOplogEntry(primary, -1));
+                    positions.put(replicaSet, MongoUtil.getOplogEntry(primary, -1, LOGGER));
                 });
             }
             finally {
@@ -307,22 +305,6 @@ protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets
                 new MongoDbIncrementalSnapshotContext<>(false), positions);
     }
 
-    private Document getOplogEntry(MongoClient primary, int sortOrder) throws MongoQueryException {
-        try {
-            MongoCollection<Document> oplog = primary.getDatabase("local").getCollection("oplog.rs", Document.class);
-            return oplog.find().sort(new Document("$natural", sortOrder)).limit(1).first();
-        }
-        catch (MongoQueryException e) {
-            if (e.getMessage().contains("$natural:") && e.getMessage().contains("is not supported")) {
-                final String sortOrderType = sortOrder == -1 ? "descending" : "ascending";
-                // Amazon DocumentDB does not support $natural, assume no oplog entries when this occurs
-                LOGGER.info("Natural {} sort is not supported on oplog, treating situation as no oplog entry exists.", sortOrderType);
-                return null;
-            }
-            throw e;
-        }
-    }
-
     private void createDataEvents(ChangeEventSourceContext sourceContext, MongoDbSnapshotContext snapshotContext, ReplicaSet replicaSet,
                                   MongoPrimary primaryClient)
             throws InterruptedException {
MongoDbStreamingChangeEventSource.java
@@ -320,7 +320,7 @@ private void readChangeStream(MongoClient primary, MongoPrimary primaryClient, R
             doc.put("_data", new BsonString(rsOffsetContext.lastResumeToken()));
             rsChangeStream.resumeAfter(doc);
         }
-        else {
+        else if (oplogStart.getTime() > 0) {
Contributor Author commented:
In my local testing, I needed this check to avoid starting at a timestamp that the connector thinks is no longer in the oplog, which otherwise causes the following exception:

2022-10-05 16:12:01.154 [ERROR] 19806 --- [calpkmappings-0] .apache.kafka.connect.runtime.WorkerTask : WorkerSourceTask{id=employer-job-mapping-documentdb-canonicalpkmappings-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
	at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
	at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.streamChangesForReplicaSet(MongoDbStreamingChangeEventSource.java:134)
	at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:103)
	at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:59)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:174)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while attempting to read from oplog on 'rs0/employer-job-search-documentdb-qa-0.cbue06mb88c1.us-east-2.docdb.amazonaws.com:27017,employer-job-search-documentdb-qa-1.cbue06mb88c1.us-east-2.docdb.amazonaws.com:27017,employer-job-search-documentdb-qa-2.cbue06mb88c1.us-east-2.docdb.amazonaws.com:27017'
	at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$establishConnectionToPrimary$3(MongoDbStreamingChangeEventSource.java:177)
	at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:292)
	at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.streamChangesForReplicaSet(MongoDbStreamingChangeEventSource.java:122)
	... 11 more
Caused by: com.mongodb.MongoCommandException: Command failed with error 136: 'CappedPositionLost: CollectionScan died due to position in capped collection being deleted.' on server employer-job-search-documentdb-qa-1.cbue06mb88c1.us-east-2.docdb.amazonaws.com:27017. The full response is {"ok": 0.0, "operationTime": {"$timestamp": {"t": 1665004320, "i": 1}}, "code": 136, "errmsg": "CappedPositionLost: CollectionScan died due to position in capped collection being deleted."}
	at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:195)
	at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:400)
	at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:324)
	at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
	at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:603)
	at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:81)
	at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:252)
	at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:214)
	at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123)
	at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:113)
	at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:328)
	at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:318)
	at com.mongodb.internal.operation.CommandOperationHelper.executeCommandWithConnection(CommandOperationHelper.java:201)
	at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeCommand$4(CommandOperationHelper.java:189)
	at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:583)
	at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:189)
	at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:195)
	at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:323)
	at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:319)
	at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:583)
	at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:319)
	at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:58)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:184)
	at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:204)
	at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:158)
	at com.mongodb.client.internal.ChangeStreamIterableImpl.iterator(ChangeStreamIterableImpl.java:153)
	at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readChangeStream(MongoDbStreamingChangeEventSource.java:332)
	at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$streamChangesForReplicaSet$0(MongoDbStreamingChangeEventSource.java:124)
	at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:288)
	... 12 more

This should have the effect of starting the change stream at the tail when there is neither a resume token nor an operation time to start from.

Contributor Author added:

Note that the error message is a bit misleading in that it refers to a "capped" collection, though none of my collections are capped.

After adding this check, everything seems to work as expected with DocumentDB.

             LOGGER.info("Resume token not available, starting streaming from time '{}'", oplogStart);
             rsChangeStream.startAtOperationTime(oplogStart);
         }
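Putting this hunk together, the start-position selection now reads roughly as follows. This is a condensed sketch, not the verbatim source: the resume-token guard and the BsonDocument construction on the first branch are assumed from context, since they are not visible in the diff.

    if (rsOffsetContext.lastResumeToken() != null) {                  // assumed guard
        // Preferred: resume the change stream right after the last recorded resume token.
        final BsonDocument doc = new BsonDocument();                  // assumed construction
        doc.put("_data", new BsonString(rsOffsetContext.lastResumeToken()));
        rsChangeStream.resumeAfter(doc);
    }
    else if (oplogStart.getTime() > 0) {
        // A recorded oplog timestamp exists; start streaming from that operation time.
        LOGGER.info("Resume token not available, starting streaming from time '{}'", oplogStart);
        rsChangeStream.startAtOperationTime(oplogStart);
    }
    // Otherwise no explicit start position is set and the change stream begins at
    // the current tail of the oplog, which is the behavior the author describes
    // for the DocumentDB case.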
@@ -578,9 +578,7 @@ protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connecto
         if (primaryClient != null) {
             try {
                 primaryClient.execute("get oplog position", primary -> {
-                    MongoCollection<Document> oplog = primary.getDatabase("local").getCollection("oplog.rs");
-                    Document last = oplog.find().sort(new Document("$natural", -1)).limit(1).first(); // may be null
-                    positions.put(replicaSet, last);
+                    positions.put(replicaSet, MongoUtil.getOplogEntry(primary, -1, LOGGER));
                 });
             }
             finally {
MongoUtil.java
@@ -17,7 +17,9 @@
 import org.bson.Document;
 import org.bson.types.Binary;
+import org.slf4j.Logger;
 
+import com.mongodb.MongoQueryException;
 import com.mongodb.ReadPreference;
 import com.mongodb.ServerAddress;
 import com.mongodb.client.MongoClient;
@@ -227,6 +229,22 @@ public static ServerAddress parseAddress(String addressStr) {
         return null;
     }

+    public static Document getOplogEntry(MongoClient primary, int sortOrder, Logger logger) throws MongoQueryException {
+        try {
+            MongoCollection<Document> oplog = primary.getDatabase("local").getCollection("oplog.rs", Document.class);
+            return oplog.find().sort(new Document("$natural", sortOrder)).limit(1).first();
+        }
+        catch (MongoQueryException e) {
+            if (e.getMessage().contains("$natural:") && e.getMessage().contains("is not supported")) {
+                final String sortOrderType = sortOrder == -1 ? "descending" : "ascending";
+                // Amazon DocumentDB does not support $natural, assume no oplog entries when this occurs
+                logger.info("Natural {} sort is not supported on oplog, treating situation as no oplog entry exists.", sortOrderType);
+                return null;
+            }
+            throw e;
+        }
+    }
+
     /**
      * Helper function to extract the session transaction-id from an oplog event.
      *
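For reference, the extracted helper is invoked from the snapshot and streaming sources shown above. A minimal usage sketch, with names taken from those hunks; either call may return null, for example on Amazon DocumentDB, where $natural sort is unsupported:

    // Oldest oplog entry ($natural ascending): used to check whether the last
    // recorded offset is still available in the oplog.
    Document first = MongoUtil.getOplogEntry(primary, 1, LOGGER);   // may be null

    // Newest oplog entry ($natural descending): used to record the current
    // oplog position for each replica set before snapshotting.
    Document last = MongoUtil.getOplogEntry(primary, -1, LOGGER);   // may be null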
1 change: 1 addition & 0 deletions jenkins-jobs/scripts/config/Aliases.txt
@@ -160,3 +160,4 @@ nicholas-fwang,Inki Hwang
 gmouss,Moustapha Mahfoud
 avis408,Avinash Vishwakarma
 nirolevy,Niro Levy
+Sage-Pierce,Sage Pierce