Custom Kafka Connect Converters and SMTs (Single Message Transforms) that are applied to different binary inputs to produce a MongoDB BSON document.
Converters:
- org.hifly.kafka.OracleRawToBsonKeyConverter - converts a byte[] to the Oracle RAW data type. The Oracle RAW value is then used to create a MongoDB BSON document.
- org.hifly.kafka.ByteArrayAndStringConverter - pass-through converter for the byte array schema type and the string schema type.
SMTs:
- org.hifly.kafka.smt.JsonKeyToValue - gets the value from the message record.key and copies it into a new field in the message record.value
mvn install:install-file -Dfile=ojdbc10.jar -DgroupId=com.oracle -DartifactId=ojdbc10 -Dversion=19.3 -Dpackaging=jar
mvn clean test
mvn clean compile assembly:single
{
  "name" : "mongo-sample-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "connection.uri": "XXXXXXXX",
    "topics": "XXXXXXXX",
    "key.converter": "org.hifly.kafka.OracleRawToBsonKeyConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "database": "XXXXXXXX",
    "collection": "XXXXXXXX",
    "errors.tolerance": "all",
    "mongo.errors.log.enable": "true",
    "delete.on.null.values": "true",
    "document.id.strategy.overwrite.existing": "true",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy",
    "delete.writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneDefaultStrategy",
    "publish.full.document.only": "true",
    "transforms": "ReplaceField, addKeyToValue, tsConverter1",
    "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.ReplaceField.exclude": "ID,XXXXXXXX,XXXXXXXX",
    "transforms.addKeyToValue.type": "org.hifly.kafka.smt.JsonKeyToValue",
    "transforms.addKeyToValue.valuename": "ID",
    "transforms.addKeyToValue.idkey": "_id",
    "transforms.tsConverter1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.tsConverter1.target.type": "Date",
    "transforms.tsConverter1.format": "dd/MM/yyyy",
    "transforms.tsConverter1.field": "START_DATE"
  }
}