Skip to content

Commit

Permalink
0005457: Cannot write to more than one Kafka instance in a single
Browse files Browse the repository at this point in the history
instance of SymmetricDS
  • Loading branch information
jakobvanmeter committed Sep 15, 2022
1 parent b0f3500 commit 93065a0
Showing 1 changed file with 11 additions and 2 deletions.
Expand Up @@ -79,6 +79,7 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter {
private final Logger log = LoggerFactory.getLogger(getClass());
private String url;
private String producer;
private String externalNodeID;
private String outputFormat;
private String topicBy;
private String messageBy;
Expand Down Expand Up @@ -116,7 +117,8 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter {
Map<String, Class<?>> tableClassCache = new HashMap<String, Class<?>>();
Map<String, String> tableNameCache = new HashMap<String, String>();
Map<String, Map<String, String>> tableColumnCache = new HashMap<String, Map<String, String>>();
public static KafkaProducer<String, Object> kafkaProducer;
public KafkaProducer<String, Object> kafkaProducer;
public static Map<String,KafkaProducer<String, Object>> producerMap = new HashMap<String,KafkaProducer<String,Object>>();

public KafkaWriterFilter(IParameterService parameterService) {
schema = parser.parse(AVRO_CDC_SCHEMA);
Expand All @@ -132,7 +134,11 @@ public KafkaWriterFilter(IParameterService parameterService) {
this.messageBy = parameterService.getString(ParameterConstants.KAFKA_MESSAGE_BY, KAFKA_MESSAGE_BY_BATCH);
this.confluentUrl = parameterService.getString(ParameterConstants.KAFKA_CONFLUENT_REGISTRY_URL);
this.schemaPackage = parameterService.getString(ParameterConstants.KAFKA_AVRO_JAVA_PACKAGE);
if (kafkaProducer == null) {
this.externalNodeID=parameterService.getExternalId();
String clientID = this.producer +"-"+ this.externalNodeID;
if(producerMap.get(clientID) != null) {
kafkaProducer = producerMap.get(clientID);
} else {
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.url);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Expand All @@ -141,6 +147,8 @@ public KafkaWriterFilter(IParameterService parameterService) {
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, confluentUrl);

configs.put(ProducerConfig.CLIENT_ID_CONFIG, clientID);
}
TypedProperties props = parameterService.getAllParameters();
for (Object key : props.keySet()) {
Expand All @@ -149,6 +157,7 @@ public KafkaWriterFilter(IParameterService parameterService) {
}
}
kafkaProducer = new KafkaProducer<String, Object>(configs);
producerMap.put(clientID, kafkaProducer);
this.log.debug("Kafka client config: {}", configs);
}
}
Expand Down

0 comments on commit 93065a0

Please sign in to comment.