From 892dc31c36d85bf45863b9b1532289f3ab9d57af Mon Sep 17 00:00:00 2001 From: Eric Long Date: Mon, 27 Apr 2020 08:42:54 -0400 Subject: [PATCH] 0004347: Kafka properties from engine pass through to producer --- .../src/asciidoc/appendix/kafka.ad | 21 +++++++--- .../symmetric/load/KafkaWriterFilter.java | 40 ++++++++++++++++--- .../symmetric/io/data/writer/KafkaWriter.java | 37 +++++++++-------- 3 files changed, 71 insertions(+), 27 deletions(-) diff --git a/symmetric-assemble/src/asciidoc/appendix/kafka.ad b/symmetric-assemble/src/asciidoc/appendix/kafka.ad index 80527756c9..aa4924f86f 100644 --- a/symmetric-assemble/src/asciidoc/appendix/kafka.ad +++ b/symmetric-assemble/src/asciidoc/appendix/kafka.ad @@ -18,7 +18,7 @@ endif::pro[] .Set the output message format with the following property ---- -kafka.format +kafka.format=JSON|XML|AVRO|CSV ---- @@ -95,7 +95,7 @@ TABLE,table name,EVENT,INSERT|UPDATE|DELETE,column name,value, ... .Set the topic using the following property ---- -kafka.topic.by +kafka.topic.by=CHANNEL|TABLE ---- @@ -108,7 +108,7 @@ TABLE:: This will send to a topic based on the table name of the change. .Set following property to determine how messages will be sent. ---- -kafka.message.by +kafka.message.by=BATCH|ROW ---- @@ -121,7 +121,7 @@ ROW:: This will send one messsage for each change captured. .Set following property to specify the producer of the messages. ---- -kafka.producer +kafka.producer=myapplication ---- @@ -148,4 +148,15 @@ kafka.avro.java.package ==== NOTE: The jar file containing the AVRO generated POJO java beans must be placed in the /lib or /patches folder of SymmetricDS and then a restart is required. -==== \ No newline at end of file +==== + +==== Using Authentication + +Any engine property prefixed with `kafkaclient` will be passed through to the Kafka client producer. +Here is an example of authentication over SSL. + +---- +kafkaclient.security.protocol=SASL_SSL +kafkaclient.sasl.mechanism=PLAIN +kafkaclient.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="myuser" password="mypassword"; +---- diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java index 7c11394328..8c6e823cb7 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java @@ -1,3 +1,23 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.jumpmind.symmetric.load; import java.beans.PropertyDescriptor; @@ -28,6 +48,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.LongSerializer; import org.jumpmind.db.model.Table; +import org.jumpmind.properties.TypedProperties; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.io.data.CsvData; import org.jumpmind.symmetric.io.data.DataContext; @@ -54,7 +75,7 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter { protected Map> kafkaDataMap = new HashMap>(); protected String kafkaDataKey; - private final Logger log = LoggerFactory.getLogger(IDatabaseWriterFilter.class); + private final Logger log = LoggerFactory.getLogger(getClass()); private String url; @@ -137,6 +158,13 @@ public KafkaWriterFilter(IParameterService parameterService) { configs.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, confluentUrl); } + + TypedProperties props = parameterService.getAllParameters(); + for (Object key : props.keySet()) { + if (key.toString().startsWith("kafkaclient.")) { + configs.put(key.toString().substring(12), props.get(key)); + } + } } public boolean beforeWrite(DataContext context, Table table, CsvData data) { @@ -144,8 +172,6 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) { if (table.getNameLowerCase().startsWith("sym_")) { return true; } else { - log.debug("Processing table " + table + " for Kafka"); - String[] rowData = data.getParsedData(CsvData.ROW_DATA); if (data.getDataEventType() == DataEventType.DELETE) { rowData = data.getParsedData(CsvData.OLD_DATA); @@ -159,6 +185,8 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) { kafkaDataKey = table.getNameLowerCase(); } + log.debug("Processing table {} for Kafka on topic {}", table, kafkaDataKey); + if (kafkaDataMap.get(kafkaDataKey) == null) { kafkaDataMap.put(kafkaDataKey, new ArrayList()); } @@ -212,7 +240,7 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) { for (int i = 0; i < table.getColumnNames().length; i++) { String colName = getColumnName(table.getName(), table.getColumnNames()[i], pojo); if (colName != null) { - Class propertyTypeClass = PropertyUtils.getPropertyType(pojo, colName); + Class propertyTypeClass = PropertyUtils.getPropertyType(pojo, colName); if (CharSequence.class.equals(propertyTypeClass)) { PropertyUtils.setSimpleProperty(pojo, colName, rowData[i]); } @@ -397,6 +425,7 @@ public void batchComplete(DataContext context) { if (!context.getBatch().getChannelId().equals("heartbeat") && !context.getBatch().getChannelId().equals("config")) { String batchFileName = "batch-" + context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId(); + log.debug("Kafka client config: {}", configs); KafkaProducer producer = new KafkaProducer(configs); try { if (confluentUrl == null && kafkaDataMap.size() > 0) { @@ -436,11 +465,12 @@ public void batchRolledback(DataContext context) { } public void sendKafkaMessage(KafkaProducer producer, String kafkaText, String topic) { + log.debug("Sending message (topic={}) {}", topic, kafkaText); producer.send(new ProducerRecord(topic, kafkaText)); - log.debug("Data to be sent to Kafka-" + kafkaText); } public void sendKafkaMessageByObject(Object bean, String topic) { + log.debug("Sending object (topic={}) {}", topic, bean); KafkaProducer producer = new KafkaProducer(configs); producer.send(new ProducerRecord(topic, bean)); producer.close(); diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/KafkaWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/KafkaWriter.java index 51c64abc5e..8d9d6ef8db 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/KafkaWriter.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/KafkaWriter.java @@ -1,8 +1,26 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.jumpmind.symmetric.io.data.writer; -import org.jumpmind.db.model.Table; import org.jumpmind.db.platform.IDatabasePlatform; -import org.jumpmind.symmetric.io.data.CsvData; public class KafkaWriter extends DynamicDefaultDatabaseWriter { @@ -11,19 +29,4 @@ public KafkaWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform target super(symmetricPlatform, targetPlatform, prefix, conflictResolver, settings); } - /* - @Override - protected Table lookupTableAtTarget(Table sourceTable) { - return sourceTable; - } - - - @Override - protected void logFailureDetails(Throwable e, CsvData data, boolean logLastDmlDetails) { - } - - @Override - protected void allowInsertIntoAutoIncrementColumns(boolean value, Table table) { - } - */ }