Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,17 @@ public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notifi
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers, boolean autoCommitEnabled) {
LOG.info("==> KafkaNotification.createConsumers(notificationType={}, numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, autoCommitEnabled);

Properties consumerProperties = getConsumerProperties(notificationType);
AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType, getKafkaConsumer(consumerProperties, notificationType, autoCommitEnabled), autoCommitEnabled, pollTimeOutMs);
List<NotificationConsumer<T>> consumers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
Properties consumerProperties = getConsumerProperties(notificationType);

List<NotificationConsumer<T>> consumers = Collections.singletonList(kafkaConsumer);
AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType, getKafkaConsumer(consumerProperties, notificationType, autoCommitEnabled), autoCommitEnabled, pollTimeOutMs);
consumers.add(kafkaConsumer);
}

LOG.info("<== KafkaNotification.createConsumers(notificationType={}, numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, autoCommitEnabled);

return consumers;
return Collections.unmodifiableList(consumers);
}

@Override
Expand Down Expand Up @@ -231,20 +234,17 @@ void sendInternalToProducer(Producer p, NotificationType type, List<String> mess


public KafkaConsumer getKafkaConsumer(Properties consumerProperties, NotificationType type, boolean autoCommitEnabled) {
if (consumer == null || !isKafkaConsumerOpen(consumer)) {
try {
String topic = TOPIC_MAP.get(type);
try {
String topic = TOPIC_MAP.get(type);

consumerProperties.put("enable.auto.commit", autoCommitEnabled);
consumerProperties.put("enable.auto.commit", autoCommitEnabled);

this.consumer = new KafkaConsumer(consumerProperties);
this.consumer = new KafkaConsumer(consumerProperties);

this.consumer.subscribe(Arrays.asList(topic));
} catch (Exception ee) {
this.consumer.subscribe(Arrays.asList(topic));
} catch (Exception ee) {
LOG.error("Exception in getKafkaConsumer ", ee);
}
}

return this.consumer;
}

Expand Down