0004385: Kafka message key for ordering with partitioned topics
erilong committed May 1, 2020
1 parent 4750f6f commit d8d2009
Showing 1 changed file with 31 additions and 23 deletions.
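The change below replaces keyless sends with keyed ProducerRecords because Kafka only preserves ordering within a single partition, and the default partitioner routes records that share a non-null key to the same partition. A minimal sketch of that behavior, assuming a local broker; the topic name and key/value strings are illustrative and not part of this commit:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class KeyedOrderingSketch {
        public static void main(String[] args) {
            Properties configs = new Properties();
            // Illustrative broker address; string serializers match the ones KafkaWriterFilter configures.
            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs)) {
                // Same key, so both records hash to the same partition and arrive in send order.
                producer.send(new ProducerRecord<String, String>("customer-topic", "customer:42", "insert"));
                producer.send(new ProducerRecord<String, String>("customer-topic", "customer:42", "update"));
                // A null key is spread across partitions, so ordering relative to other records is not guaranteed.
                producer.send(new ProducerRecord<String, String>("customer-topic", null, "unkeyed event"));
            }
        }
    }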
@@ -72,7 +72,7 @@
 public class KafkaWriterFilter implements IDatabaseWriterFilter {
     protected final String KAFKA_TEXT_CACHE = "KAFKA_TEXT_CACHE" + this.hashCode();
 
-    protected Map<String, List<String>> kafkaDataMap = new HashMap<String, List<String>>();
+    protected Map<String, List<ProducerRecord<String, Object>>> kafkaDataMap = new HashMap<String, List<ProducerRecord<String, Object>>>();
     protected String kafkaDataKey;
 
     private final Logger log = LoggerFactory.getLogger(getClass());
@@ -131,7 +131,7 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter {
     Map<String, String> tableNameCache = new HashMap<String, String>();
     Map<String, Map<String, String>> tableColumnCache = new HashMap<String, Map<String, String>>();
 
-    public static KafkaProducer<?, ?> kafkaProducer;
+    public static KafkaProducer<String, Object> kafkaProducer;
 
     public KafkaWriterFilter(IParameterService parameterService) {
         schema = parser.parse(AVRO_CDC_SCHEMA);
@@ -150,7 +150,6 @@ public KafkaWriterFilter(IParameterService parameterService) {
         this.schemaPackage = parameterService.getString(ParameterConstants.KAFKA_AVRO_JAVA_PACKAGE);
 
         if (kafkaProducer == null) {
-
             configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.url);
             configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
             configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
@@ -169,7 +168,7 @@ public KafkaWriterFilter(IParameterService parameterService) {
                     configs.put(key.toString().substring(12), props.get(key));
                 }
             }
-            kafkaProducer = new KafkaProducer(configs);
+            kafkaProducer = new KafkaProducer<String, Object>(configs);
             this.log.debug("Kafka client config: {}", configs);
         }
     }
@@ -185,6 +184,19 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
         }
 
         StringBuffer kafkaText = new StringBuffer();
+        String kafkaKey = null;
+
+        if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) {
+            StringBuffer sb = new StringBuffer();
+            sb.append(table.getName()).append(":");
+            for (int i = 0; i < table.getPrimaryKeyColumnNames().length; i++) {
+                sb.append(":").append(rowData[i]);
+            }
+            kafkaKey = String.valueOf(sb.toString().hashCode());
+        } else if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
+            String s = context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId();
+            kafkaKey = String.valueOf(s.hashCode());
+        }
 
         if (topicBy.equals(KAFKA_TOPIC_BY_CHANNEL)) {
             kafkaDataKey = context.getBatch().getChannelId();
@@ -195,9 +207,9 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
         log.debug("Processing table {} for Kafka on topic {}", table, kafkaDataKey);
 
         if (kafkaDataMap.get(kafkaDataKey) == null) {
-            kafkaDataMap.put(kafkaDataKey, new ArrayList<String>());
+            kafkaDataMap.put(kafkaDataKey, new ArrayList<ProducerRecord<String, Object>>());
         }
-        List<String> kafkaDataList = kafkaDataMap.get(kafkaDataKey);
+        List<ProducerRecord<String, Object>> kafkaDataList = kafkaDataMap.get(kafkaDataKey);
 
         if (outputFormat.equals(KAFKA_FORMAT_JSON)) {
             kafkaText.append("{\"").append(table.getName()).append("\": {").append("\"eventType\": \"" + data.getDataEventType() + "\",")
@@ -266,7 +278,7 @@ else if (Long.class.equals(propertyTypeClass)) {
                             }
                         }
                     }
-                    sendKafkaMessageByObject(pojo, kafkaDataKey);
+                    sendKafkaMessage(new ProducerRecord<String, Object>(kafkaDataKey, kafkaKey, pojo));
                 } else {
                     throw new RuntimeException("Unable to find a POJO to load for AVRO based message onto Kafka for table : " + tableName);
                 }
@@ -307,7 +319,7 @@ else if (Long.class.equals(propertyTypeClass)) {
                     }
                 }
             }
-            kafkaDataList.add(kafkaText.toString());
+            kafkaDataList.add(new ProducerRecord<String, Object>(kafkaDataKey, kafkaKey, kafkaText.toString()));
         }
         return false;
     }
@@ -436,21 +448,22 @@ public void batchComplete(DataContext context) {
         try {
             if (confluentUrl == null && kafkaDataMap.size() > 0) {
                 StringBuffer kafkaText = new StringBuffer();
+                String kafkaKey = null;
 
-
-                for (Map.Entry<String, List<String>> entry : kafkaDataMap.entrySet()) {
-                    for (String row : entry.getValue()) {
+                for (Map.Entry<String, List<ProducerRecord<String, Object>>> entry : kafkaDataMap.entrySet()) {
+                    for (ProducerRecord<String, Object> record : entry.getValue()) {
                         if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) {
-                            sendKafkaMessage(row, entry.getKey());
+                            sendKafkaMessage(record);
                         } else {
-                            kafkaText.append(row);
+                            kafkaKey = record.key();
+                            kafkaText.append(record.value());
                         }
                     }
                     if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
-                        sendKafkaMessage(kafkaText.toString(), entry.getKey());
+                        sendKafkaMessage(new ProducerRecord<String, Object>(entry.getKey(), kafkaKey, kafkaText.toString()));
                     }
                 }
-                kafkaDataMap = new HashMap<String, List<String>>();
+                kafkaDataMap = new HashMap<String, List<ProducerRecord<String, Object>>>();
             }
         } catch (Exception e) {
             log.warn("Unable to write batch to Kafka " + batchFileName, e);
@@ -469,14 +482,9 @@ public void batchCommitted(DataContext context) {
     public void batchRolledback(DataContext context) {
     }
 
-    public void sendKafkaMessage(String kafkaText, String topic) {
-        log.debug("Sending message (topic={}) {}", topic, kafkaText);
-        kafkaProducer.send(new ProducerRecord(topic, kafkaText));
-    }
-
-    public void sendKafkaMessageByObject(Object bean, String topic) {
-        log.debug("Sending object (topic={}) {}", topic, bean);
-        kafkaProducer.send(new ProducerRecord(topic, bean));
+    public void sendKafkaMessage(ProducerRecord<String, Object> record) {
+        log.debug("Sending message (topic={}) (key={}) {}", record.topic(), record.key(), record.value());
+        kafkaProducer.send(record);
     }
 
     public static byte[] datumToByteArray(Schema schema, GenericRecord datum) throws IOException {
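Taken together, the diff keys each record consistently on both paths: a by-row key is the table name plus the primary key values reduced to a hash code, and a by-batch key is the source node id plus the batch id. A standalone sketch of that derivation, with made-up table and row values; only the hashing scheme mirrors the code above:

    // Mirrors the key derivation added in beforeWrite(); table and row values are illustrative.
    public class KafkaKeySketch {
        static String rowKey(String tableName, String[] pkValues) {
            StringBuffer sb = new StringBuffer();
            sb.append(tableName).append(":");
            for (int i = 0; i < pkValues.length; i++) {
                sb.append(":").append(pkValues[i]);
            }
            return String.valueOf(sb.toString().hashCode());
        }

        static String batchKey(String sourceNodeId, long batchId) {
            return String.valueOf((sourceNodeId + "-" + batchId).hashCode());
        }

        public static void main(String[] args) {
            // Two events for the same primary key produce the same message key,
            // so they land on the same partition and keep their relative order.
            System.out.println(rowKey("customer", new String[] { "42" }));
            System.out.println(rowKey("customer", new String[] { "42" }));
            // A different row yields a different key and may be routed to another partition.
            System.out.println(rowKey("customer", new String[] { "43" }));
            System.out.println(batchKey("store-001", 1234L));
        }
    }

Two different rows can still hash to the same key; that only co-locates them on a partition and does not affect the per-row ordering guarantee.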