From d3f0a3aec7d89dfebb5cf48948998ceae5bfc941 Mon Sep 17 00:00:00 2001
From: Eric Long
Date: Fri, 1 May 2020 16:16:52 -0400
Subject: [PATCH] 0004385: Kafka message key for ordering with partitioned topics

# Conflicts:
#	symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
---
 .../symmetric/load/KafkaWriterFilter.java | 93 +++++++++++--------
 1 file changed, 53 insertions(+), 40 deletions(-)

diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
index 8c6e823cb7..6c70783616 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
@@ -72,7 +72,7 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter {
 
     protected final String KAFKA_TEXT_CACHE = "KAFKA_TEXT_CACHE" + this.hashCode();
 
-    protected Map<String, List<String>> kafkaDataMap = new HashMap<String, List<String>>();
+    protected Map<String, List<ProducerRecord<String, Object>>> kafkaDataMap = new HashMap<String, List<ProducerRecord<String, Object>>>();
     protected String kafkaDataKey;
 
     private final Logger log = LoggerFactory.getLogger(getClass());
@@ -131,6 +131,8 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter {
     Map<String, String> tableNameCache = new HashMap<String, String>();
     Map<String, Map<String, String>> tableColumnCache = new HashMap<String, Map<String, String>>();
 
+    public static KafkaProducer<String, Object> kafkaProducer;
+
     public KafkaWriterFilter(IParameterService parameterService) {
         schema = parser.parse(AVRO_CDC_SCHEMA);
         this.url = parameterService.getString(ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + "db.url");
@@ -147,24 +149,28 @@ public KafkaWriterFilter(IParameterService parameterService) {
         this.confluentUrl = parameterService.getString(ParameterConstants.KAFKA_CONFLUENT_REGISTRY_URL);
         this.schemaPackage = parameterService.getString(ParameterConstants.KAFKA_AVRO_JAVA_PACKAGE);
 
-        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.url);
-        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        configs.put(ProducerConfig.CLIENT_ID_CONFIG, this.producer);
-
-        if (confluentUrl != null) {
-            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
-            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
-
-            configs.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, confluentUrl);
-        }
-
-        TypedProperties props = parameterService.getAllParameters();
-        for (Object key : props.keySet()) {
-            if (key.toString().startsWith("kafkaclient.")) {
-                configs.put(key.toString().substring(12), props.get(key));
-            }
-        }
+        if (kafkaProducer == null) {
+            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.url);
+            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+            configs.put(ProducerConfig.CLIENT_ID_CONFIG, this.producer);
+
+            if (confluentUrl != null) {
+                configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
+                configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
+
+                configs.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, confluentUrl);
+            }
+
+            TypedProperties props = parameterService.getAllParameters();
+            for (Object key : props.keySet()) {
+                if (key.toString().startsWith("kafkaclient.")) {
+                    configs.put(key.toString().substring(12), props.get(key));
+                }
+            }
+            kafkaProducer = new KafkaProducer<String, Object>(configs);
+            this.log.debug("Kafka client config: {}", configs);
+        }
     }
 
     public boolean beforeWrite(DataContext context, Table table, CsvData data) {
@@ -178,6 +184,19 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
             }
 
             StringBuffer kafkaText = new StringBuffer();
+            String kafkaKey = null;
+
+            if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) {
+                StringBuffer sb = new StringBuffer();
+                sb.append(table.getName()).append(":");
+                for (int i = 0; i < table.getPrimaryKeyColumnNames().length; i++) {
+                    sb.append(":").append(rowData[i]);
+                }
+                kafkaKey = String.valueOf(sb.toString().hashCode());
+            } else if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
+                String s = context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId();
+                kafkaKey = String.valueOf(s.hashCode());
+            }
 
             if (topicBy.equals(KAFKA_TOPIC_BY_CHANNEL)) {
                 kafkaDataKey = context.getBatch().getChannelId();
@@ -188,9 +207,9 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
             log.debug("Processing table {} for Kafka on topic {}", table, kafkaDataKey);
 
             if (kafkaDataMap.get(kafkaDataKey) == null) {
-                kafkaDataMap.put(kafkaDataKey, new ArrayList<String>());
+                kafkaDataMap.put(kafkaDataKey, new ArrayList<ProducerRecord<String, Object>>());
             }
-            List<String> kafkaDataList = kafkaDataMap.get(kafkaDataKey);
+            List<ProducerRecord<String, Object>> kafkaDataList = kafkaDataMap.get(kafkaDataKey);
 
             if (outputFormat.equals(KAFKA_FORMAT_JSON)) {
                 kafkaText.append("{\"").append(table.getName()).append("\": {").append("\"eventType\": \"" + data.getDataEventType() + "\",")
@@ -259,7 +278,7 @@ else if (Long.class.equals(propertyTypeClass)) {
                         }
                     }
                 }
-                sendKafkaMessageByObject(pojo, kafkaDataKey);
+                sendKafkaMessage(new ProducerRecord<String, Object>(kafkaDataKey, kafkaKey, pojo));
             } else {
                 throw new RuntimeException("Unable to find a POJO to load for AVRO based message onto Kafka for table : " + tableName);
             }
@@ -300,7 +319,7 @@ else if (Long.class.equals(propertyTypeClass)) {
                     }
                 }
             }
-            kafkaDataList.add(kafkaText.toString());
+            kafkaDataList.add(new ProducerRecord<String, Object>(kafkaDataKey, kafkaKey, kafkaText.toString()));
         }
         return false;
     }
@@ -430,21 +449,22 @@ public void batchComplete(DataContext context) {
         try {
             if (confluentUrl == null && kafkaDataMap.size() > 0) {
                 StringBuffer kafkaText = new StringBuffer();
+                String kafkaKey = null;
 
-
-                for (Map.Entry<String, List<String>> entry : kafkaDataMap.entrySet()) {
-                    for (String row : entry.getValue()) {
+                for (Map.Entry<String, List<ProducerRecord<String, Object>>> entry : kafkaDataMap.entrySet()) {
+                    for (ProducerRecord<String, Object> record : entry.getValue()) {
                         if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) {
-                            sendKafkaMessage(producer, row, entry.getKey());
+                            sendKafkaMessage(record);
                         } else {
-                            kafkaText.append(row);
+                            kafkaKey = record.key();
+                            kafkaText.append(record.value());
                         }
                     }
                     if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
-                        sendKafkaMessage(producer, kafkaText.toString(), entry.getKey());
+                        sendKafkaMessage(new ProducerRecord<String, Object>(entry.getKey(), kafkaKey, kafkaText.toString()));
                     }
                 }
-                kafkaDataMap = new HashMap<String, List<String>>();
+                kafkaDataMap = new HashMap<String, List<ProducerRecord<String, Object>>>();
             }
         } catch (Exception e) {
             log.warn("Unable to write batch to Kafka " + batchFileName, e);
@@ -464,16 +484,9 @@ public void batchCommitted(DataContext context) {
     public void batchRolledback(DataContext context) {
     }
 
-    public void sendKafkaMessage(KafkaProducer<String, String> producer, String kafkaText, String topic) {
-        log.debug("Sending message (topic={}) {}", topic, kafkaText);
-        producer.send(new ProducerRecord<String, String>(topic, kafkaText));
-    }
-
-    public void sendKafkaMessageByObject(Object bean, String topic) {
-        log.debug("Sending object (topic={}) {}", topic, bean);
-        KafkaProducer<String, Object> producer = new KafkaProducer<String, Object>(configs);
-        producer.send(new ProducerRecord<String, Object>(topic, bean));
-        producer.close();
+    public void sendKafkaMessage(ProducerRecord<String, Object> record) {
+        log.debug("Sending message (topic={}) (key={}) {}", record.topic(), record.key(), record.value());
+        kafkaProducer.send(record);
     }
 
     public static byte[] datumToByteArray(Schema schema, GenericRecord datum) throws IOException {
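
The ordering guarantee behind this change comes from Kafka's default partitioner: records that carry the same key are routed to the same partition, and Kafka only preserves order within a partition. The standalone sketch below illustrates that with the same key scheme beforeWrite() builds for KAFKA_MESSAGE_BY_ROW (table name plus primary key values, hashed); the broker address, topic name, and sample table/key values are illustrative assumptions, not taken from the patch.

// Minimal sketch, assuming a broker at localhost:9092 and a hypothetical
// partitioned topic "customer-topic". It only demonstrates why keyed records
// keep their relative order; it is not part of KafkaWriterFilter.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedOrderingSketch {
    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

        // Same key scheme as beforeWrite() for message-by-row: table name,
        // then ":" plus each primary key value, hashed ("customer" pk 42).
        String key = String.valueOf("customer::42".hashCode());

        // Both records carry the same key, so the default partitioner sends
        // them to the same partition and the UPDATE is consumed after the INSERT.
        producer.send(new ProducerRecord<String, String>("customer-topic", key,
                "{\"customer\": {\"eventType\": \"INSERT\"}}"));
        producer.send(new ProducerRecord<String, String>("customer-topic", key,
                "{\"customer\": {\"eventType\": \"UPDATE\"}}"));
        producer.close();
    }
}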