Skip to content

Commit

Permalink
ATLAS-2751: Atlas is not consuming messages from ATLAS_HOOK topic aft…
Browse files Browse the repository at this point in the history
…er recovering from zookeeper connection timeout.
  • Loading branch information
sarathsubramanian committed Jun 10, 2018
1 parent ce5ffeb commit fff9463
Showing 1 changed file with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES";
protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";

private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This consumer has already been closed.";

private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() {
{
put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
Expand All @@ -67,6 +69,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
private final Long pollTimeOutMs;
private KafkaConsumer consumer;
private KafkaProducer producer;
private String consumerClosedErrorMsg;

// ----- Constructors ----------------------------------------------------

Expand All @@ -85,8 +88,9 @@ public KafkaNotification(Configuration applicationProperties) throws AtlasExcept

Configuration kafkaConf = ApplicationProperties.getSubsetConfiguration(applicationProperties, PROPERTY_PREFIX);

properties = ConfigurationConverter.getProperties(kafkaConf);
pollTimeOutMs = kafkaConf.getLong("poll.timeout.ms", 1000);
properties = ConfigurationConverter.getProperties(kafkaConf);
pollTimeOutMs = kafkaConf.getLong("poll.timeout.ms", 1000);
consumerClosedErrorMsg = kafkaConf.getString("error.message.consumer_closed", DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE);

//Override default configs
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Expand Down Expand Up @@ -223,7 +227,7 @@ void sendInternalToProducer(Producer p, NotificationType type, List<String> mess


public KafkaConsumer getKafkaConsumer(Properties consumerProperties, NotificationType type, boolean autoCommitEnabled) {
if(this.consumer == null) {
if (consumer == null || !isKafkaConsumerOpen(consumer)) {
try {
String topic = TOPIC_MAP.get(type);

Expand Down Expand Up @@ -287,4 +291,19 @@ public String getMessage() {
return message;
}
}

// kafka-client doesn't have method to check if consumer is open, hence checking list topics and catching exception
private boolean isKafkaConsumerOpen(KafkaConsumer consumer) {
boolean ret = true;

try {
consumer.listTopics();
} catch (IllegalStateException ex) {
if (ex.getMessage().equalsIgnoreCase(consumerClosedErrorMsg)) {
ret = false;
}
}

return ret;
}
}

0 comments on commit fff9463

Please sign in to comment.