New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DBZ-5677: Apply fix from DBZ-5371 for MongoDB Snapshot Source to Streaming Source #3939
Conversation
Hi @Sage-Pierce. Thank you for your valuable contribution. |
Welcome as a new contributor to Debezium, @Sage-Pierce. Reviewers, please add missing author name(s) and alias name(s) to the COPYRIGHT.txt and Aliases.txt respectively. |
@@ -320,7 +320,7 @@ private void readChangeStream(MongoClient primary, MongoPrimary primaryClient, R | |||
doc.put("_data", new BsonString(rsOffsetContext.lastResumeToken())); | |||
rsChangeStream.resumeAfter(doc); | |||
} | |||
else { | |||
else if (oplogStart.getTime() > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my local testing, I needed this check in order to avoid starting at a timestamp that the Connector otherwise thinks is no longer in the oplog, and throws the following exception:
2022-10-05 16:12:01.154 [ERROR] 19806 --- [calpkmappings-0] .apache.kafka.connect.runtime.WorkerTask : WorkerSourceTask{id=employer-job-mapping-documentdb-canonicalpkmappings-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.streamChangesForReplicaSet(MongoDbStreamingChangeEventSource.java:134)
at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:103)
at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:59)
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:174)
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while attempting to read from oplog on 'rs0/employer-job-search-documentdb-qa-0.cbue06mb88c1.us-east-2.docdb.amazonaws.com:27017,employer-job-search-documentdb-qa-1.cbue06mb88c1.us-east-2.docdb.amazonaws.com:27017,employer-job-search-documentdb-qa-2.cbue06mb88c1.us-east-2.docdb.amazonaws.com:27017'
at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$establishConnectionToPrimary$3(MongoDbStreamingChangeEventSource.java:177)
at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:292)
at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.streamChangesForReplicaSet(MongoDbStreamingChangeEventSource.java:122)
... 11 more
Caused by: com.mongodb.MongoCommandException: Command failed with error 136: 'CappedPositionLost: CollectionScan died due to position in capped collection being deleted.' on server employer-job-search-documentdb-qa-1.cbue06mb88c1.us-east-2.docdb.amazonaws.com:27017. The full response is {"ok": 0.0, "operationTime": {"$timestamp": {"t": 1665004320, "i": 1}}, "code": 136, "errmsg": "CappedPositionLost: CollectionScan died due to position in capped collection being deleted."}
at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:195)
at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:400)
at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:324)
at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:603)
at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:81)
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:252)
at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:214)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:113)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:328)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:318)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommandWithConnection(CommandOperationHelper.java:201)
at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeCommand$4(CommandOperationHelper.java:189)
at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:583)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:189)
at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:195)
at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:323)
at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:319)
at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:583)
at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:319)
at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:58)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:184)
at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:204)
at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:158)
at com.mongodb.client.internal.ChangeStreamIterableImpl.iterator(ChangeStreamIterableImpl.java:153)
at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readChangeStream(MongoDbStreamingChangeEventSource.java:332)
at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$streamChangesForReplicaSet$0(MongoDbStreamingChangeEventSource.java:124)
at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:288)
... 12 more
This should have the effect of the change stream starting at the tail when there is neither a resume token or operation time to start from
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that the error message is a bit misleading in indicating a "capped" collection, which none of mine are.
After adding this check, everything seems to work as expected with DocumentDB
FYI @Naros