RADAR-MongoDbConnector


The MongoDB sink connector is a tool for scalably and reliably streaming data between Apache Kafka and MongoDB. It exports Avro data from Kafka topics into MongoDB.

Currently it supports only two types of data: the current version demonstrates how to extract data coming from an Empatica E4 device using the RADAR-CNS Android application and analysed by the RADAR-CNS Kafka Backend.

Dependencies

The following assumes you have Kafka and the Confluent Schema Registry running.

Quickstart for RADAR-CNS

  • Build the project. Go inside the project folder and run
./gradlew clean build
  • Modify the sink.properties file according to your cluster. The following properties are supported:
| Name | Description | Type | Default | Valid Values | Importance |
|------|-------------|------|---------|--------------|------------|
| `mongo.database` | MongoDB database name | string | | | high |
| `mongo.host` | MongoDB host name to write data to | string | | | high |
| `topics` | List of topics to be streamed | list | | | high |
| `collection.format` | A format string for the destination collection name, which may contain `${topic}` as a placeholder for the originating topic name. For example, `kafka_${topic}` for the topic `orders` will map to the collection name `kafka_orders`. | string | `{$topic}` | | medium |
| `mongo.password` | Password to connect to the MongoDB database. If not set, no credentials are used. | string | null | | medium |
| `mongo.username` | Username to connect to the MongoDB database. If not set, no credentials are used. | string | null | | medium |
| `record.converter.class` | `RecordConverterFactory` that returns classes to convert Kafka `SinkRecord`s to BSON documents. | class | `org.radarcns.serialization.RecordConverterFactory` | | medium |
| `buffer.capacity` | Maximum number of items in a MongoDB writer buffer. Once the buffer becomes full, the task fails. | int | 20000 | [1,...] | low |
| `mongo.port` | MongoDB port | int | 27017 | [1,...] | low |
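Putting the table together, a minimal sink.properties could look like the following sketch. All values (database, host, credentials, topic names) are placeholders for your own cluster, and any properties beyond those listed in the table should be checked against the connector sources.

```properties
# Illustrative sink.properties -- replace every value with your own.
mongo.database=mydatabase
mongo.host=localhost
mongo.port=27017
mongo.username=mongouser
mongo.password=secret
topics=topic_a,topic_b
collection.format=kafka_${topic}
buffer.capacity=20000
```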
  • (optional) Modify the standalone.properties and cluster.properties files according to your cluster instances. You may need to update the bootstrap servers and Schema Registry locations.
bootstrap.servers=
key.converter.schema.registry.url=
  • Copy your jar file onto your Kafka server
  • Copy all configuration files onto your Kafka server
    • sink.properties
    • standalone.properties (optional)
    • cluster.properties (optional)
  • Put the connector build/libs/kafka-connect-mongodb-sink-*.jar in the folder share/java.

  • Run the connector:
    • standalone mode
    /bin/connect-standalone standalone.properties sink.properties
    • distributed mode
    /bin/connect-distributed cluster.properties sink.properties
  • Stop your connector using CTRL-C

To use further data types, extend org.radarcns.serialization.RecordConverterFactory and set the new class name in the record.converter.class property.
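A custom converter might be structured along the following lines. This is an illustrative sketch only: the real interfaces live in org.radarcns.serialization, and the interface and method names below are simplified assumptions rather than the actual connector API; check the project sources before extending RecordConverterFactory.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class ConverterSketch {
    /**
     * Simplified stand-in for a record converter. The real interface in
     * org.radarcns.serialization works on Kafka SinkRecords and BSON
     * Documents; plain Maps are used here to keep the sketch self-contained.
     */
    interface RecordConverter {
        /** Schema names this converter can handle. */
        Collection<String> supportedSchemaNames();
        /** Convert a record value into a document for MongoDB. */
        Map<String, Object> convert(Map<String, Object> value);
    }

    /** Example converter for a hypothetical heart-rate schema. */
    static class HeartRateConverter implements RecordConverter {
        @Override
        public Collection<String> supportedSchemaNames() {
            return Collections.singleton("org.example.HeartRate");
        }

        @Override
        public Map<String, Object> convert(Map<String, Object> value) {
            // Rename fields to the layout wanted in the target collection.
            Map<String, Object> doc = new LinkedHashMap<>();
            doc.put("user", value.get("userId"));
            doc.put("heartRate", value.get("heartRate"));
            return doc;
        }
    }

    public static void main(String[] args) {
        RecordConverter converter = new HeartRateConverter();
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("userId", "u1");
        record.put("heartRate", 72.0);
        System.out.println(converter.convert(record));
    }
}
```

A factory extending `org.radarcns.serialization.RecordConverterFactory` would then register such converters, and its class name would go into the record.converter.class property.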

Tuning

The only available setting is the number of records returned in a single call to poll() (i.e. the consumer.max.poll.records parameter inside standalone.properties).
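For example, to cap each poll, the parameter can be set in standalone.properties (the value below is illustrative, not a recommendation):

```properties
# Return at most 1000 records per poll() call.
consumer.max.poll.records=1000
```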

Note

Connectors can run on any machine where Kafka has been installed. Therefore, you can also run them on a machine that does not host a Kafka broker.

Reset

To reset a connector running in standalone mode, stop it and then modify name and offset.storage.file.filename inside sink.properties and standalone.properties, respectively.
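Concretely, the two changes could look like this (both values are illustrative placeholders):

```properties
# In sink.properties: give the connector a fresh name.
name=radar-mongodb-sink-v2

# In standalone.properties: point offsets at a new file.
offset.storage.file.filename=/tmp/connect-mongodb-sink-v2.offsets
```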