Skip to content

Commit

Permalink
CAMEL-18019: ensure some fields in the Kafka record fetcher are safe …
Browse files Browse the repository at this point in the history
…for concurrent read
  • Loading branch information
orpiske committed Apr 28, 2022
1 parent 9c3aea3 commit 31f3b04
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.camel.ConsumerListenerAware;
import org.apache.camel.Processor;
Expand Down Expand Up @@ -115,10 +116,6 @@ Properties getProps() {
return props;
}

/**
 * Returns the Kafka record fetcher tasks held by this consumer.
 * <p>
 * The {@code tasks} field itself is handed back (no copy is made), so the caller works on the same list
 * instance that this consumer uses.
 *
 * @return the list of fetcher tasks
 */
List<KafkaFetchRecords> getTasks() {
return tasks;
}

@Override
protected void doStart() throws Exception {
LOG.info("Starting Kafka consumer on topic: {} with breakOnFirstError: {}", endpoint.getConfiguration().getTopic(),
Expand Down Expand Up @@ -217,7 +214,7 @@ protected void doStop() throws Exception {
@Override
protected void doSuspend() throws Exception {
for (KafkaFetchRecords task : tasks) {
LOG.info("Pausing Kafka record fetcher task running client ID {}", task.getClientId());
LOG.info("Pausing Kafka record fetcher task running client ID {}", task.healthState().getClientId());
task.pause();
}

Expand All @@ -227,10 +224,14 @@ protected void doSuspend() throws Exception {
/**
 * Resumes every Kafka record fetcher task owned by this consumer, then delegates to the parent
 * implementation.
 * <p>
 * Each task is announced in the log exactly once before {@code resume()} is called on it. The client ID
 * used in the log message is taken from the task's health snapshot rather than from a direct accessor on
 * the task.
 *
 * @throws Exception if the parent resume handling fails
 */
@Override
protected void doResume() throws Exception {
    for (KafkaFetchRecords task : tasks) {
        // Single log entry per task; the client ID comes from the health-state snapshot.
        LOG.info("Resuming Kafka record fetcher task running client ID {}", task.healthState().getClientId());
        task.resume();
    }

    super.doResume();
}

/**
 * Collects a health snapshot from each Kafka record fetcher task.
 *
 * @return a newly built list holding one {@link KafkaFetchRecords.TaskHealthState} per task, in the
 *         iteration order of the task list
 */
public List<KafkaFetchRecords.TaskHealthState> healthStates() {
    return tasks.stream()
            .map(KafkaFetchRecords::healthState)
            .collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.camel.health.HealthCheckResultBuilder;
import org.apache.camel.impl.health.AbstractHealthCheck;
import org.apache.camel.util.TimeUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;

/**
* Kafka consumer readiness health-check
Expand All @@ -47,32 +44,28 @@ public boolean isLiveness() {

@Override
protected void doCall(HealthCheckResultBuilder builder, Map<String, Object> options) {
List<KafkaFetchRecords> tasks = kafkaConsumer.getTasks();
for (KafkaFetchRecords task : tasks) {
if (!task.isReady()) {
List<KafkaFetchRecords.TaskHealthState> healthStates = kafkaConsumer.healthStates();

for (KafkaFetchRecords.TaskHealthState healthState : healthStates) {
if (!healthState.isReady()) {
builder.down();

String msg = "KafkaConsumer is not ready";
if (task.isTerminated()) {
msg += " (gave up recovering and terminated the kafka consumer; restart route or application to recover).";
} else if (task.isRecoverable()) {
String time = TimeUtils.printDuration(task.getCurrentRecoveryInterval());
msg += " (recovery in progress using " + time + " intervals).";
}
final String msg = healthState.buildStateMessage();

builder.message(msg);

// was this caused by consumer not able to connect then this is stored in last error
builder.error(task.getLastError());
builder.error(healthState.getLastError());

KafkaConfiguration cfg = kafkaConsumer.getEndpoint().getConfiguration();
Properties props = task.getKafkaProps();

builder.detail("bootstrap.servers", props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
builder.detail("client.id", task.getClientId());
String gid = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
builder.detail("bootstrap.servers", healthState.getBootstrapServers());
builder.detail("client.id", healthState.getClientId());
String gid = healthState.getGroupId();
if (gid != null) {
builder.detail("group.id", gid);
}

if (routeId != null) {
// camel route id
builder.detail("route.id", routeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.camel.util.ReflectionHelper;
import org.apache.camel.util.TimeUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.errors.InterruptException;
Expand All @@ -63,11 +64,89 @@ private enum State {
RESUME_REQUESTED,
}

/*
This one is used to avoid exposing methods from this class to the health checker. Fields and methods
used to build an instance of this class must be made thread-safe (i.e.: most importantly, read fields
should be marked as volatile).
*/
/**
 * Immutable, point-in-time snapshot of the health-related state of a record fetcher task.
 * <p>
 * Every value exposed by this class is copied out of its source when the snapshot is constructed. In
 * particular, the bootstrap servers and the group ID are both read from the client properties in the
 * constructor, and the properties object is not retained, so later changes to it cannot alter what an
 * already-built snapshot reports.
 */
public static class TaskHealthState {
    private final boolean ready;
    private final boolean isTerminated;
    private final boolean isRecoverable;
    private final Exception lastError;
    private final String clientId;
    private final String bootstrapServers;
    private final String groupId;
    private final long currentBackoffInterval;

    /**
     * Creates a snapshot from the given values.
     *
     * @param ready                  whether the task reported itself as ready
     * @param isTerminated           whether the task reported itself as terminated
     * @param isRecoverable          whether the task reported itself as recoverable
     * @param lastError              the last error recorded by the task (may be {@code null})
     * @param clientId               the client ID of the task
     * @param currentBackoffInterval the current backoff interval, formatted via
     *                               {@code TimeUtils.printDuration} in {@link #buildStateMessage()}
     * @param clientProperties       the client properties; only the bootstrap servers and the group ID
     *                               are read from it, and the object itself is not kept
     */
    public TaskHealthState(boolean ready, boolean isTerminated, boolean isRecoverable, Exception lastError, String clientId,
                           long currentBackoffInterval, Properties clientProperties) {
        this.ready = ready;
        this.isTerminated = isTerminated;
        this.isRecoverable = isRecoverable;
        this.lastError = lastError;
        this.clientId = clientId;
        this.currentBackoffInterval = currentBackoffInterval;
        // Read both configuration values eagerly so the snapshot does not depend on the shared,
        // mutable Properties instance after construction.
        this.bootstrapServers = clientProperties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
        this.groupId = clientProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
    }

    /** @return whether the task was ready when the snapshot was taken */
    public boolean isReady() {
        return ready;
    }

    /** @return whether the task was terminated when the snapshot was taken */
    public boolean isTerminated() {
        return isTerminated;
    }

    /** @return whether the task was recoverable when the snapshot was taken */
    public boolean isRecoverable() {
        return isRecoverable;
    }

    /** @return the last error captured in the snapshot (may be {@code null}) */
    public Exception getLastError() {
        return lastError;
    }

    /** @return the client ID captured in the snapshot */
    public String getClientId() {
        return clientId;
    }

    /** @return the backoff interval captured in the snapshot */
    public long getCurrentBackoffInterval() {
        return currentBackoffInterval;
    }

    /** @return the bootstrap servers property value, or {@code null} if the property was not set */
    public String getBootstrapServers() {
        return bootstrapServers;
    }

    /** @return the group ID property value, or {@code null} if the property was not set */
    public String getGroupId() {
        return groupId;
    }

    /**
     * Builds the human-readable "not ready" message for this snapshot.
     * <p>
     * The base text is extended with a termination note when the task is terminated, or otherwise with
     * a recovery note (including the formatted backoff interval) when the task is recoverable.
     *
     * @return the state message
     */
    public String buildStateMessage() {
        String msg = "KafkaConsumer is not ready";
        if (isTerminated()) {
            msg += " (gave up recovering and terminated the kafka consumer; restart route or application to recover).";
        } else if (isRecoverable()) {
            String time = TimeUtils.printDuration(getCurrentBackoffInterval());
            msg += " (recovery in progress using " + time + " intervals).";
        }

        return msg;
    }
}

private static final Logger LOG = LoggerFactory.getLogger(KafkaFetchRecords.class);

// There are a few volatile fields here because they are usually read from other threads,
// such as the health check thread. They are usually only read in those contexts.

private final KafkaConsumer kafkaConsumer;
private org.apache.kafka.clients.consumer.Consumer consumer;
private String clientId;
private volatile String clientId;
private final String topicName;
private final Pattern topicPattern;
private final String threadId;
Expand All @@ -77,13 +156,14 @@ private enum State {
private final BridgeExceptionHandlerToErrorHandler bridge;
private final ReentrantLock lock = new ReentrantLock();
private CommitManager commitManager;
private Exception lastError;
private volatile Exception lastError;
private final KafkaConsumerListener consumerListener;

private boolean terminated;
private long currentBackoffInterval;
private boolean reconnect; // must be false at init (this is the policy whether to reconnect)
private boolean connected; // this is the state (connected or not)
private volatile boolean terminated;
private volatile long currentBackoffInterval;

private volatile boolean reconnect; // The reconnect must be false at init (this is the policy whether to reconnect).
private volatile boolean connected; // this is the state (connected or not)

private volatile State state = State.RUNNING;

Expand Down Expand Up @@ -490,7 +570,7 @@ public void setConnected(boolean connected) {
this.connected = connected;
}

public boolean isReady() {
private boolean isReady() {
if (!connected) {
return false;
}
Expand All @@ -516,28 +596,23 @@ public boolean isReady() {
return ready;
}

Properties getKafkaProps() {
private Properties getKafkaProps() {
return kafkaProps;
}

/**
 * Returns the value of the {@code clientId} field of this fetcher task.
 *
 * @return the client ID
 */
String getClientId() {
return clientId;
}

/**
 * Returns the value of the {@code lastError} field of this fetcher task.
 *
 * @return the last recorded error (presumably {@code null} when no error has been recorded — confirm
 *         against the code that assigns the field)
 */
Exception getLastError() {
return lastError;
}

boolean isTerminated() {
private boolean isTerminated() {
return terminated;
}

boolean isRecoverable() {
private boolean isRecoverable() {
return (pollExceptionStrategy.canContinue() || isReconnect()) && isKafkaConsumerRunnable();
}

long getCurrentRecoveryInterval() {
return currentBackoffInterval;
// concurrent access happens here
/**
 * Builds a snapshot of this task's health-related state.
 * <p>
 * The readiness, termination and recoverability checks are evaluated first, in that order, followed by
 * reads of the last error, client ID, current backoff interval and Kafka properties.
 *
 * @return a new {@link TaskHealthState} populated from the current values
 */
public TaskHealthState healthState() {
    final boolean readyNow = isReady();
    final boolean terminatedNow = isTerminated();
    final boolean recoverableNow = isRecoverable();

    return new TaskHealthState(
            readyNow, terminatedNow, recoverableNow, lastError, clientId,
            currentBackoffInterval, kafkaProps);
}

public BridgeExceptionHandlerToErrorHandler getBridge() {
Expand All @@ -561,4 +636,5 @@ public void resume() {
LOG.info("A resume request was issued and the consumer thread will resume after current processing has finished");
state = State.RESUME_REQUESTED;
}

}

0 comments on commit 31f3b04

Please sign in to comment.