
0004378: Kafka support for multiple sources
jumpmind-josh committed May 1, 2020
1 parent 892dc31 · commit 4750f6f
Showing 1 changed file with 30 additions and 27 deletions.
@@ -131,6 +131,8 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter {
    Map<String, String> tableNameCache = new HashMap<String, String>();
    Map<String, Map<String, String>> tableColumnCache = new HashMap<String, Map<String, String>>();

+   public static KafkaProducer<?, ?> kafkaProducer;
+
    public KafkaWriterFilter(IParameterService parameterService) {
        schema = parser.parse(AVRO_CDC_SCHEMA);
        this.url = parameterService.getString(ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + "db.url");
@@ -147,24 +149,29 @@ public KafkaWriterFilter(IParameterService parameterService) {
        this.confluentUrl = parameterService.getString(ParameterConstants.KAFKA_CONFLUENT_REGISTRY_URL);
        this.schemaPackage = parameterService.getString(ParameterConstants.KAFKA_AVRO_JAVA_PACKAGE);

-       configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.url);
-       configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-       configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-       configs.put(ProducerConfig.CLIENT_ID_CONFIG, this.producer);
-
-       if (confluentUrl != null) {
-           configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
-           configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
-
-           configs.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, confluentUrl);
-       }
-
-       TypedProperties props = parameterService.getAllParameters();
-       for (Object key : props.keySet()) {
-           if (key.toString().startsWith("kafkaclient.")) {
-               configs.put(key.toString().substring(12), props.get(key));
-           }
-       }
+       if (kafkaProducer == null) {
+
+           configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.url);
+           configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+           configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+           configs.put(ProducerConfig.CLIENT_ID_CONFIG, this.producer);
+
+           if (confluentUrl != null) {
+               configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
+               configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
+
+               configs.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, confluentUrl);
+           }
+
+           TypedProperties props = parameterService.getAllParameters();
+           for (Object key : props.keySet()) {
+               if (key.toString().startsWith("kafkaclient.")) {
+                   configs.put(key.toString().substring(12), props.get(key));
+               }
+           }
+           kafkaProducer = new KafkaProducer(configs);
+           this.log.debug("Kafka client config: {}", configs);
+       }
    }

public boolean beforeWrite(DataContext context, Table table, CsvData data) {
@@ -426,7 +433,6 @@ public void batchComplete(DataContext context) {
        String batchFileName = "batch-" + context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId();

        log.debug("Kafka client config: {}", configs);
-       KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
        try {
            if (confluentUrl == null && kafkaDataMap.size() > 0) {
                StringBuffer kafkaText = new StringBuffer();
@@ -435,13 +441,13 @@ public void batchComplete(DataContext context) {
                for (Map.Entry<String, List<String>> entry : kafkaDataMap.entrySet()) {
                    for (String row : entry.getValue()) {
                        if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) {
-                           sendKafkaMessage(producer, row, entry.getKey());
+                           sendKafkaMessage(row, entry.getKey());
                        } else {
                            kafkaText.append(row);
                        }
                    }
                    if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
-                       sendKafkaMessage(producer, kafkaText.toString(), entry.getKey());
+                       sendKafkaMessage(kafkaText.toString(), entry.getKey());
                    }
                }
                kafkaDataMap = new HashMap<String, List<String>>();
@@ -450,7 +456,6 @@ public void batchComplete(DataContext context) {
            log.warn("Unable to write batch to Kafka " + batchFileName, e);
            e.printStackTrace();
        } finally {
-           producer.close();
            context.put(KAFKA_TEXT_CACHE, new HashMap<String, List<String>>());
            tableNameCache.clear();
            tableColumnCache = new HashMap<String, Map<String, String>>();
@@ -464,16 +469,14 @@ public void batchCommitted(DataContext context) {
    public void batchRolledback(DataContext context) {
    }

-   public void sendKafkaMessage(KafkaProducer<String, String> producer, String kafkaText, String topic) {
+   public void sendKafkaMessage(String kafkaText, String topic) {
        log.debug("Sending message (topic={}) {}", topic, kafkaText);
-       producer.send(new ProducerRecord<String, String>(topic, kafkaText));
+       kafkaProducer.send(new ProducerRecord(topic, kafkaText));
    }

    public void sendKafkaMessageByObject(Object bean, String topic) {
        log.debug("Sending object (topic={}) {}", topic, bean);
-       KafkaProducer<String, Object> producer = new KafkaProducer<String, Object>(configs);
-       producer.send(new ProducerRecord<String, Object>(topic, bean));
-       producer.close();
+       kafkaProducer.send(new ProducerRecord(topic, bean));
    }

    public static byte[] datumToByteArray(Schema schema, GenericRecord datum) throws IOException {
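Net effect of the change: instead of constructing and closing a KafkaProducer for every batch (and another one per object message), the filter now keeps a single producer in the static kafkaProducer field, creates it lazily the first time the constructor runs, and reuses it for every source and every batch. Below is a condensed sketch of that pattern with simplified, hypothetical names (not the actual SymmetricDS class); unlike the commit, the sketch guards the lazy initialization with synchronized, since KafkaProducer instances are thread safe to share but their one-time creation is not otherwise protected.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical, simplified illustration of the shared-producer pattern
// introduced by this commit; names and structure do not match the real class.
public class SharedKafkaProducerSketch {

    // One producer for the whole JVM, shared by all sources and batches.
    private static KafkaProducer<String, String> producer;

    public static synchronized void initIfNeeded(String bootstrapServers, String clientId) {
        if (producer == null) { // create only on first use, then reuse
            Properties configs = new Properties();
            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            configs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<String, String>(configs);
        }
    }

    public static void send(String topic, String message) {
        // No per-batch producer and no close(): the shared instance stays open
        // for the life of the process.
        producer.send(new ProducerRecord<String, String>(topic, message));
    }
}

This is also why batchComplete and sendKafkaMessageByObject no longer create or close their own producers, which accounts for most of the 27 deleted lines.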

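One detail that carries over unchanged: any engine parameter whose name starts with kafkaclient. is copied into the producer configuration with that 12-character prefix stripped (the substring(12) call in the constructor), so arbitrary Kafka client settings can be passed through without dedicated parameters. A minimal sketch of that mapping, using a hypothetical property name:

import java.util.HashMap;
import java.util.Map;

// Hypothetical example of the "kafkaclient." prefix pass-through;
// the property name below is made up for illustration.
public class KafkaClientPrefixExample {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<String, Object>();
        String key = "kafkaclient.security.protocol";   // as it might appear in engine properties
        if (key.startsWith("kafkaclient.")) {
            // "kafkaclient." is 12 characters, so substring(12) leaves the raw Kafka setting
            configs.put(key.substring(12), "SASL_SSL");
        }
        System.out.println(configs); // prints {security.protocol=SASL_SSL}
    }
}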