From a02817cd19407d59aefb75744d36119873551b76 Mon Sep 17 00:00:00 2001 From: Hans-Peter Grahsl Date: Sat, 13 Jul 2019 15:43:38 +0200 Subject: [PATCH] remove potentially contained internal oplog field '$v' (#92) --- .../cdc/debezium/mongodb/MongoDbUpdate.java | 7 +++++ .../debezium/mongodb/MongoDbUpdateTest.java | 31 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbUpdate.java b/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbUpdate.java index 3d58e22..5e2f32c 100644 --- a/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbUpdate.java +++ b/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbUpdate.java @@ -29,6 +29,7 @@ public class MongoDbUpdate implements CdcOperation { public static final String JSON_DOC_FIELD_PATH = "patch"; + public static final String INTERNAL_OPLOG_FIELD_V = "$v"; private static final UpdateOptions UPDATE_OPTIONS = new UpdateOptions().upsert(true); @@ -46,6 +47,12 @@ public WriteModel perform(SinkDocument doc) { valueDoc.getString(JSON_DOC_FIELD_PATH).getValue() ); + //Check if the internal "$v" field is contained which was added to the + //oplog format in 3.6+ If so, then we simply remove it for now since + //it's not used by the sink connector at the moment and would break + //CDC-mode based "replication". + updateDoc.remove(INTERNAL_OPLOG_FIELD_V); + //patch contains full new document for replacement if(updateDoc.containsKey(DBCollection.ID_FIELD_NAME)) { BsonDocument filterDoc = diff --git a/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbUpdateTest.java b/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbUpdateTest.java index 6d4a1b7..92e94f5 100644 --- a/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbUpdateTest.java +++ b/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbUpdateTest.java @@ -35,6 +35,10 @@ public class MongoDbUpdateTest { .append("last_name",new BsonString("Kretchmer")) ); + //USED to verify if oplog internals ($v field) are removed correctly + public static final BsonDocument UPDATE_DOC_WITH_OPLOG_INTERNALS = + UPDATE_DOC.clone().append("$v",new BsonInt32(1)); + @Test @DisplayName("when valid doc replace cdc event then correct ReplaceOneModel") public void testValidSinkDocumentForReplacement() { @@ -93,6 +97,33 @@ public void testValidSinkDocumentForUpdate() { } + @Test + @DisplayName("when valid doc change cdc event containing internal oplog fields then correct UpdateOneModel") + public void testValidSinkDocumentWithInternalOploagFieldForUpdate() { + BsonDocument keyDoc = new BsonDocument("id",new BsonString("1004")); + + BsonDocument valueDoc = new BsonDocument("op",new BsonString("u")) + .append("patch",new BsonString(UPDATE_DOC_WITH_OPLOG_INTERNALS.toJson())); + + WriteModel result = + MONGODB_UPDATE.perform(new SinkDocument(keyDoc,valueDoc)); + + assertTrue(result instanceof UpdateOneModel, + () -> "result expected to be of type UpdateOneModel"); + + UpdateOneModel writeModel = + (UpdateOneModel) result; + + assertEquals(UPDATE_DOC,writeModel.getUpdate(), + ()-> "update doc not matching what is expected"); + + assertTrue(writeModel.getFilter() instanceof BsonDocument, + () -> "filter expected to be of type BsonDocument"); + + assertEquals(FILTER_DOC,writeModel.getFilter()); + + } + @Test @DisplayName("when missing value doc then DataException") public void testMissingValueDocument() {