From 75267eddab0fc239dc682f4e3ae684335a9ad6fb Mon Sep 17 00:00:00 2001 From: hpgrahsl Date: Fri, 19 Jul 2019 10:17:46 +0200 Subject: [PATCH] preview feature to configure no op support based on CDC event type basic unit test available but config tests missing --- config/MongoDbSinkConnector.properties | 1 + .../mongodb/MongoDbSinkConnectorConfig.java | 80 ++++++++++++++++-- .../cdc/debezium/mongodb/MongoDbHandler.java | 2 +- .../cdc/debezium/mongodb/MongoDbNoOp.java | 15 ++++ .../cdc/debezium/rdbms/RdbmsHandler.java | 2 +- .../mongodb/cdc/debezium/rdbms/RdbmsNoOp.java | 15 ++++ .../debezium/mongodb/MongoDbHandlerTest.java | 81 +++++++++++++++++++ .../cdc/debezium/mongodb/MongoDbNoOpTest.java | 24 ++++++ .../cdc/debezium/rdbms/RdbmsHandlerTest.java | 79 ++++++++++++++++++ .../cdc/debezium/rdbms/RdbmsNoOpTest.java | 24 ++++++ 10 files changed, 316 insertions(+), 7 deletions(-) create mode 100644 src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbNoOp.java create mode 100644 src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsNoOp.java create mode 100644 src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbNoOpTest.java create mode 100644 src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsNoOpTest.java diff --git a/config/MongoDbSinkConnector.properties b/config/MongoDbSinkConnector.properties index 1329599..ca9c416 100644 --- a/config/MongoDbSinkConnector.properties +++ b/config/MongoDbSinkConnector.properties @@ -41,6 +41,7 @@ mongodb.field.renamer.mapping=[] mongodb.field.renamer.regexp=[] mongodb.post.processor.chain=at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder mongodb.change.data.capture.handler= +mongodb.change.data.capture.handler.operations=c,r,u,d mongodb.delete.on.null.values=false mongodb.writemodel.strategy=at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy mongodb.max.batch.size=0 diff --git a/src/main/java/at/grahsl/kafka/connect/mongodb/MongoDbSinkConnectorConfig.java b/src/main/java/at/grahsl/kafka/connect/mongodb/MongoDbSinkConnectorConfig.java index 1b4fc3c..076d713 100644 --- a/src/main/java/at/grahsl/kafka/connect/mongodb/MongoDbSinkConnectorConfig.java +++ b/src/main/java/at/grahsl/kafka/connect/mongodb/MongoDbSinkConnectorConfig.java @@ -17,8 +17,10 @@ package at.grahsl.kafka.connect.mongodb; import at.grahsl.kafka.connect.mongodb.cdc.CdcHandler; -import at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbHandler; -import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsHandler; +import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation; +import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType; +import at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.*; +import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.*; import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.mysql.MysqlHandler; import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.postgres.PostgresHandler; import at.grahsl.kafka.connect.mongodb.processor.*; @@ -78,6 +80,7 @@ public enum FieldProjectionTypes { public static final String MONGODB_FIELD_RENAMER_REGEXP_DEFAULT = "[]"; public static final String MONGODB_POST_PROCESSOR_CHAIN_DEFAULT = "at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder"; public static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DEFAULT = ""; + public static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS_DEFAULT = "c,r,u,d"; public static final boolean MONGODB_DELETE_ON_NULL_VALUES_DEFAULT = false; public static final String MONGODB_WRITEMODEL_STRATEGY_DEFAULT = "at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy"; public static final int MONGODB_MAX_BATCH_SIZE_DEFAULT = 0; @@ -129,6 +132,9 @@ public enum FieldProjectionTypes { public static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER = "mongodb.change.data.capture.handler"; private static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DOC = "class name of CDC handler to use for processing"; + public static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS = "mongodb.change.data.capture.handler.operations"; + private static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS_DOC = "comma separated list of supported CDC operation types (missing ones result in no ops)"; + public static final String MONGODB_DELETE_ON_NULL_VALUES = "mongodb.delete.on.null.values"; private static final String MONGODB_DELETE_ON_NULL_VALUES_DOC = "whether or not the connector tries to delete documents based on key when value is null"; @@ -257,6 +263,7 @@ public Map validateAll(Map props) { .define(MONGODB_FIELD_RENAMER_REGEXP, Type.STRING, MONGODB_FIELD_RENAMER_REGEXP_DEFAULT, Importance.LOW, MONGODB_FIELD_RENAMER_REGEXP_DOC) .define(MONGODB_POST_PROCESSOR_CHAIN, Type.STRING, MONGODB_POST_PROCESSOR_CHAIN_DEFAULT, emptyString().or(matching(FULLY_QUALIFIED_CLASS_NAME_LIST)), Importance.LOW, MONGODB_POST_PROCESSOR_CHAIN_DOC) .define(MONGODB_CHANGE_DATA_CAPTURE_HANDLER, Type.STRING, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DEFAULT, emptyString().or(matching(FULLY_QUALIFIED_CLASS_NAME)), Importance.LOW, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DOC) + .define(MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS, Type.STRING, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS_DEFAULT, Importance.LOW, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS_DOC) .define(MONGODB_DELETE_ON_NULL_VALUES, Type.BOOLEAN, MONGODB_DELETE_ON_NULL_VALUES_DEFAULT, Importance.MEDIUM, MONGODB_DELETE_ON_NULL_VALUES_DOC) .define(MONGODB_WRITEMODEL_STRATEGY, Type.STRING, MONGODB_WRITEMODEL_STRATEGY_DEFAULT, Importance.LOW, MONGODB_WRITEMODEL_STRATEGY_DOC) .define(MONGODB_MAX_BATCH_SIZE, Type.INT, MONGODB_MAX_BATCH_SIZE_DEFAULT, ConfigDef.Range.atLeast(0), Importance.MEDIUM, MONGODB_MAX_BATCH_SIZE_DOC) @@ -623,6 +630,69 @@ public CdcHandler getCdcHandler() { return getCdcHandler(""); } + public Map getCdcHandlerOperationMapping(Class cdcHandler, String collection) { + + Set supportedOperations = new HashSet<>( + splitAndTrimAndRemoveConfigListEntries(getString(MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS,collection)) + ); + + Map operationMapping = new HashMap<>(); + + if (cdcHandler.isAssignableFrom(MysqlHandler.class) + || cdcHandler.isAssignableFrom(PostgresHandler.class) + || cdcHandler.isAssignableFrom(RdbmsHandler.class)) { + + Stream.of(OperationType.values()).forEach( + ot -> operationMapping.put(ot, new RdbmsNoOp()) + ); + + supportedOperations.stream() + .map(OperationType::fromText) + .forEach(ot -> { + switch (ot) { + case CREATE: + case READ: + operationMapping.put(ot,new RdbmsInsert()); + break; + case UPDATE: + operationMapping.put(ot,new RdbmsUpdate()); + break; + case DELETE: + operationMapping.put(ot,new RdbmsDelete()); + break; + } + }); + + } else if (cdcHandler.isAssignableFrom(MongoDbHandler.class)) { + + Stream.of(OperationType.values()).forEach( + ot -> operationMapping.put(ot, new MongoDbNoOp()) + ); + + supportedOperations.stream() + .map(OperationType::fromText) + .forEach(ot -> { + switch (ot) { + case CREATE: + case READ: + operationMapping.put(ot,new MongoDbInsert()); + break; + case UPDATE: + operationMapping.put(ot,new MongoDbUpdate()); + break; + case DELETE: + operationMapping.put(ot,new MongoDbDelete()); + break; + } + }); + + } else { + throw new ConfigException("error: unsupported cdc handler " + cdcHandler.getName()); + } + + return operationMapping; + } + public CdcHandler getCdcHandler(String collection) { Set predefinedCdcHandler = getPredefinedCdcHandlerClassNames(); @@ -637,9 +707,9 @@ public CdcHandler getCdcHandler(String collection) { } try { - return (CdcHandler) Class.forName(cdcHandler) - .getConstructor(MongoDbSinkConnectorConfig.class) - .newInstance(this); + Class cdcHandlerClass = (Class)Class.forName(cdcHandler); + return cdcHandlerClass.getConstructor(MongoDbSinkConnectorConfig.class,Map.class) + .newInstance(this,getCdcHandlerOperationMapping(cdcHandlerClass,collection)); } catch (ReflectiveOperationException e) { throw new ConfigException(e.getMessage(),e); } catch (ClassCastException e) { diff --git a/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbHandler.java b/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbHandler.java index b242fa9..0fc2339 100644 --- a/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbHandler.java +++ b/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbHandler.java @@ -72,7 +72,7 @@ public Optional> handle(SinkDocument doc) { logger.debug("key: "+keyDoc.toString()); logger.debug("value: "+valueDoc.toString()); - return Optional.of(getCdcOperation(valueDoc).perform(doc)); + return Optional.ofNullable(getCdcOperation(valueDoc).perform(doc)); } } diff --git a/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbNoOp.java b/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbNoOp.java new file mode 100644 index 0000000..6a07065 --- /dev/null +++ b/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbNoOp.java @@ -0,0 +1,15 @@ +package at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb; + +import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation; +import at.grahsl.kafka.connect.mongodb.converter.SinkDocument; +import com.mongodb.client.model.WriteModel; +import org.bson.BsonDocument; + +public class MongoDbNoOp implements CdcOperation { + + @Override + public WriteModel perform(SinkDocument doc) { + return null; + } + +} diff --git a/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsHandler.java b/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsHandler.java index 96d3bb3..082b9a6 100644 --- a/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsHandler.java +++ b/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsHandler.java @@ -69,7 +69,7 @@ public Optional> handle(SinkDocument doc) { return Optional.empty(); } - return Optional.of(getCdcOperation(valueDoc) + return Optional.ofNullable(getCdcOperation(valueDoc) .perform(new SinkDocument(keyDoc,valueDoc))); } diff --git a/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsNoOp.java b/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsNoOp.java new file mode 100644 index 0000000..fef1286 --- /dev/null +++ b/src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsNoOp.java @@ -0,0 +1,15 @@ +package at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms; + +import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation; +import at.grahsl.kafka.connect.mongodb.converter.SinkDocument; +import com.mongodb.client.model.WriteModel; +import org.bson.BsonDocument; + +public class RdbmsNoOp implements CdcOperation { + + @Override + public WriteModel perform(SinkDocument doc) { + return null; + } + +} diff --git a/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbHandlerTest.java b/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbHandlerTest.java index 36f34a8..70735da 100644 --- a/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbHandlerTest.java +++ b/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbHandlerTest.java @@ -1,7 +1,9 @@ package at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb; import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig; +import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation; import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType; +import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsHandler; import at.grahsl.kafka.connect.mongodb.converter.SinkDocument; import com.mongodb.client.model.DeleteOneModel; import com.mongodb.client.model.ReplaceOneModel; @@ -36,6 +38,15 @@ public class MongoDbHandlerTest { new MongoDbHandler(new MongoDbSinkConnectorConfig(new HashMap<>()), new HashMap<>()); + public static final RdbmsHandler MONGODB_HANDLER_NOOP_MAPPING = + new RdbmsHandler(new MongoDbSinkConnectorConfig(new HashMap<>()), + new HashMap() {{ + put(OperationType.CREATE,new MongoDbNoOp()); + put(OperationType.READ,new MongoDbNoOp()); + put(OperationType.UPDATE,new MongoDbNoOp()); + put(OperationType.DELETE,new MongoDbNoOp()); + }}); + @Test @DisplayName("verify existing default config from base class") public void testExistingDefaultConfig() { @@ -209,4 +220,74 @@ public Stream testValidCdcOpertionTypes() { } + @TestFactory + @DisplayName("when valid cdc operation type mapped to NO OP then CdcOperation of type MongoDbNoOp") + public Stream testValidCdcOpertionWithNoOpMappings() { + + return Stream.of(OperationType.values()).map(ot -> + dynamicTest("test operation " + ot, () -> + assertTrue(MONGODB_HANDLER_NOOP_MAPPING.getCdcOperation( + new BsonDocument("op", new BsonString("c"))) + instanceof MongoDbNoOp) + ) + ); + + } + + @TestFactory + @DisplayName("when valid CDC event with noop mapping then empty WriteModel") + public Stream testValidCdcDocumentWithNoOpMapping() { + + return Stream.of( + dynamicTest("test operation "+OperationType.CREATE, + () -> assertEquals(Optional.empty(), + MONGODB_HANDLER_NOOP_MAPPING.handle( + new SinkDocument( + new BsonDocument("_id",new BsonString("1234")), + new BsonDocument("op",new BsonString("c")) + .append("after",new BsonString("{_id:1234,foo:\"blah\"}")) + ) + ), + () -> "result of MongoDbNoOp must be Optional.empty()") + + ), + dynamicTest("test operation "+OperationType.READ, + () -> assertEquals(Optional.empty(), + MONGODB_HANDLER_NOOP_MAPPING.handle( + new SinkDocument( + new BsonDocument("_id",new BsonString("1234")), + new BsonDocument("op",new BsonString("r")) + .append("after",new BsonString("{_id:1234,foo:\"blah\"}")) + ) + ), + () -> "result of MongoDbNoOp must be Optional.empty()") + + ), + dynamicTest("test operation "+OperationType.UPDATE, + () -> assertEquals(Optional.empty(), + MONGODB_HANDLER_NOOP_MAPPING.handle( + new SinkDocument( + new BsonDocument("id",new BsonString("1234")), + new BsonDocument("op",new BsonString("u")) + .append("patch",new BsonString("{\"$set\":{foo:\"blah\"}}")) + ) + ), + () -> "result of MongoDbNoOp must be Optional.empty()") + + ), + dynamicTest("test operation "+OperationType.DELETE, + () -> assertEquals(Optional.empty(), + MONGODB_HANDLER_NOOP_MAPPING.handle( + new SinkDocument( + new BsonDocument("id",new BsonString("1234")), + new BsonDocument("op",new BsonString("d")) + ) + ), + () -> "result of MongoDbNoOp must be Optional.empty()") + + ) + ); + + } + } diff --git a/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbNoOpTest.java b/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbNoOpTest.java new file mode 100644 index 0000000..9de1be4 --- /dev/null +++ b/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbNoOpTest.java @@ -0,0 +1,24 @@ +package at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb; + +import at.grahsl.kafka.connect.mongodb.converter.SinkDocument; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertNull; + +@RunWith(JUnitPlatform.class) +public class MongoDbNoOpTest { + + @Test + @DisplayName("when any cdc event then WriteModel is null resulting in Optional.empty() in the corresponding handler") + public void testValidSinkDocument() { + assertAll("test behaviour of MongoDbNoOp", + () -> assertNull(new MongoDbNoOp().perform(new SinkDocument(null,null)),"MongoDbNoOp must result in null WriteModel"), + () -> assertNull(new MongoDbNoOp().perform(null),"MongoDbNoOp must result in null WriteModel") + ); + } + +} diff --git a/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsHandlerTest.java b/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsHandlerTest.java index d8b5e23..4202871 100644 --- a/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsHandlerTest.java +++ b/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsHandlerTest.java @@ -1,6 +1,7 @@ package at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms; import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig; +import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation; import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType; import at.grahsl.kafka.connect.mongodb.converter.SinkDocument; import com.mongodb.client.model.DeleteOneModel; @@ -35,6 +36,15 @@ public class RdbmsHandlerTest { new RdbmsHandler(new MongoDbSinkConnectorConfig(new HashMap<>()), new HashMap<>()); + public static final RdbmsHandler RDBMS_HANDLER_NOOP_MAPPING = + new RdbmsHandler(new MongoDbSinkConnectorConfig(new HashMap<>()), + new HashMap() {{ + put(OperationType.CREATE,new RdbmsNoOp()); + put(OperationType.READ,new RdbmsNoOp()); + put(OperationType.UPDATE,new RdbmsNoOp()); + put(OperationType.DELETE,new RdbmsNoOp()); + }}); + @Test @DisplayName("verify existing default config from base class") public void testExistingDefaultConfig() { @@ -211,4 +221,73 @@ public Stream testValidCdcOpertionTypes() { } + @TestFactory + @DisplayName("when valid cdc operation type mapped to NO OP then CdcOperation of type RdbmsNoOp") + public Stream testValidCdcOpertionWithNoOpMappings() { + + return Stream.of(OperationType.values()).map(ot -> + dynamicTest("test operation " + ot, () -> + assertTrue(RDBMS_HANDLER_NOOP_MAPPING.getCdcOperation( + new BsonDocument("op", new BsonString("c"))) + instanceof RdbmsNoOp) + ) + ); + + } + + @TestFactory + @DisplayName("when valid CDC event with noop mapping then empty WriteModel") + public Stream testValidCdcDocumentWithNoOpMapping() { + + return Stream.of( + dynamicTest("test operation "+OperationType.CREATE, + () -> assertEquals(Optional.empty(), + RDBMS_HANDLER_NOOP_MAPPING.handle( + new SinkDocument( + new BsonDocument("id",new BsonInt32(1004)), + new BsonDocument("op",new BsonString("c")) + .append("after",new BsonDocument("id",new BsonInt32(1004)) + .append("foo",new BsonString("blah"))) + ) + ), + () -> "result of RdbmsNoOp must be Optional.empty()") + ), + dynamicTest("test operation "+OperationType.READ, + () -> assertEquals(Optional.empty(), + RDBMS_HANDLER_NOOP_MAPPING.handle( + new SinkDocument( + new BsonDocument("id",new BsonInt32(1004)), + new BsonDocument("op",new BsonString("r")) + .append("after",new BsonDocument("id",new BsonInt32(1004)) + .append("foo",new BsonString("blah"))) + ) + ), + () -> "result of RdbmsNoOp must be Optional.empty()") + ), + dynamicTest("test operation "+OperationType.UPDATE, + () -> assertEquals(Optional.empty(), + RDBMS_HANDLER_NOOP_MAPPING.handle( + new SinkDocument( + new BsonDocument("id",new BsonInt32(1004)), + new BsonDocument("op",new BsonString("u")) + .append("after",new BsonDocument("id",new BsonInt32(1004)) + .append("foo",new BsonString("blah"))) + ) + ), + () -> "result of RdbmsNoOp must be Optional.empty()") + ), + dynamicTest("test operation "+OperationType.DELETE, + () -> assertEquals(Optional.empty(), + RDBMS_HANDLER_NOOP_MAPPING.handle( + new SinkDocument( + new BsonDocument("id",new BsonInt32(1004)), + new BsonDocument("op",new BsonString("d")) + ) + ), + () -> "result of RdbmsNoOp must be Optional.empty()") + ) + ); + + } + } diff --git a/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsNoOpTest.java b/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsNoOpTest.java new file mode 100644 index 0000000..503ef33 --- /dev/null +++ b/src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsNoOpTest.java @@ -0,0 +1,24 @@ +package at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms; + +import at.grahsl.kafka.connect.mongodb.converter.SinkDocument; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertNull; + +@RunWith(JUnitPlatform.class) +public class RdbmsNoOpTest { + + @Test + @DisplayName("when any cdc event then WriteModel is null resulting in Optional.empty() in the corresponding handler") + public void testValidSinkDocument() { + assertAll("test behaviour of MongoDbNoOp", + () -> assertNull(new RdbmsNoOp().perform(new SinkDocument(null,null)),"RdbmsNoOp must result in null WriteModel"), + () -> assertNull(new RdbmsNoOp().perform(null),"RdbmsNoOp must result in null WriteModel") + ); + } + +}