diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
index 79d2fecb57..d517a9ffd1 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
@@ -72,7 +72,7 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter {
 
     protected final String KAFKA_TEXT_CACHE = "KAFKA_TEXT_CACHE" + this.hashCode();
 
-    protected Map<String, List<String>> kafkaDataMap = new HashMap<String, List<String>>();
+    protected Map<String, List<ProducerRecord<String, Object>>> kafkaDataMap = new HashMap<String, List<ProducerRecord<String, Object>>>();
     protected String kafkaDataKey;
 
     private final Logger log = LoggerFactory.getLogger(getClass());
@@ -131,7 +131,7 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter {
     Map<String, String> tableNameCache = new HashMap<String, String>();
     Map<String, Map<String, String>> tableColumnCache = new HashMap<String, Map<String, String>>();
 
-    public static KafkaProducer<String, String> kafkaProducer;
+    public static KafkaProducer<String, Object> kafkaProducer;
 
     public KafkaWriterFilter(IParameterService parameterService) {
         schema = parser.parse(AVRO_CDC_SCHEMA);
@@ -150,7 +150,6 @@ public KafkaWriterFilter(IParameterService parameterService) {
 
         this.schemaPackage = parameterService.getString(ParameterConstants.KAFKA_AVRO_JAVA_PACKAGE);
 
         if (kafkaProducer == null) {
-            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.url);
             configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
             configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
@@ -169,7 +168,7 @@ public KafkaWriterFilter(IParameterService parameterService) {
                     configs.put(key.toString().substring(12), props.get(key));
                 }
             }
-            kafkaProducer = new KafkaProducer<String, String>(configs);
+            kafkaProducer = new KafkaProducer<String, Object>(configs);
             this.log.debug("Kafka client config: {}", configs);
         }
     }
@@ -185,6 +184,19 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
             }
 
             StringBuffer kafkaText = new StringBuffer();
+            String kafkaKey = null;
+
+            if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) {
+                StringBuffer sb = new StringBuffer();
+                sb.append(table.getName()).append(":");
+                for (int i = 0; i < table.getPrimaryKeyColumnNames().length; i++) {
+                    sb.append(":").append(rowData[i]);
+                }
+                kafkaKey = String.valueOf(sb.toString().hashCode());
+            } else if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
+                String s = context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId();
+                kafkaKey = String.valueOf(s.hashCode());
+            }
 
             if (topicBy.equals(KAFKA_TOPIC_BY_CHANNEL)) {
                 kafkaDataKey = context.getBatch().getChannelId();
@@ -195,9 +207,9 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
             log.debug("Processing table {} for Kafka on topic {}", table, kafkaDataKey);
 
             if (kafkaDataMap.get(kafkaDataKey) == null) {
-                kafkaDataMap.put(kafkaDataKey, new ArrayList<String>());
+                kafkaDataMap.put(kafkaDataKey, new ArrayList<ProducerRecord<String, Object>>());
             }
-            List<String> kafkaDataList = kafkaDataMap.get(kafkaDataKey);
+            List<ProducerRecord<String, Object>> kafkaDataList = kafkaDataMap.get(kafkaDataKey);
 
             if (outputFormat.equals(KAFKA_FORMAT_JSON)) {
                 kafkaText.append("{\"").append(table.getName()).append("\": {").append("\"eventType\": \"" + data.getDataEventType() + "\",")
@@ -266,7 +278,7 @@ else if (Long.class.equals(propertyTypeClass)) {
                                 }
                             }
                         }
-                        sendKafkaMessageByObject(pojo, kafkaDataKey);
+                        sendKafkaMessage(new ProducerRecord<String, Object>(kafkaDataKey, kafkaKey, pojo));
                     } else {
                         throw new RuntimeException("Unable to find a POJO to load for AVRO based message onto Kafka for table : " + tableName);
                     }
@@ -307,7 +319,7 @@ else if (Long.class.equals(propertyTypeClass)) {
                    }
                }
            }
-            kafkaDataList.add(kafkaText.toString());
+            kafkaDataList.add(new ProducerRecord<String, Object>(kafkaDataKey, kafkaKey, kafkaText.toString()));
        }
        return false;
    }
@@ -436,21 +448,22 @@ public void batchComplete(DataContext context) {
        try {
            if (confluentUrl == null && kafkaDataMap.size() > 0) {
                StringBuffer kafkaText = new StringBuffer();
+                String kafkaKey = null;
 
-
-                for (Map.Entry<String, List<String>> entry : kafkaDataMap.entrySet()) {
-                    for (String row : entry.getValue()) {
+                for (Map.Entry<String, List<ProducerRecord<String, Object>>> entry : kafkaDataMap.entrySet()) {
+                    for (ProducerRecord<String, Object> record : entry.getValue()) {
                        if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) {
-                            sendKafkaMessage(row, entry.getKey());
+                            sendKafkaMessage(record);
                        } else {
-                            kafkaText.append(row);
+                            kafkaKey = record.key();
+                            kafkaText.append(record.value());
                        }
                    }
                    if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
-                        sendKafkaMessage(kafkaText.toString(), entry.getKey());
+                        sendKafkaMessage(new ProducerRecord<String, Object>(entry.getKey(), kafkaKey, kafkaText.toString()));
                    }
                }
-                kafkaDataMap = new HashMap<String, List<String>>();
+                kafkaDataMap = new HashMap<String, List<ProducerRecord<String, Object>>>();
            }
        } catch (Exception e) {
            log.warn("Unable to write batch to Kafka " + batchFileName, e);
@@ -469,14 +482,9 @@ public void batchCommitted(DataContext context) {
     public void batchRolledback(DataContext context) {
     }
 
-    public void sendKafkaMessage(String kafkaText, String topic) {
-        log.debug("Sending message (topic={}) {}", topic, kafkaText);
-        kafkaProducer.send(new ProducerRecord(topic, kafkaText));
-    }
-
-    public void sendKafkaMessageByObject(Object bean, String topic) {
-        log.debug("Sending object (topic={}) {}", topic, bean);
-        kafkaProducer.send(new ProducerRecord(topic, bean));
+    public void sendKafkaMessage(ProducerRecord<String, Object> record) {
+        log.debug("Sending message (topic={}) (key={}) {}", record.topic(), record.key(), record.value());
+        kafkaProducer.send(record);
     }
 
     public static byte[] datumToByteArray(Schema schema, GenericRecord datum) throws IOException {