Applied some Confluent style suggestions
- Implemented Connector.config() and switched to AbstractConfig for configuration
- Reduced dependencies (besides Kafka Connect API) to only MongoDB
- Removed superfluous log4j properties file
- Changed the jar name to kafka-connect-mongodb-sink-VERSION.jar
blootsvoets committed Dec 14, 2016
1 parent d6e2feb commit 576c8ad
Showing 14 changed files with 191 additions and 187 deletions.
38 changes: 28 additions & 10 deletions README.md
@@ -21,17 +21,35 @@ The following assumes you have Kafka and the Confluent Schema Registry running.
```shell
./gradlew clean build
```
2. Modify `sink.properties` file according your cluster
```ini
# MongoDb server
mongo.host=
mongo.port=
2. Modify the `sink.properties` file according to your cluster. The following properties are supported (a sample `sink.properties` is sketched below the table):

<table class="data-table"><tbody>
<tr>
<th>Name</th>
<th>Description</th>
<th>Type</th>
<th>Default</th>
<th>Valid Values</th>
<th>Importance</th>
</tr>
<tr>
<td>mongo.database</td><td>MongoDB database name</td><td>string</td><td></td><td></td><td>high</td></tr>
<tr>
<td>mongo.host</td><td>MongoDB host name to write data to</td><td>string</td><td></td><td></td><td>high</td></tr>
<tr>
<td>record.converter.classes</td><td>List of classes to convert Kafka SinkRecords to BSON documents.</td><td>list</td><td></td><td></td><td>high</td></tr>
<tr>
<td>topics</td><td>List of topics. For each topic, optionally add a property with the topic name as key and the MongoDB collection in which to store that topic's data as value.</td><td>list</td><td></td><td></td><td>high</td></tr>
<tr>
<td>mongo.password</td><td>Password to connect to MongoDB database. If not set, no credentials are used.</td><td>string</td><td>null</td><td></td><td>medium</td></tr>
<tr>
<td>mongo.username</td><td>Username to connect to MongoDB database. If not set, no credentials are used.</td><td>string</td><td>null</td><td></td><td>medium</td></tr>
<tr>
<td>buffer.capacity</td><td>Maximum number of items in a MongoDB writer buffer. Once the buffer becomes full, the task fails.</td><td>int</td><td>20000</td><td>[1,...]</td><td>low</td></tr>
<tr>
<td>mongo.port</td><td>MongoDB port</td><td>int</td><td>27017</td><td>[1,...]</td><td>low</td></tr>
</tbody></table>

# MongoDb db configuration
mongo.username=
mongo.password=
mongo.database=
```
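For illustration, a minimal `sink.properties` using these settings might look like the sketch below. All values are placeholders and the converter's fully qualified class name is an assumption; only the property keys come from the table above.
```ini
# Standard Kafka Connect sink settings
name=mongodb-sink
connector.class=org.radarcns.mongodb.MongoDbSinkConnector
tasks.max=1

# MongoDB connection (placeholder values)
mongo.host=localhost
mongo.port=27017
mongo.database=mydatabase
mongo.username=myuser
mongo.password=mypassword

# Topics to read; optionally map a topic to a MongoDB collection
topics=avg_temperature
avg_temperature=avg_temperature_collection

# Converters from Kafka SinkRecords to BSON documents
# (fully qualified class name assumed for illustration)
record.converter.classes=org.radarcns.serialization.DoubleAggregatedRecordConverter
```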
3. (optional) Modify the `standalone.properties` file according to your cluster instances. You may need to update the bootstrap servers and Schema Registry locations.
```ini
bootstrap.servers=
28 changes: 12 additions & 16 deletions build.gradle
@@ -2,11 +2,10 @@ plugins {
// Apply the java plugin to add support for Java
id 'java'
id 'idea'
id 'com.commercehub.gradle.plugin.avro-base' version '0.9.0'
id 'jacoco'
}

version = '1.0'
version = '0.1-SNAPSHOT'

targetCompatibility = '1.8'
sourceCompatibility = '1.8'
@@ -22,7 +21,11 @@ ext.junitVersion = '4.12'
ext.hamcrestVersion = '1.3'
ext.mockitoVersion = '2.2.29'

configurations { codacy }
configurations {
codacy
provided
compile.extendsFrom provided
}

// In this section you declare where to find the dependencies of your project
repositories {
@@ -37,29 +40,20 @@ repositories {
// In this section you declare the dependencies for your production and test code
dependencies {
// The production code uses the SLF4J logging API at compile time
compile group: 'org.slf4j', name:'slf4j-api', version: slf4jVersion
provided group: 'org.slf4j', name:'slf4j-api', version: slf4jVersion

// to implement producers and consumers
compile group: 'org.apache.kafka', name: 'connect-api', version: kafkaVersion
compile group: 'org.apache.avro', name: 'avro', version: avroVersion
compile group: 'io.confluent', name: 'kafka-avro-serializer', version: confluentVersion
provided group: 'org.apache.kafka', name: 'connect-api', version: kafkaVersion
compile group: 'org.mongodb', name: 'mongo-java-driver', version: mongodbVersion
compile group: 'com.google.guava', name: 'guava', version: guavaVersion

testCompile group: 'junit', name: 'junit', version: junitVersion
testCompile group: 'org.hamcrest', name: 'hamcrest-all', version: hamcrestVersion
testCompile group: 'org.mockito', name: 'mockito-core', version: mockitoVersion
testRuntime group: 'org.slf4j', name: 'slf4j-simple', version: slf4jVersion

codacy 'com.github.codacy:codacy-coverage-reporter:1.0.10'
}

task generateAvro(type: com.commercehub.gradle.plugin.avro.GenerateAvroJavaTask) {
source("src/main/resources/avro")
outputDir = file("build/avro")
}

compileJava.source(generateAvro.outputs)

idea {
module {
sourceDirs += file("build/avro")
@@ -72,7 +66,9 @@ jar {
'Implementation-Version': version
}
from {
configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
(configurations.runtime - configurations.provided).collect {
it.isDirectory() ? it : zipTree(it)
}
}
}

2 changes: 1 addition & 1 deletion settings.gradle
@@ -1,2 +1,2 @@
rootProject.name = 'mongoconnector'
rootProject.name = 'kafka-connect-mongodb-sink'

61 changes: 46 additions & 15 deletions src/main/java/org/radarcns/mongodb/MongoDbSinkConnector.java
@@ -14,9 +14,10 @@
* limitations under the License.
*/

import com.google.common.base.Strings;
package org.radarcns.mongodb;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
@@ -25,11 +26,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;

/**
* Configures the connection between Kafka and MongoDB.
*/
@@ -44,11 +51,6 @@ public class MongoDbSinkConnector extends SinkConnector {
public static final String BUFFER_CAPACITY = "buffer.capacity";
public static final String RECORD_CONVERTERS = "record.converter.classes";

public static final String[] REQUIRED_PROPERTIES = {
MONGO_HOST, MONGO_PORT, MONGO_USERNAME, MONGO_PASSWORD, MONGO_DATABASE, TOPICS_CONFIG,
RECORD_CONVERTERS,
};

private Map<String, String> connectorConfig;

@Override
@@ -58,18 +60,19 @@ public String version() {

@Override
public void start(Map<String, String> props) {
connectorConfig = new HashMap<>(props);

for (String req : REQUIRED_PROPERTIES) {
if (Strings.isNullOrEmpty(connectorConfig.get(req))) {
throw new ConnectException("Required connector property '" + req + "' is not set");
List<String> errorMessages = new ArrayList<>();
for (ConfigValue v : config().validate(props)) {
if (!v.errorMessages().isEmpty()) {
errorMessages.add("Property " + v.name() + " with value " + v.value()
+ " does not validate: " + String.join("; ", v.errorMessages()));
}
}
if (Utility.parseArrayConfig(connectorConfig, TOPICS_CONFIG) == null) {
throw new ConnectException("Not all topics in the '" + TOPICS_CONFIG
+ "' property have a topic to database name mapping");
if (!errorMessages.isEmpty()) {
throw new ConnectException("Configuration does not validate: \n\t"
+ String.join("\n\t", errorMessages));
}

connectorConfig = new HashMap<>(props);
log.info(Utility.convertConfigToString(connectorConfig));
}

@@ -93,6 +96,34 @@ public void stop() {

@Override
public ConfigDef config() {
return null;
ConfigDef conf = new ConfigDef();
conf.define(MONGO_HOST, ConfigDef.Type.STRING, NO_DEFAULT_VALUE, HIGH,
"MongoDB host name to write data to", "MongoDB", 0, ConfigDef.Width.MEDIUM, "MongoDB hostname");
conf.define(MONGO_PORT, ConfigDef.Type.INT, 27017, ConfigDef.Range.atLeast(1), LOW,
"MongoDB port", "MongoDB", 1, ConfigDef.Width.SHORT, "MongoDB port");
conf.define(MONGO_DATABASE, ConfigDef.Type.STRING, NO_DEFAULT_VALUE, HIGH,
"MongoDB database name", "MongoDB", 2, ConfigDef.Width.SHORT, "MongoDB database");
conf.define(MONGO_USERNAME, ConfigDef.Type.STRING, null, MEDIUM,
"Username to connect to MongoDB database. If not set, no credentials are used.",
"MongoDB", 3, ConfigDef.Width.SHORT, "MongoDB username",
Collections.singletonList(MONGO_PASSWORD));
conf.define(MONGO_PASSWORD, ConfigDef.Type.STRING, null, MEDIUM,
"Password to connect to MongoDB database. If not set, no credentials are used.",
"MongoDB", 4, ConfigDef.Width.SHORT, "MongoDB password",
Collections.singletonList(MONGO_USERNAME));
conf.define(TOPICS_CONFIG, ConfigDef.Type.LIST, NO_DEFAULT_VALUE, HIGH,
"List of topics. For each topic, optionally make a property with as key the topic "
+ "and as value the MongoDB collection the data from that topic should be "
+ "stored in.");
conf.define(BUFFER_CAPACITY, ConfigDef.Type.INT, 20_000, ConfigDef.Range.atLeast(1), LOW,
"Maximum number of items in a MongoDB writer buffer. Once the buffer becomes full,"
+ "the task fails.");
conf.define(RECORD_CONVERTERS, ConfigDef.Type.LIST, NO_DEFAULT_VALUE, HIGH,
"List of classes to convert Kafka SinkRecords to BSON documents.");
return conf;
}

public static void main(String... args) {
System.out.println(new MongoDbSinkConnector().config().toHtmlTable());
}
}
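For illustration, the `ConfigDef` built here can be exercised outside Kafka Connect. In the sketch below, the property values are placeholders and the converter's fully qualified class name is an assumption; `validate()` and `parse()` are used the same way as in `start()` above and in `MongoDbSinkTask`.
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.radarcns.mongodb.MongoDbSinkConnector;

public class ConfigParseExample {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // required properties (placeholder values)
        props.put("mongo.host", "localhost");
        props.put("mongo.database", "mydatabase");
        props.put("topics", "avg_temperature");
        // fully qualified converter class name assumed for illustration
        props.put("record.converter.classes",
                "org.radarcns.serialization.DoubleAggregatedRecordConverter");

        ConfigDef def = new MongoDbSinkConnector().config();

        // validate() reports per-property errors, as MongoDbSinkConnector.start() does
        for (ConfigValue value : def.validate(props)) {
            if (!value.errorMessages().isEmpty()) {
                System.err.println(value.name() + ": " + value.errorMessages());
            }
        }

        // parse() checks types and required keys and fills in declared defaults;
        // MongoDbSinkTask wraps this map in an AbstractConfig to read typed values
        Map<String, Object> parsed = def.parse(props);
        System.out.println(parsed.get("buffer.capacity")); // 20000 (default)
        System.out.println(parsed.get("mongo.port"));      // 27017 (default)
    }
}
```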
17 changes: 8 additions & 9 deletions src/main/java/org/radarcns/mongodb/MongoDbSinkTask.java
@@ -18,6 +18,7 @@

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.radarcns.serialization.RecordConverter;
@@ -46,9 +47,6 @@
* operation will then at some point timeout.
*/
public class MongoDbSinkTask extends SinkTask {
// Assuming record sizes of 1 kB, we default to a 20 MB buffer
private static final int DEFAULT_BUFFER_CAPACITY = 20_000;

private static final Logger log = LoggerFactory.getLogger(MongoDbSinkTask.class);

private final AtomicInteger count;
@@ -68,15 +66,16 @@ public String version() {

@Override
public void start(Map<String, String> props) {
int bufferCapacity = Utility.getInt(props, BUFFER_CAPACITY, DEFAULT_BUFFER_CAPACITY);
buffer = new ArrayBlockingQueue<>(bufferCapacity);

timer = new Timer();
timer.schedule(new Monitor(log, count, "have been processed"), 0, 30000);
timer.schedule(new Monitor(log, count, "have been processed"), 0, 30_000);

AbstractConfig config = new AbstractConfig(new MongoDbSinkConnector().config().parse(props));

buffer = new ArrayBlockingQueue<>(config.getInt(BUFFER_CAPACITY));

List<RecordConverter> mongoConverters = Utility.loadRecordConverters(
getClass().getClassLoader(), props.get(RECORD_CONVERTERS));
MongoWrapper mongoHelper = new MongoWrapper(props);
getClass().getClassLoader(), config.getList(RECORD_CONVERTERS));
MongoWrapper mongoHelper = new MongoWrapper(config);

writer = new MongoDbWriter(mongoHelper, buffer, mongoConverters, timer);
writer.start();
2 changes: 1 addition & 1 deletion src/main/java/org/radarcns/mongodb/MongoDbWriter.java
@@ -74,7 +74,7 @@ public MongoDbWriter(MongoWrapper mongoHelper, BlockingQueue<SinkRecord> buffer,
count = new AtomicInteger(0);

Monitor monitor = new Monitor(log, count, "have been written in MongoDB", this.buffer);
timer.schedule(monitor, 0, 30000);
timer.schedule(monitor, 0, 30_000);

latestOffsets = new HashMap<>();
stopping = new AtomicBoolean(false);
36 changes: 25 additions & 11 deletions src/main/java/org/radarcns/mongodb/MongoWrapper.java
@@ -24,6 +24,7 @@
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.UpdateOptions;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.errors.ConnectException;
import org.bson.Document;
import org.radarcns.util.Utility;
@@ -37,6 +38,11 @@

import static com.mongodb.client.model.Filters.eq;
import static org.apache.kafka.connect.sink.SinkTask.TOPICS_CONFIG;
import static org.radarcns.mongodb.MongoDbSinkConnector.MONGO_DATABASE;
import static org.radarcns.mongodb.MongoDbSinkConnector.MONGO_HOST;
import static org.radarcns.mongodb.MongoDbSinkConnector.MONGO_PASSWORD;
import static org.radarcns.mongodb.MongoDbSinkConnector.MONGO_PORT;
import static org.radarcns.mongodb.MongoDbSinkConnector.MONGO_USERNAME;

/**
* Wrapper around {@link MongoClient}.
@@ -54,24 +60,28 @@ public class MongoWrapper implements Closeable {
* @param config Configuration of the client, according to {@link MongoDbSinkConnector}.
* @throws ConnectException a MongoClient could not be created.
*/
public MongoWrapper(Map<String, String> config) {
public MongoWrapper(AbstractConfig config) {
mapping = Utility.parseArrayConfig(config, TOPICS_CONFIG);
dbName = config.get(MongoDbSinkConnector.MONGO_DATABASE);
dbName = config.getString(MONGO_DATABASE);
credentials = createCredentials(config);
mongoClient = createClient(config);
}

private List<MongoCredential> createCredentials(Map<String, String> config) {
String userName = config.get(MongoDbSinkConnector.MONGO_USERNAME);
char[] password = config.get(MongoDbSinkConnector.MONGO_PASSWORD).toCharArray();
private List<MongoCredential> createCredentials(AbstractConfig config) {
if (config.values().containsKey(MONGO_USERNAME)) {
String userName = config.getString(MONGO_USERNAME);
char[] password = config.getString(MONGO_PASSWORD).toCharArray();

return Collections.singletonList(
MongoCredential.createCredential(userName, dbName, password));
return Collections.singletonList(
MongoCredential.createCredential(userName, dbName, password));
} else {
return Collections.emptyList();
}
}

private MongoClient createClient(Map<String, String> config) {
String host = config.get(MongoDbSinkConnector.MONGO_HOST);
int port = Integer.parseInt(config.get(MongoDbSinkConnector.MONGO_PORT));
private MongoClient createClient(AbstractConfig config) {
String host = config.getString(MONGO_HOST);
int port = config.getInt(MONGO_PORT);

try {
return new MongoClient(new ServerAddress(host, port), credentials);
@@ -109,7 +119,11 @@ public void close() {
*/
public void store(String topic, Document doc) throws MongoException {
MongoDatabase database = mongoClient.getDatabase(dbName);
MongoCollection<Document> collection = database.getCollection(mapping.get(topic));
String collectionName = mapping.get(topic);
if (collectionName == null) {
collectionName = topic;
}
MongoCollection<Document> collection = database.getCollection(collectionName);

collection.replaceOne(eq("_id", doc.get("_id")), doc, (new UpdateOptions()).upsert(true));
}
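The `store()` method above upserts on the document's `_id`: writing a second document with the same `_id` replaces the first rather than creating a duplicate. A minimal sketch, assuming an already-configured `MongoWrapper` and a sample topic name:
```java
import org.bson.Document;
import org.radarcns.mongodb.MongoWrapper;

public class UpsertExample {
    // 'wrapper' is an already-configured MongoWrapper; 'avg_temperature' is a sample topic
    static void storeTwice(MongoWrapper wrapper) {
        Document first = new Document("_id", "user1-1480000000").append("mean", 21.5);
        wrapper.store("avg_temperature", first);  // inserts the document

        Document second = new Document("_id", "user1-1480000000").append("mean", 22.0);
        wrapper.store("avg_temperature", second); // same _id: replaced, not duplicated
    }
}
```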
@@ -26,17 +26,15 @@
import java.util.Collections;
import java.util.List;

import javax.annotation.Nonnull;

public class AggregatedAccelerationRecordConverter implements RecordConverter {
@Override
public Collection<String> supportedSchemaNames() {
return Collections.singleton(org.radarcns.key.WindowedKey.class.getName() + "-"
+ org.radarcns.aggregator.DoubleArrayAggegator.class.getName());
return Collections.singleton("org.radarcns.key.WindowedKey-"
+ "org.radarcns.aggregator.DoubleArrayAggegator");
}

@Override
public Document convert(@Nonnull SinkRecord record) {
public Document convert(SinkRecord record) {
Struct key = (Struct) record.key();
Struct value = (Struct) record.value();

@@ -26,17 +26,15 @@
import java.util.Collection;
import java.util.Collections;

import javax.annotation.Nonnull;

public class DoubleAggregatedRecordConverter implements RecordConverter {
@Override
public Collection<String> supportedSchemaNames() {
return Collections.singleton(org.radarcns.key.WindowedKey.class.getName() + "-"
+ org.radarcns.aggregator.DoubleAggegator.class.getName());
return Collections.singleton("org.radarcns.key.WindowedKey-"
+ "org.radarcns.aggregator.DoubleAggegator");
}

@Override
public Document convert(@Nonnull SinkRecord record) {
public Document convert(SinkRecord record) {
Struct key = (Struct) record.key();
Struct value = (Struct) record.value();

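The two converters above follow a common pattern, so a custom converter for another key/value schema pair could be sketched as below. The class name, schema names, and field names are assumptions, and the sketch assumes `RecordConverter` declares only the two methods shown here.
```java
import java.util.Collection;
import java.util.Collections;

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.Document;
import org.radarcns.serialization.RecordConverter;

/** Hypothetical converter; schema and field names are illustrative only. */
public class ExampleTemperatureRecordConverter implements RecordConverter {
    @Override
    public Collection<String> supportedSchemaNames() {
        // convention from the converters above: "<key schema name>-<value schema name>"
        return Collections.singleton("org.radarcns.key.MeasurementKey-org.radarcns.Temperature");
    }

    @Override
    public Document convert(SinkRecord record) {
        Struct key = (Struct) record.key();
        Struct value = (Struct) record.value();
        // build a deterministic _id so repeated deliveries upsert the same document
        return new Document("_id", key.getString("userId") + "-" + record.kafkaOffset())
                .append("temperature", value.getFloat64("value"))
                .append("time", value.getFloat64("time"));
    }
}
```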
