Documentation for Kafka integration
jumpmind-josh committed Jan 9, 2017
1 parent 90df458 commit ce8d8ef
Showing 4 changed files with 344 additions and 0 deletions.
7 changes: 7 additions & 0 deletions symmetric-assemble/src/asciidoc/examples.ad
@@ -0,0 +1,7 @@

== By Example

This chapter focuses on using examples for a variety of use cases with SymmetricDS.

include::examples/kafka.ad[]

336 changes: 336 additions & 0 deletions symmetric-assemble/src/asciidoc/examples/kafka.ad
@@ -0,0 +1,336 @@

=== Kafka Integration

Use SymmetricDS to capture changes in your database and publish the changes to a Kafka message queue.

==== Kafka Setup

If you already have a Kafka server running, proceed to the SymmetricDS Setup section below. Otherwise, download Kafka and follow the quick start guide it provides.

----
https://kafka.apache.org/quickstart
----

If you're using the quick start, you can run through steps 1-5 and finish by setting up a consumer. This will allow you to see the messages that arrive on your Kafka queue from SymmetricDS.
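
As an alternative to the console consumer, a minimal Java consumer can be used to watch the topic. The sketch below is only an illustration: it assumes the kafka-clients jar is on the classpath, the class name and group id are made up for this example, and the topic name ("test") should be changed to match the topic your extension point publishes to.

[source, Java]
----
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaWatcher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "symmetricds-watcher");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Watch the topic that the SymmetricDS extension point publishes to.
        consumer.subscribe(Collections.singletonList("test"));
        try {
            while (true) {
                // Poll for new messages and print each one to the console.
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
----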

==== SymmetricDS Setup

* Create a new extension point for the <<IDatabaseWriterFilter>> implementation. For this example we will set up a Java-based implementation that writes either CSV or JSON data to the Kafka queue for the "client" target node group.

NOTE: Java-based extension points require a full JDK (not just the JRE).

ifdef::pro[]
image::examples/extension-kafka.png[]

* After creating the extension, select it and hit the "Edit Script" button, then copy and paste in the implementation below.
[source, Java]
----

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.jumpmind.db.model.Table;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaWriterFilter implements IDatabaseWriterFilter {
protected final String KAKFA_TEXT_CACHE = "KAKFA_TEXT_CACHE" + this.hashCode();

private final Logger log = LoggerFactory.getLogger(getClass());

public boolean beforeWrite(DataContext context, Table table, CsvData data) {
if (table.getName().toUpperCase().startsWith("SYM_")) {
return true;
}
else {
log.info("Processing table " + table + " for Kafka");

String[] rowData = data.getParsedData(CsvData.ROW_DATA);
if (data.getDataEventType() == DataEventType.DELETE) {
rowData = data.getParsedData(CsvData.OLD_DATA);
}

StringBuffer kafkaText = new StringBuffer();
if (context.get(KAKFA_TEXT_CACHE) != null) {
kafkaText = (StringBuffer) context.get(KAKFA_TEXT_CACHE);
}

boolean useJson = false;

if (useJson) {
kafkaText.append("{\"")
.append(table.getName())
.append("\": {")
.append("\"eventType\": \"" + data.getDataEventType() + "\",")
.append("\"data\": { ");
for (int i = 0; i < table.getColumnNames().length; i++) {
kafkaText.append("\"" + table.getColumnNames()[i] + "\": \"" + rowData[i]);
if (i + 1 < table.getColumnNames().length) {
kafkaText.append("\",");
}
}
kafkaText.append(" } } }");
}
else {
kafkaText.append("\nTABLE")
.append(",")
.append(table.getName())
.append(",")
.append("EVENT")
.append(",")
.append(data.getDataEventType())
.append(",");

for (int i = 0; i < table.getColumnNames().length; i++) {
kafkaText.append(table.getColumnNames()[i])
.append(",")
.append(rowData[i]);
if (i + 1 < table.getColumnNames().length) {
kafkaText.append(",");
}
}
}
context.put(KAKFA_TEXT_CACHE, kafkaText);
}
return false;
}

public void afterWrite(DataContext context, Table table, CsvData data) {
}

public boolean handlesMissingTable(DataContext context, Table table) {
return true;
}

public void earlyCommit(DataContext context) {
}

public void batchComplete(DataContext context) {
if (!context.getBatch().getChannelId().equals("heartbeat") && !context.getBatch().getChannelId().equals("config")) {
String batchFileName = "batch-" + context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId();
log.info("Processing batch " + batchFileName + " for Kafka");
try {
File batchesDir = new File("batches");
if (!batchesDir.exists()) {
batchesDir.mkdir();
}
File batchFile = new File(batchesDir.getAbsoluteFile() + "/" + batchFileName);

if (context.get(KAKFA_TEXT_CACHE) != null) {
String kafkaText = ((StringBuffer) context.get(KAKFA_TEXT_CACHE)).toString();
FileUtils.writeStringToFile(batchFile, kafkaText);
sendKafkaMessage(kafkaText);
} else {
log.info("No text found to write to kafka queue");
}
}
catch (Exception e) {
log.warn("Unable to write batch to Kafka " + batchFileName, e);
e.printStackTrace();
}
}
}

public void batchCommitted(DataContext context) {
}

public void batchRolledback(DataContext context) {
}

public void sendKafkaMessage(String kafkaText) {
Map<String,Object> configs = new HashMap<String, Object>();

configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "my-producer");

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

producer.send(new ProducerRecord<String, String>("symmetricds", kafkaText));
log.debug("Data to be sent to Kafka-" + kafkaText);

producer.close();
}
}


----
endif::pro[]

ifndef::pro[]

.Run the following SQL to create the Kafka extension.
[source, SQL]
----
insert into SYM_EXTENSION (EXTENSION_ID, EXTENSION_TYPE, INTERFACE_NAME, NODE_GROUP_ID, ENABLED, EXTENSION_ORDER, EXTENSION_TEXT, CREATE_TIME, LAST_UPDATE_BY, LAST_UPDATE_TIME) values ('KafkaDataWriter','java','org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter','client',1,1,'
import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.jumpmind.db.model.Table;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaWriterFilter implements IDatabaseWriterFilter {
protected final String KAKFA_TEXT_CACHE = "KAKFA_TEXT_CACHE" + this.hashCode();

private final Logger log = LoggerFactory.getLogger(getClass());

public boolean beforeWrite(DataContext context, Table table, CsvData data) {
if (table.getName().toUpperCase().startsWith("SYM_")) {
return true;
}
else {
log.info("Processing table " + table + " for Kafka");

String[] rowData = data.getParsedData(CsvData.ROW_DATA);
if (data.getDataEventType() == DataEventType.DELETE) {
rowData = data.getParsedData(CsvData.OLD_DATA);
}

StringBuffer kafkaText = new StringBuffer();
if (context.get(KAKFA_TEXT_CACHE) != null) {
kafkaText = (StringBuffer) context.get(KAKFA_TEXT_CACHE);
}

boolean useJson = false;

if (useJson) {
kafkaText.append("{\"")
.append(table.getName())
.append("\": {")
.append("\"eventType\": \"" + data.getDataEventType() + "\",")
.append("\"data\": { ");
for (int i = 0; i < table.getColumnNames().length; i++) {
kafkaText.append("\"" + table.getColumnNames()[i] + "\": \"" + rowData[i]);
if (i + 1 < table.getColumnNames().length) {
kafkaText.append("\",");
}
}
kafkaText.append(" } } }");
}
else {
kafkaText.append("\nTABLE")
.append(",")
.append(table.getName())
.append(",")
.append("EVENT")
.append(",")
.append(data.getDataEventType())
.append(",");

for (int i = 0; i < table.getColumnNames().length; i++) {
kafkaText.append(table.getColumnNames()[i])
.append(",")
.append(rowData[i]);
if (i + 1 < table.getColumnNames().length) {
kafkaText.append(",");
}
}
}
context.put(KAKFA_TEXT_CACHE, kafkaText);
}
return false;
}

public void afterWrite(DataContext context, Table table, CsvData data) {
}

public boolean handlesMissingTable(DataContext context, Table table) {
return true;
}

public void earlyCommit(DataContext context) {
}

public void batchComplete(DataContext context) {
if (!context.getBatch().getChannelId().equals("heartbeat") && !context.getBatch().getChannelId().equals("config")) {
String batchFileName = "batch-" + context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId();
log.info("Processing batch " + batchFileName + " for Kafka");
try {
File batchesDir = new File("batches");
if (!batchesDir.exists()) {
batchesDir.mkdir();
}
File batchFile = new File(batchesDir.getAbsoluteFile() + "/" + batchFileName);

if (context.get(KAKFA_TEXT_CACHE) != null) {
String kafkaText = ((StringBuffer) context.get(KAKFA_TEXT_CACHE)).toString();
FileUtils.writeStringToFile(batchFile, kafkaText);
sendKafkaMessage(kafkaText);
} else {
log.info("No text found to write to kafka queue");
}
}
catch (Exception e) {
log.warn("Unable to write batch to Kafka " + batchFileName, e);
e.printStackTrace();
}
}
}

public void batchCommitted(DataContext context) {
}

public void batchRolledback(DataContext context) {
}

public void sendKafkaMessage(String kafkaText) {
Map<String,Object> configs = new HashMap<String, Object>();

configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "symmetricds-producer");

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

producer.send(new ProducerRecord<String, String>("test", kafkaText));
log.debug("Data to be sent to Kafka-" + kafkaText);

producer.close();
}
}

',{ts '2017-01-09 10:58:17.981'},'admin',{ts '2017-01-09 13:04:37.490'});

----
endif::pro[]


* The default Kafka server and port are set to localhost:9092 in the sendKafkaMessage function, along with the client id and the topic name used on the producer.send call. You will need to adjust these values to match your Kafka setup if they are different, and the topic must match the one your consumer is watching.

[source, Java]
----
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "symmetricds-producer");
----

* The output format (CSV or JSON) is controlled by the useJson variable at line 40 of the script. The script defaults to CSV; setting this variable to true will send JSON to the queue instead. Additional implementations for XML or other formats could be added here if necessary.

[source, Java]
----
// Line 40
boolean useJson = false;
----
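
For example, based on the script above, an insert of (1, 'bob') into a hypothetical ITEM table with columns ID and NAME would produce a message similar to the CSV line below, or the JSON shown after it when useJson is set to true (the exact casing of table and column names depends on your database):

----
TABLE,ITEM,EVENT,INSERT,ID,1,NAME,bob

{"ITEM": {"eventType": "INSERT","data": { "ID": "1","NAME": "bob" } } }
----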

* Testing. You are now ready to test your Kafka messaging. Make a change to a table that is configured to replicate to the target node group used when the extension was created. This example was set up for the 'client' node group, so any change that replicates to the client node group will run through this extension point and should be sent to your Kafka queue.

Binary image file not shown.
1 change: 1 addition & 0 deletions symmetric-assemble/src/asciidoc/user-guide.ad
@@ -36,4 +36,5 @@ include::configuration.ad[]
include::manage.ad[]
include::advanced-topics.ad[]
include::developer.ad[]
include::examples.ad[]
include::appendix.ad[]
