preview feature to configure no-op support based on CDC event type
basic unit test available but config tests missing
hpgrahsl committed Jul 19, 2019
1 parent a02817c commit 75267ed
Showing 10 changed files with 316 additions and 7 deletions.
1 change: 1 addition & 0 deletions config/MongoDbSinkConnector.properties
@@ -41,6 +41,7 @@ mongodb.field.renamer.mapping=[]
mongodb.field.renamer.regexp=[]
mongodb.post.processor.chain=at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder
mongodb.change.data.capture.handler=
mongodb.change.data.capture.handler.operations=c,r,u,d
mongodb.delete.on.null.values=false
mongodb.writemodel.strategy=at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy
mongodb.max.batch.size=0
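For illustration, a minimal sketch (not part of this diff) of how the new mongodb.change.data.capture.handler.operations property might be set programmatically. It assumes, as the default c,r,u,d and the option's doc string suggest, that any subset of the Debezium operation codes is accepted and that omitted codes end up as no-ops; the demo class and the chosen values are hypothetical, while MongoDbSinkConnectorConfig and its Map-based constructor are taken from the commit's own tests.

import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;

import java.util.HashMap;
import java.util.Map;

public class CdcOperationsConfigSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("mongodb.change.data.capture.handler",
                "at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbHandler");
        // "d" is omitted, so delete events should be treated as no-ops
        props.put("mongodb.change.data.capture.handler.operations", "c,r,u");

        MongoDbSinkConnectorConfig cfg = new MongoDbSinkConnectorConfig(props);
        // getString is inherited from Kafka's AbstractConfig; prints c,r,u
        System.out.println(cfg.getString("mongodb.change.data.capture.handler.operations"));
    }
}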
@@ -17,8 +17,10 @@
package at.grahsl.kafka.connect.mongodb;

import at.grahsl.kafka.connect.mongodb.cdc.CdcHandler;
-import at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbHandler;
-import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsHandler;
+import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation;
+import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType;
+import at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.*;
+import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.*;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.mysql.MysqlHandler;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.postgres.PostgresHandler;
import at.grahsl.kafka.connect.mongodb.processor.*;
@@ -78,6 +80,7 @@ public enum FieldProjectionTypes {
public static final String MONGODB_FIELD_RENAMER_REGEXP_DEFAULT = "[]";
public static final String MONGODB_POST_PROCESSOR_CHAIN_DEFAULT = "at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder";
public static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DEFAULT = "";
public static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS_DEFAULT = "c,r,u,d";
public static final boolean MONGODB_DELETE_ON_NULL_VALUES_DEFAULT = false;
public static final String MONGODB_WRITEMODEL_STRATEGY_DEFAULT = "at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy";
public static final int MONGODB_MAX_BATCH_SIZE_DEFAULT = 0;
@@ -129,6 +132,9 @@ public enum FieldProjectionTypes {
public static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER = "mongodb.change.data.capture.handler";
private static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DOC = "class name of CDC handler to use for processing";

public static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS = "mongodb.change.data.capture.handler.operations";
private static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS_DOC = "comma separated list of supported CDC operation types (missing ones result in no ops)";

public static final String MONGODB_DELETE_ON_NULL_VALUES = "mongodb.delete.on.null.values";
private static final String MONGODB_DELETE_ON_NULL_VALUES_DOC = "whether or not the connector tries to delete documents based on key when value is null";

@@ -257,6 +263,7 @@ public Map<String, ConfigValue> validateAll(Map<String, String> props) {
.define(MONGODB_FIELD_RENAMER_REGEXP, Type.STRING, MONGODB_FIELD_RENAMER_REGEXP_DEFAULT, Importance.LOW, MONGODB_FIELD_RENAMER_REGEXP_DOC)
.define(MONGODB_POST_PROCESSOR_CHAIN, Type.STRING, MONGODB_POST_PROCESSOR_CHAIN_DEFAULT, emptyString().or(matching(FULLY_QUALIFIED_CLASS_NAME_LIST)), Importance.LOW, MONGODB_POST_PROCESSOR_CHAIN_DOC)
.define(MONGODB_CHANGE_DATA_CAPTURE_HANDLER, Type.STRING, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DEFAULT, emptyString().or(matching(FULLY_QUALIFIED_CLASS_NAME)), Importance.LOW, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DOC)
.define(MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS, Type.STRING, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS_DEFAULT, Importance.LOW, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS_DOC)
.define(MONGODB_DELETE_ON_NULL_VALUES, Type.BOOLEAN, MONGODB_DELETE_ON_NULL_VALUES_DEFAULT, Importance.MEDIUM, MONGODB_DELETE_ON_NULL_VALUES_DOC)
.define(MONGODB_WRITEMODEL_STRATEGY, Type.STRING, MONGODB_WRITEMODEL_STRATEGY_DEFAULT, Importance.LOW, MONGODB_WRITEMODEL_STRATEGY_DOC)
.define(MONGODB_MAX_BATCH_SIZE, Type.INT, MONGODB_MAX_BATCH_SIZE_DEFAULT, ConfigDef.Range.atLeast(0), Importance.MEDIUM, MONGODB_MAX_BATCH_SIZE_DOC)
@@ -623,6 +630,69 @@ public CdcHandler getCdcHandler() {
return getCdcHandler("");
}

public Map<OperationType, CdcOperation> getCdcHandlerOperationMapping(Class<? extends CdcHandler> cdcHandler, String collection) {

Set<String> supportedOperations = new HashSet<>(
splitAndTrimAndRemoveConfigListEntries(getString(MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS,collection))
);

Map<OperationType, CdcOperation> operationMapping = new HashMap<>();
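        // default every operation type to a no-op first; only the operation types configured as supported are overwritten with real write operations below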

if (cdcHandler.isAssignableFrom(MysqlHandler.class)
|| cdcHandler.isAssignableFrom(PostgresHandler.class)
|| cdcHandler.isAssignableFrom(RdbmsHandler.class)) {

Stream.of(OperationType.values()).forEach(
ot -> operationMapping.put(ot, new RdbmsNoOp())
);

supportedOperations.stream()
.map(OperationType::fromText)
.forEach(ot -> {
switch (ot) {
case CREATE:
case READ:
operationMapping.put(ot,new RdbmsInsert());
break;
case UPDATE:
operationMapping.put(ot,new RdbmsUpdate());
break;
case DELETE:
operationMapping.put(ot,new RdbmsDelete());
break;
}
});

} else if (cdcHandler.isAssignableFrom(MongoDbHandler.class)) {

Stream.of(OperationType.values()).forEach(
ot -> operationMapping.put(ot, new MongoDbNoOp())
);

supportedOperations.stream()
.map(OperationType::fromText)
.forEach(ot -> {
switch (ot) {
case CREATE:
case READ:
operationMapping.put(ot,new MongoDbInsert());
break;
case UPDATE:
operationMapping.put(ot,new MongoDbUpdate());
break;
case DELETE:
operationMapping.put(ot,new MongoDbDelete());
break;
}
});

} else {
throw new ConfigException("error: unsupported cdc handler " + cdcHandler.getName());
}

return operationMapping;
}

public CdcHandler getCdcHandler(String collection) {
Set<String> predefinedCdcHandler = getPredefinedCdcHandlerClassNames();

@@ -637,9 +707,9 @@ public CdcHandler getCdcHandler(String collection) {
}

try {
-        return (CdcHandler) Class.forName(cdcHandler)
-                .getConstructor(MongoDbSinkConnectorConfig.class)
-                .newInstance(this);
+        Class<CdcHandler> cdcHandlerClass = (Class<CdcHandler>)Class.forName(cdcHandler);
+        return cdcHandlerClass.getConstructor(MongoDbSinkConnectorConfig.class,Map.class)
+                .newInstance(this,getCdcHandlerOperationMapping(cdcHandlerClass,collection));
} catch (ReflectiveOperationException e) {
throw new ConfigException(e.getMessage(),e);
} catch (ClassCastException e) {
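To make the new getCdcHandlerOperationMapping method and the reworked reflective instantiation concrete, here is a hedged sketch (not part of the diff; the wrapper class is hypothetical, all other names appear in the code above): every OperationType starts out mapped to a no-op, only the codes listed in mongodb.change.data.capture.handler.operations are overwritten with real insert/update/delete operations, and the resulting map is what getCdcHandler now passes to the handler's (config, mapping) constructor.

import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbHandler;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbNoOp;

import java.util.HashMap;
import java.util.Map;

public class OperationMappingSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // only creates/reads and updates are configured as supported
        props.put("mongodb.change.data.capture.handler.operations", "c,r,u");
        MongoDbSinkConnectorConfig cfg = new MongoDbSinkConnectorConfig(props);

        Map<OperationType, CdcOperation> mapping =
                cfg.getCdcHandlerOperationMapping(MongoDbHandler.class, "");

        // DELETE was not listed, so it falls back to the no-op operation
        System.out.println(mapping.get(OperationType.DELETE) instanceof MongoDbNoOp); // true
    }
}

Defaulting every type to a no-op and overriding only the configured ones means events of unlisted types are simply skipped rather than written to MongoDB.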
@@ -72,7 +72,7 @@ public Optional<WriteModel<BsonDocument>> handle(SinkDocument doc) {
logger.debug("key: "+keyDoc.toString());
logger.debug("value: "+valueDoc.toString());

-        return Optional.of(getCdcOperation(valueDoc).perform(doc));
+        return Optional.ofNullable(getCdcOperation(valueDoc).perform(doc));
}

}
@@ -0,0 +1,15 @@
package at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb;

import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation;
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
import com.mongodb.client.model.WriteModel;
import org.bson.BsonDocument;

public class MongoDbNoOp implements CdcOperation {

@Override
public WriteModel<BsonDocument> perform(SinkDocument doc) {
return null;
}

}
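Taken together with the switch to Optional.ofNullable in the handler above, mapping an operation type to MongoDbNoOp makes the handler emit no write model at all for such events. A minimal sketch, reusing only classes that appear in this diff (the demo scaffolding and the specific mapping are hypothetical; it assumes handle() reaches the mapped operation for a delete event, which is also what the unit tests added below exercise):

import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.*;
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
import com.mongodb.client.model.WriteModel;
import org.bson.BsonDocument;
import org.bson.BsonString;

import java.util.HashMap;
import java.util.Optional;

public class NoOpHandlingSketch {
    public static void main(String[] args) {
        // handler whose DELETE operation is mapped to a no-op; other types keep real operations
        MongoDbHandler handler = new MongoDbHandler(
                new MongoDbSinkConnectorConfig(new HashMap<>()),
                new HashMap<OperationType, CdcOperation>() {{
                    put(OperationType.CREATE, new MongoDbInsert());
                    put(OperationType.READ, new MongoDbInsert());
                    put(OperationType.UPDATE, new MongoDbUpdate());
                    put(OperationType.DELETE, new MongoDbNoOp());
                }});

        // a Debezium MongoDB delete event: MongoDbNoOp.perform() returns null,
        // so Optional.ofNullable(...) in handle() yields Optional.empty()
        Optional<WriteModel<BsonDocument>> result = handler.handle(
                new SinkDocument(
                        new BsonDocument("id", new BsonString("1234")),
                        new BsonDocument("op", new BsonString("d"))));

        System.out.println(result.isPresent()); // false
    }
}

This mirrors the MongoDbHandlerTest cases added below, which map every operation type to a no-op and expect Optional.empty() for each event.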
@@ -69,7 +69,7 @@ public Optional<WriteModel<BsonDocument>> handle(SinkDocument doc) {
return Optional.empty();
}

-        return Optional.of(getCdcOperation(valueDoc)
+        return Optional.ofNullable(getCdcOperation(valueDoc)
.perform(new SinkDocument(keyDoc,valueDoc)));
}

@@ -0,0 +1,15 @@
package at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms;

import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation;
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
import com.mongodb.client.model.WriteModel;
import org.bson.BsonDocument;

public class RdbmsNoOp implements CdcOperation {

@Override
public WriteModel<BsonDocument> perform(SinkDocument doc) {
return null;
}

}
@@ -1,7 +1,9 @@
package at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb;

import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsHandler;
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.ReplaceOneModel;
@@ -36,6 +38,15 @@ public class MongoDbHandlerTest {
new MongoDbHandler(new MongoDbSinkConnectorConfig(new HashMap<>()),
new HashMap<>());

public static final RdbmsHandler MONGODB_HANDLER_NOOP_MAPPING =
new RdbmsHandler(new MongoDbSinkConnectorConfig(new HashMap<>()),
new HashMap<OperationType, CdcOperation>() {{
put(OperationType.CREATE,new MongoDbNoOp());
put(OperationType.READ,new MongoDbNoOp());
put(OperationType.UPDATE,new MongoDbNoOp());
put(OperationType.DELETE,new MongoDbNoOp());
}});

@Test
@DisplayName("verify existing default config from base class")
public void testExistingDefaultConfig() {
@@ -209,4 +220,74 @@ public Stream<DynamicTest> testValidCdcOpertionTypes() {

}

@TestFactory
@DisplayName("when valid cdc operation type mapped to NO OP then CdcOperation of type MongoDbNoOp")
public Stream<DynamicTest> testValidCdcOpertionWithNoOpMappings() {

return Stream.of(OperationType.values()).map(ot ->
dynamicTest("test operation " + ot, () ->
assertTrue(MONGODB_HANDLER_NOOP_MAPPING.getCdcOperation(
new BsonDocument("op", new BsonString("c")))
instanceof MongoDbNoOp)
)
);

}

@TestFactory
@DisplayName("when valid CDC event with noop mapping then empty WriteModel")
public Stream<DynamicTest> testValidCdcDocumentWithNoOpMapping() {

return Stream.of(
dynamicTest("test operation "+OperationType.CREATE,
() -> assertEquals(Optional.empty(),
MONGODB_HANDLER_NOOP_MAPPING.handle(
new SinkDocument(
new BsonDocument("_id",new BsonString("1234")),
new BsonDocument("op",new BsonString("c"))
.append("after",new BsonString("{_id:1234,foo:\"blah\"}"))
)
),
() -> "result of MongoDbNoOp must be Optional.empty()")

),
dynamicTest("test operation "+OperationType.READ,
() -> assertEquals(Optional.empty(),
MONGODB_HANDLER_NOOP_MAPPING.handle(
new SinkDocument(
new BsonDocument("_id",new BsonString("1234")),
new BsonDocument("op",new BsonString("r"))
.append("after",new BsonString("{_id:1234,foo:\"blah\"}"))
)
),
() -> "result of MongoDbNoOp must be Optional.empty()")

),
dynamicTest("test operation "+OperationType.UPDATE,
() -> assertEquals(Optional.empty(),
MONGODB_HANDLER_NOOP_MAPPING.handle(
new SinkDocument(
new BsonDocument("id",new BsonString("1234")),
new BsonDocument("op",new BsonString("u"))
.append("patch",new BsonString("{\"$set\":{foo:\"blah\"}}"))
)
),
() -> "result of MongoDbNoOp must be Optional.empty()")

),
dynamicTest("test operation "+OperationType.DELETE,
() -> assertEquals(Optional.empty(),
MONGODB_HANDLER_NOOP_MAPPING.handle(
new SinkDocument(
new BsonDocument("id",new BsonString("1234")),
new BsonDocument("op",new BsonString("d"))
)
),
() -> "result of MongoDbNoOp must be Optional.empty()")

)
);

}

}
@@ -0,0 +1,24 @@
package at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb;

import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;

import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertNull;

@RunWith(JUnitPlatform.class)
public class MongoDbNoOpTest {

@Test
@DisplayName("when any cdc event then WriteModel is null resulting in Optional.empty() in the corresponding handler")
public void testValidSinkDocument() {
assertAll("test behaviour of MongoDbNoOp",
() -> assertNull(new MongoDbNoOp().perform(new SinkDocument(null,null)),"MongoDbNoOp must result in null WriteModel"),
() -> assertNull(new MongoDbNoOp().perform(null),"MongoDbNoOp must result in null WriteModel")
);
}

}