Skip to content

Commit

Permalink
DBZ-5677: Apply fix from DBZ-5371 for MongoDB Snapshot Source to Stre…
Browse files Browse the repository at this point in the history
…aming Source
  • Loading branch information
Sage-Pierce committed Oct 5, 2022
1 parent 9fb0ce7 commit 5b7687c
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.MongoQueryException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
Expand Down Expand Up @@ -256,7 +254,7 @@ private boolean isSnapshotExpected(MongoPrimary primaryClient, ReplicaSetOffsetC
// There is no ongoing snapshot, so look to see if our last recorded offset still exists in the oplog.
BsonTimestamp lastRecordedTs = offsetContext.lastOffsetTimestamp();
BsonTimestamp firstAvailableTs = primaryClient.execute("get oplog position", primary -> {
return SourceInfo.extractEventTimestamp(getOplogEntry(primary, 1));
return SourceInfo.extractEventTimestamp(MongoUtil.getOplogEntry(primary, 1, LOGGER));
});

if (firstAvailableTs == null) {
Expand Down Expand Up @@ -293,7 +291,7 @@ protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets
if (primaryClient != null) {
try {
primaryClient.execute("get oplog position", primary -> {
positions.put(replicaSet, getOplogEntry(primary, -1));
positions.put(replicaSet, MongoUtil.getOplogEntry(primary, -1, LOGGER));
});
}
finally {
Expand All @@ -307,22 +305,6 @@ protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets
new MongoDbIncrementalSnapshotContext<>(false), positions);
}

private Document getOplogEntry(MongoClient primary, int sortOrder) throws MongoQueryException {
try {
MongoCollection<Document> oplog = primary.getDatabase("local").getCollection("oplog.rs", Document.class);
return oplog.find().sort(new Document("$natural", sortOrder)).limit(1).first();
}
catch (MongoQueryException e) {
if (e.getMessage().contains("$natural:") && e.getMessage().contains("is not supported")) {
final String sortOrderType = sortOrder == -1 ? "descending" : "ascending";
// Amazon DocumentDB does not support $natural, assume no oplog entries when this occurs
LOGGER.info("Natural {} sort is not supported on oplog, treating situation as no oplog entry exists.", sortOrderType);
return null;
}
throw e;
}
}

private void createDataEvents(ChangeEventSourceContext sourceContext, MongoDbSnapshotContext snapshotContext, ReplicaSet replicaSet,
MongoPrimary primaryClient)
throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,9 +578,7 @@ protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connecto
if (primaryClient != null) {
try {
primaryClient.execute("get oplog position", primary -> {
MongoCollection<Document> oplog = primary.getDatabase("local").getCollection("oplog.rs");
Document last = oplog.find().sort(new Document("$natural", -1)).limit(1).first(); // may be null
positions.put(replicaSet, last);
positions.put(replicaSet, MongoUtil.getOplogEntry(primary, -1, LOGGER));
});
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import org.bson.Document;
import org.bson.types.Binary;
import org.slf4j.Logger;

import com.mongodb.MongoQueryException;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
Expand Down Expand Up @@ -227,6 +229,22 @@ public static ServerAddress parseAddress(String addressStr) {
return null;
}

public static Document getOplogEntry(MongoClient primary, int sortOrder, Logger logger) throws MongoQueryException {
try {
MongoCollection<Document> oplog = primary.getDatabase("local").getCollection("oplog.rs", Document.class);
return oplog.find().sort(new Document("$natural", sortOrder)).limit(1).first();
}
catch (MongoQueryException e) {
if (e.getMessage().contains("$natural:") && e.getMessage().contains("is not supported")) {
final String sortOrderType = sortOrder == -1 ? "descending" : "ascending";
// Amazon DocumentDB does not support $natural, assume no oplog entries when this occurs
logger.info("Natural {} sort is not supported on oplog, treating situation as no oplog entry exists.", sortOrderType);
return null;
}
throw e;
}
}

/**
* Helper function to extract the session transaction-id from an oplog event.
*
Expand Down

0 comments on commit 5b7687c

Please sign in to comment.