Skip to content

Commit

Permalink
remove potentially contained internal oplog field '$v' (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpgrahsl committed Jul 13, 2019
1 parent 2f43344 commit a02817c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
public class MongoDbUpdate implements CdcOperation {

public static final String JSON_DOC_FIELD_PATH = "patch";
public static final String INTERNAL_OPLOG_FIELD_V = "$v";

private static final UpdateOptions UPDATE_OPTIONS =
new UpdateOptions().upsert(true);
Expand All @@ -46,6 +47,12 @@ public WriteModel<BsonDocument> perform(SinkDocument doc) {
valueDoc.getString(JSON_DOC_FIELD_PATH).getValue()
);

//Check if the internal "$v" field is contained which was added to the
//oplog format in 3.6+ If so, then we simply remove it for now since
//it's not used by the sink connector at the moment and would break
//CDC-mode based "replication".
updateDoc.remove(INTERNAL_OPLOG_FIELD_V);

//patch contains full new document for replacement
if(updateDoc.containsKey(DBCollection.ID_FIELD_NAME)) {
BsonDocument filterDoc =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public class MongoDbUpdateTest {
.append("last_name",new BsonString("Kretchmer"))
);

//USED to verify if oplog internals ($v field) are removed correctly
public static final BsonDocument UPDATE_DOC_WITH_OPLOG_INTERNALS =
UPDATE_DOC.clone().append("$v",new BsonInt32(1));

@Test
@DisplayName("when valid doc replace cdc event then correct ReplaceOneModel")
public void testValidSinkDocumentForReplacement() {
Expand Down Expand Up @@ -93,6 +97,33 @@ public void testValidSinkDocumentForUpdate() {

}

@Test
@DisplayName("when valid doc change cdc event containing internal oplog fields then correct UpdateOneModel")
public void testValidSinkDocumentWithInternalOploagFieldForUpdate() {
BsonDocument keyDoc = new BsonDocument("id",new BsonString("1004"));

BsonDocument valueDoc = new BsonDocument("op",new BsonString("u"))
.append("patch",new BsonString(UPDATE_DOC_WITH_OPLOG_INTERNALS.toJson()));

WriteModel<BsonDocument> result =
MONGODB_UPDATE.perform(new SinkDocument(keyDoc,valueDoc));

assertTrue(result instanceof UpdateOneModel,
() -> "result expected to be of type UpdateOneModel");

UpdateOneModel<BsonDocument> writeModel =
(UpdateOneModel<BsonDocument>) result;

assertEquals(UPDATE_DOC,writeModel.getUpdate(),
()-> "update doc not matching what is expected");

assertTrue(writeModel.getFilter() instanceof BsonDocument,
() -> "filter expected to be of type BsonDocument");

assertEquals(FILTER_DOC,writeModel.getFilter());

}

@Test
@DisplayName("when missing value doc then DataException")
public void testMissingValueDocument() {
Expand Down

0 comments on commit a02817c

Please sign in to comment.