MINOR: Ensure consumer logging has clientId/groupId context

This patch ensures that the consumer groupId and clientId are available in all log messages, which makes debugging much easier when a single application has multiple consumer instances. To support this, I've added a new `LogContext` object which builds a log prefix similar to the broker-side `kafka.utils.Logging` mixin; a minimal sketch of the idea follows the list below. Additionally, this patch changes the log level in a couple of minor cases:

- Consumer wakeup events are now logged at DEBUG instead of TRACE
- Heartbeat enabling/disabling is now logged at DEBUG instead of TRACE
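
A minimal sketch of the `LogContext` idea, for readers following along: the prefix format and the `logger(Class)` factory are taken from the diff below, but the class and helper names here are illustrative, and the real implementation delegates the full `org.slf4j.Logger` interface rather than only the two methods shown.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Sketch only: hands out loggers that prepend a fixed context prefix,
    // e.g. "[Consumer clientId=consumer-1, groupId=my-group] ".
    final class LogContextSketch {
        private final String logPrefix;

        LogContextSketch(String logPrefix) {
            this.logPrefix = logPrefix == null ? "" : logPrefix;
        }

        PrefixedLogger logger(Class<?> clazz) {
            return new PrefixedLogger(LoggerFactory.getLogger(clazz), logPrefix);
        }

        // Simplified wrapper exposing only the levels touched by this patch.
        static final class PrefixedLogger {
            private final Logger delegate;
            private final String prefix;

            PrefixedLogger(Logger delegate, String prefix) {
                this.delegate = delegate;
                this.prefix = prefix;
            }

            void debug(String format, Object... args) {
                if (delegate.isDebugEnabled())
                    delegate.debug(prefix + format, args);
            }

            void trace(String format, Object... args) {
                if (delegate.isTraceEnabled())
                    delegate.trace(prefix + format, args);
            }
        }
    }

With a prefix built as in the constructor change below, a call such as `log.debug("Kafka consumer initialized")` produces a line like the following (the clientId and groupId values are illustrative):

    [Consumer clientId=consumer-1, groupId=my-group] Kafka consumer initialized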

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3676 from hachikuji/log-consumer-wakeups
hachikuji committed Aug 19, 2017
1 parent ed96523 commit 6896f1d
Showing 15 changed files with 628 additions and 204 deletions.
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -49,10 +49,10 @@
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.Collection;
@@ -541,7 +541,6 @@
  */
 public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
-    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
     private static final long NO_CURRENT_THREAD = -1L;
     private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
     private static final String JMX_PREFIX = "kafka.consumer";
@@ -550,6 +549,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     // Visible for testing
     final Metrics metrics;
 
+    private final Logger log;
     private final String clientId;
     private final ConsumerCoordinator coordinator;
     private final Deserializer<K> keyDeserializer;
@@ -640,18 +640,23 @@ private KafkaConsumer(ConsumerConfig config,
                           Deserializer<K> keyDeserializer,
                           Deserializer<V> valueDeserializer) {
         try {
-            log.debug("Starting the Kafka consumer");
+            String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+            if (clientId.isEmpty())
+                clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
+            this.clientId = clientId;
+            String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
+
+            LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
+            this.log = logContext.logger(getClass());
+
+            log.debug("Initializing the Kafka consumer");
             this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
             int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
             int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
             if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
                 throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
             this.time = Time.SYSTEM;
 
-            String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
-            if (clientId.length() <= 0)
-                clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
-            this.clientId = clientId;
             Map<String, String> metricsTags = Collections.singletonMap("client-id", clientId);
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                     .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
@@ -712,31 +717,39 @@ private KafkaConsumer(ConsumerConfig config,
                     true,
                     new ApiVersions(),
                     throttleTimeSensor);
-            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
+            this.client = new ConsumerNetworkClient(
+                    logContext,
+                    netClient,
+                    metadata,
+                    time,
+                    retryBackoffMs,
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
             this.subscriptions = new SubscriptionState(offsetResetStrategy);
             List<PartitionAssignor> assignors = config.getConfiguredInstances(
                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                     PartitionAssignor.class);
-            this.coordinator = new ConsumerCoordinator(this.client,
-                    config.getString(ConsumerConfig.GROUP_ID_CONFIG),
-                    config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
-                    config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
-                    config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
-                    assignors,
-                    this.metadata,
-                    this.subscriptions,
-                    metrics,
-                    metricGrpPrefix,
-                    this.time,
-                    retryBackoffMs,
-                    config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
-                    config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
-                    this.interceptors,
-                    config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
-                    config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
-            this.fetcher = new Fetcher<>(this.client,
+            this.coordinator = new ConsumerCoordinator(logContext,
+                    this.client,
+                    groupId,
+                    config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
+                    config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
+                    config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
+                    assignors,
+                    this.metadata,
+                    this.subscriptions,
+                    metrics,
+                    metricGrpPrefix,
+                    this.time,
+                    retryBackoffMs,
+                    config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
+                    config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
+                    this.interceptors,
+                    config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
+                    config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
+            this.fetcher = new Fetcher<>(
+                    logContext,
+                    this.client,
                     config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
@@ -756,7 +769,7 @@ private KafkaConsumer(ConsumerConfig config,
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
 
-            log.debug("Kafka consumer with client id {} created", clientId);
+            log.debug("Kafka consumer initialized");
         } catch (Throwable t) {
             // call close methods if internal objects are already constructed
             // this is to prevent resource leak. see KAFKA-2121
@@ -767,7 +780,8 @@ private KafkaConsumer(ConsumerConfig config,
     }
 
     // visible for testing
-    KafkaConsumer(String clientId,
+    KafkaConsumer(LogContext logContext,
+                  String clientId,
                   ConsumerCoordinator coordinator,
                   Deserializer<K> keyDeserializer,
                   Deserializer<V> valueDeserializer,
@@ -780,6 +794,7 @@ private KafkaConsumer(ConsumerConfig config,
                   Metadata metadata,
                   long retryBackoffMs,
                   long requestTimeoutMs) {
+        this.log = logContext.logger(getClass());
         this.clientId = clientId;
         this.coordinator = coordinator;
         this.keyDeserializer = keyDeserializer;
@@ -1242,7 +1257,7 @@ public void commitAsync(OffsetCommitCallback callback) {
     public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
         acquireAndEnsureOpen();
         try {
-            log.debug("Committing offsets: {} ", offsets);
+            log.debug("Committing offsets: {}", offsets);
             coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
         } finally {
             release();
@@ -1440,8 +1455,8 @@ public Map<String, List<PartitionInfo>> listTopics() {
     public void pause(Collection<TopicPartition> partitions) {
         acquireAndEnsureOpen();
         try {
+            log.debug("Pausing partitions {}", partitions);
             for (TopicPartition partition: partitions) {
-                log.debug("Pausing partition {}", partition);
                 subscriptions.pause(partition);
             }
         } finally {
@@ -1459,8 +1474,8 @@ public void pause(Collection<TopicPartition> partitions) {
     public void resume(Collection<TopicPartition> partitions) {
         acquireAndEnsureOpen();
         try {
+            log.debug("Resuming partitions {}", partitions);
             for (TopicPartition partition: partitions) {
-                log.debug("Resuming partition {}", partition);
                 subscriptions.resume(partition);
             }
         } finally {
@@ -1630,7 +1645,7 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer<
     }
 
     private void close(long timeoutMs, boolean swallowException) {
-        log.trace("Closing the Kafka consumer.");
+        log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
         try {
             if (coordinator != null)
@@ -1646,7 +1661,7 @@ private void close(long timeoutMs, boolean swallowException) {
         ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
         ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
         AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
-        log.debug("Kafka consumer with client id {} has been closed", clientId);
+        log.debug("Kafka consumer has been closed");
         Throwable exception = firstException.get();
         if (exception != null && !swallowException) {
             if (exception instanceof InterruptException) {
