diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 8b65d3fae04ee..12461d86929fe 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -23,6 +23,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import org.apache.camel.ConsumerListenerAware;
 import org.apache.camel.Processor;
@@ -115,10 +116,6 @@ Properties getProps() {
         return props;
     }
 
-    List<KafkaFetchRecords> getTasks() {
-        return tasks;
-    }
-
     @Override
     protected void doStart() throws Exception {
         LOG.info("Starting Kafka consumer on topic: {} with breakOnFirstError: {}", endpoint.getConfiguration().getTopic(),
@@ -217,7 +214,7 @@ protected void doStop() throws Exception {
     @Override
     protected void doSuspend() throws Exception {
         for (KafkaFetchRecords task : tasks) {
-            LOG.info("Pausing Kafka record fetcher task running client ID {}", task.getClientId());
+            LOG.info("Pausing Kafka record fetcher task running client ID {}", task.healthState().getClientId());
             task.pause();
         }
 
@@ -227,10 +224,14 @@ protected void doSuspend() throws Exception {
     @Override
     protected void doResume() throws Exception {
         for (KafkaFetchRecords task : tasks) {
-            LOG.info("Resuming Kafka record fetcher task running client ID {}", task.getClientId());
+            LOG.info("Resuming Kafka record fetcher task running client ID {}", task.healthState().getClientId());
             task.resume();
         }
 
         super.doResume();
     }
+
+    public List<KafkaFetchRecords.TaskHealthState> healthStates() {
+        return tasks.stream().map(t -> t.healthState()).collect(Collectors.toList());
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerHealthCheck.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerHealthCheck.java
index 242915666b9d3..9d7698bddec9c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerHealthCheck.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerHealthCheck.java
@@ -18,12 +18,9 @@
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 import org.apache.camel.health.HealthCheckResultBuilder;
 import org.apache.camel.impl.health.AbstractHealthCheck;
-import org.apache.camel.util.TimeUtils;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 /**
  * Kafka consumer readiness health-check
  */
@@ -47,32 +44,28 @@ public boolean isLiveness() {
 
     @Override
     protected void doCall(HealthCheckResultBuilder builder, Map<String, Object> options) {
-        List<KafkaFetchRecords> tasks = kafkaConsumer.getTasks();
-        for (KafkaFetchRecords task : tasks) {
-            if (!task.isReady()) {
+        List<KafkaFetchRecords.TaskHealthState> healthStates = kafkaConsumer.healthStates();
+
+        for (KafkaFetchRecords.TaskHealthState healthState : healthStates) {
+            if (!healthState.isReady()) {
                 builder.down();
 
-                String msg = "KafkaConsumer is not ready";
-                if (task.isTerminated()) {
-                    msg += " (gave up recovering and terminated the kafka consumer; restart route or application to recover).";
-                } else if (task.isRecoverable()) {
-                    String time = TimeUtils.printDuration(task.getCurrentRecoveryInterval());
-                    msg += " (recovery in progress using " + time + " intervals).";
-                }
+                final String msg = healthState.buildStateMessage();
+
                 builder.message(msg);
 
                 // was this caused by the consumer not being able to connect? Then this is stored in last error
-                builder.error(task.getLastError());
+                builder.error(healthState.getLastError());
 
                 KafkaConfiguration cfg = kafkaConsumer.getEndpoint().getConfiguration();
-                Properties props = task.getKafkaProps();
 
-                builder.detail("bootstrap.servers", props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-                builder.detail("client.id", task.getClientId());
-                String gid = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+                builder.detail("bootstrap.servers", healthState.getBootstrapServers());
+                builder.detail("client.id", healthState.getClientId());
+                String gid = healthState.getGroupId();
                 if (gid != null) {
                     builder.detail("group.id", gid);
                 }
+
                 if (routeId != null) {
                     // camel route id
                     builder.detail("route.id", routeId);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 1e5a4be540320..d367dd5343daa 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -42,6 +42,7 @@
 import org.apache.camel.util.ReflectionHelper;
 import org.apache.camel.util.TimeUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.common.errors.InterruptException;
@@ -63,11 +64,89 @@ private enum State {
         RESUME_REQUESTED,
     }
 
+    /*
+     This one is used to avoid exposing methods from this class to the health check. Fields and methods used
+     to build an instance of this class must be made thread-safe (i.e., most importantly, read fields should
+     be marked as volatile).
+     */
+    public static class TaskHealthState {
+        private final boolean ready;
+        private final boolean isTerminated;
+        private final boolean isRecoverable;
+        private final Exception lastError;
+        private final String clientId;
+
+        private final String bootstrapServers;
+
+        private final long currentBackoffInterval;
+
+        private final Properties clientProperties;
+
+        public TaskHealthState(boolean ready, boolean isTerminated, boolean isRecoverable, Exception lastError, String clientId,
+                               long currentBackoffInterval, Properties clientProperties) {
+            this.ready = ready;
+            this.isTerminated = isTerminated;
+            this.isRecoverable = isRecoverable;
+            this.lastError = lastError;
+            this.clientId = clientId;
+            this.currentBackoffInterval = currentBackoffInterval;
+            this.clientProperties = clientProperties;
+            this.bootstrapServers = clientProperties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+        }
+
+        public boolean isReady() {
+            return ready;
+        }
+
+        public boolean isTerminated() {
+            return isTerminated;
+        }
+
+        public boolean isRecoverable() {
+            return isRecoverable;
+        }
+
+        public Exception getLastError() {
+            return lastError;
+        }
+
+        public String getClientId() {
+            return clientId;
+        }
+
+        public long getCurrentBackoffInterval() {
+            return currentBackoffInterval;
+        }
+
+        public String getBootstrapServers() {
+            return bootstrapServers;
+        }
+
+        public String getGroupId() {
+            return clientProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+        }
+
+        public String buildStateMessage() {
+            String msg = "KafkaConsumer is not ready";
+            if (isTerminated()) {
+                msg += " (gave up recovering and terminated the kafka consumer; restart route or application to recover).";
+            } else if (isRecoverable()) {
+                String time = TimeUtils.printDuration(getCurrentBackoffInterval());
+                msg += " (recovery in progress using " + time + " intervals).";
+            }
+
+            return msg;
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(KafkaFetchRecords.class);
 
+    // There are a few volatile fields here because they are usually read from other threads,
+    // such as the health-check thread. They are typically only read in those contexts.
+
     private final KafkaConsumer kafkaConsumer;
     private org.apache.kafka.clients.consumer.Consumer consumer;
-    private String clientId;
+    private volatile String clientId;
     private final String topicName;
     private final Pattern topicPattern;
     private final String threadId;
@@ -77,13 +156,14 @@ private enum State {
     private final BridgeExceptionHandlerToErrorHandler bridge;
     private final ReentrantLock lock = new ReentrantLock();
     private CommitManager commitManager;
-    private Exception lastError;
+    private volatile Exception lastError;
     private final KafkaConsumerListener consumerListener;
 
-    private boolean terminated;
-    private long currentBackoffInterval;
-    private boolean reconnect; // must be false at init (this is the policy whether to reconnect)
-    private boolean connected; // this is the state (connected or not)
+    private volatile boolean terminated;
+    private volatile long currentBackoffInterval;
+
+    private volatile boolean reconnect; // must be false at init (this is the policy whether to reconnect)
+    private volatile boolean connected; // this is the state (connected or not)
 
     private volatile State state = State.RUNNING;
 
@@ -490,7 +570,7 @@ public void setConnected(boolean connected) {
         this.connected = connected;
     }
 
-    public boolean isReady() {
+    private boolean isReady() {
         if (!connected) {
             return false;
         }
@@ -516,28 +596,23 @@ public boolean isReady() {
         return ready;
     }
 
-    Properties getKafkaProps() {
+    private Properties getKafkaProps() {
         return kafkaProps;
     }
 
-    String getClientId() {
-        return clientId;
-    }
-
-    Exception getLastError() {
-        return lastError;
-    }
-
-    boolean isTerminated() {
+    private boolean isTerminated() {
        return terminated;
    }
 
-    boolean isRecoverable() {
+    private boolean isRecoverable() {
        return (pollExceptionStrategy.canContinue() || isReconnect()) && isKafkaConsumerRunnable();
    }
 
-    long getCurrentRecoveryInterval() {
-        return currentBackoffInterval;
+    // concurrent access happens here
+    public TaskHealthState healthState() {
+        return new TaskHealthState(
+                isReady(), isTerminated(), isRecoverable(), lastError, clientId,
+                currentBackoffInterval, kafkaProps);
    }
 
    public BridgeExceptionHandlerToErrorHandler getBridge() {
@@ -561,4 +636,5 @@ public void resume() {
         LOG.info("A resume request was issued and the consumer thread will resume after current processing has finished");
         state = State.RESUME_REQUESTED;
     }
+
 }
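
Note: the change above replaces direct getter access from the health-check thread with an immutable TaskHealthState snapshot built from volatile fields on the fetcher task. The standalone sketch below only illustrates that snapshot-over-volatile-fields pattern in isolation; it is not Camel code, and the names (HealthSnapshotDemo, FetchTask, HealthState) are hypothetical, chosen to mirror the shape of KafkaFetchRecords.healthState().

import java.util.Properties;

public class HealthSnapshotDemo {

    // Hypothetical stand-in for KafkaFetchRecords: the worker thread mutates volatile fields only.
    static class FetchTask {
        private volatile boolean connected;
        private volatile Exception lastError;
        private volatile long currentBackoffInterval = 5000;
        private final Properties props = new Properties();

        // Immutable snapshot handed to a monitoring thread, mirroring TaskHealthState.
        static class HealthState {
            private final boolean ready;
            private final long backoffInterval;
            private final Exception lastError;
            private final String bootstrapServers;

            HealthState(boolean ready, long backoffInterval, Exception lastError, String bootstrapServers) {
                this.ready = ready;
                this.backoffInterval = backoffInterval;
                this.lastError = lastError;
                this.bootstrapServers = bootstrapServers;
            }

            String describe() {
                return "ready=" + ready + ", backoff=" + backoffInterval + "ms, bootstrap=" + bootstrapServers
                        + (lastError != null ? ", lastError=" + lastError.getMessage() : "");
            }
        }

        void markConnected(String bootstrapServers) {
            props.setProperty("bootstrap.servers", bootstrapServers);
            lastError = null;
            connected = true;
        }

        void markFailed(Exception e) {
            connected = false;
            lastError = e;
        }

        // Single entry point for other threads: volatile reads packaged into an immutable object.
        HealthState healthState() {
            return new HealthState(connected, currentBackoffInterval, lastError,
                    props.getProperty("bootstrap.servers"));
        }
    }

    public static void main(String[] args) {
        FetchTask task = new FetchTask();
        task.markFailed(new IllegalStateException("broker unreachable"));
        System.out.println(task.healthState().describe());

        task.markConnected("localhost:9092");
        System.out.println(task.healthState().describe());
    }
}

The design point is the same as in the diff: the health check never touches the worker's mutable fields or package-private getters directly. The volatile markers make each individual read safe, and the immutable snapshot keeps the task's internals encapsulated behind one public, thread-safe method.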