Skip to content

Commit

Permalink
KAFKA-3888: send consumer heartbeats from a background thread (KIP-62)
Browse files Browse the repository at this point in the history
Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>

Closes #1627 from hachikuji/KAFKA-3888
  • Loading branch information
hachikuji authored and guozhangwang committed Aug 17, 2016
1 parent 19997ed commit 40b1dd3
Show file tree
Hide file tree
Showing 43 changed files with 1,686 additions and 1,281 deletions.
Expand Up @@ -28,7 +28,12 @@ public class CommitFailedException extends KafkaException {

private static final long serialVersionUID = 1L;

public CommitFailedException(String message) {
super(message);
public CommitFailedException() {
super("Commit cannot be completed since the group has already " +
"rebalanced and assigned the partitions to another member. This means that the time " +
"between subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
"which typically implies that the poll loop is spending too much time message processing. " +
"You can address this either by increasing the session timeout or by reducing the maximum " +
"size of batches returned in poll() with max.poll.records.");
}
}
Expand Up @@ -48,24 +48,33 @@ public class ConsumerConfig extends AbstractConfig {
public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll().";

/** <code>max.poll.interval.ms</code> */
public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
private static final String MAX_POLL_INTERVAL_MS_DOC = "The maximum delay between invocations of poll() when using " +
"consumer group management. This places an upper bound on the amount of time that the consumer can be idle " +
"before fetching more records. If poll() is not called before expiration of this timeout, then the consumer " +
"is considered failed and the group will rebalance in order to reassign the partitions to another member. ";

/**
* <code>session.timeout.ms</code>
*/
public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's " +
"group management facilities. When a consumer's heartbeat is not received within the session timeout, " +
"the broker will mark the consumer as failed and rebalance the group. Since heartbeats are sent only " +
"when poll() is invoked, a higher session timeout allows more time for message processing in the consumer's " +
"poll loop at the cost of a longer time to detect hard failures. See also <code>" + MAX_POLL_RECORDS_CONFIG + "</code> for " +
"another option to control the processing time in the poll loop. Note that the value must be in the " +
"allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> " +
private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect consumer failures when using " +
"Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness " +
"to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, " +
"then the broker will remove this consumer from the group and initiate a rebalance. Note that the value " +
"must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> " +
"and <code>group.max.session.timeout.ms</code>.";

/**
* <code>heartbeat.interval.ms</code>
*/
public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer " +
"coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the " +
"consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. " +
"The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher " +
"than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";

/**
* <code>bootstrap.servers</code>
Expand Down Expand Up @@ -196,7 +205,7 @@ public class ConsumerConfig extends AbstractConfig {
.define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
.define(SESSION_TIMEOUT_MS_CONFIG,
Type.INT,
30000,
10000,
Importance.HIGH,
SESSION_TIMEOUT_MS_DOC)
.define(HEARTBEAT_INTERVAL_MS_CONFIG,
Expand All @@ -221,7 +230,7 @@ public class ConsumerConfig extends AbstractConfig {
Importance.MEDIUM,
ENABLE_AUTO_COMMIT_DOC)
.define(AUTO_COMMIT_INTERVAL_MS_CONFIG,
Type.LONG,
Type.INT,
5000,
atLeast(0),
Importance.LOW,
Expand Down Expand Up @@ -311,7 +320,7 @@ public class ConsumerConfig extends AbstractConfig {
VALUE_DESERIALIZER_CLASS_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
40 * 1000,
305000, // chosen to be higher than the default of max.poll.interval.ms
atLeast(0),
Importance.MEDIUM,
REQUEST_TIMEOUT_MS_DOC)
Expand All @@ -328,10 +337,16 @@ public class ConsumerConfig extends AbstractConfig {
INTERCEPTOR_CLASSES_DOC)
.define(MAX_POLL_RECORDS_CONFIG,
Type.INT,
Integer.MAX_VALUE,
500,
atLeast(1),
Importance.MEDIUM,
MAX_POLL_RECORDS_DOC)
.define(MAX_POLL_INTERVAL_MS_CONFIG,
Type.INT,
300000,
atLeast(1),
Importance.MEDIUM,
MAX_POLL_INTERVAL_MS_DOC)
.define(EXCLUDE_INTERNAL_TOPICS_CONFIG,
Type.BOOLEAN,
DEFAULT_EXCLUDE_INTERNAL_TOPICS,
Expand Down
Expand Up @@ -137,32 +137,31 @@
* After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(long)} is
* invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer
* will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers,
* the poll API sends periodic heartbeats to the server; when you stop calling poll (perhaps because an exception was thrown),
* then no heartbeats will be sent. If a period of the configured <i>session timeout</i> elapses before the server
* has received a heartbeat, then the consumer will be kicked out of the group and its partitions will be reassigned.
* This is designed to prevent situations where the consumer has failed, yet continues to hold onto the partitions
* it was assigned (thus preventing active consumers in the group from taking them). To stay in the group, you
* have to prove you are still alive by calling poll.
* the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for
* a duration of <code>session.timeout.ms</code>, then the consumer will be considered dead and its partitions will
* be reassigned. It is also possible that the consumer could encounter a "livelock" situation where it is continuing
* to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions
* indefinitely in this case, we provide a liveness detection mechanism: basically if you don't call poll at least
* as frequently as the configured <code>poll.interval.ms</code>, then the client will proactively leave the group
* so that another consumer can take over its partitions. So to stay in the group, you must continue to call poll
* <p>
* The implication of this design is that message processing time in the poll loop must be bounded so that
* heartbeats can be sent before expiration of the session timeout. What typically happens when processing time
* exceeds the session timeout is that the consumer won't be able to commit offsets for any of the processed records.
* For example, this is indicated by a {@link CommitFailedException} thrown from {@link #commitSync()}. This
* guarantees that only active members of the group are allowed to commit offsets. If the consumer
* has been kicked out of the group, then its partitions will have been assigned to another member, which will be
* committing its own offsets as it handles new records. This gives offset commits an isolation guarantee.
* you always ensure that poll() is called at least once every poll interval. If not, then the consumer leaves
* the group, which typically results in an offset commit failure when the processing of the polled records
* finally completes (this is indicated by a {@link CommitFailedException} thrown from {@link #commitSync()}).
* This is a safety mechanism which guarantees that only active members of the group are able to commit offsets.
* If the consumer has been kicked out of the group, then its partitions will have been assigned to another member,
* which will be committing its own offsets as it handles new records. This gives offset commits an isolation guarantee.
* <p>
* The consumer provides two configuration settings to control this behavior:
* The consumer provides two configuration settings to control the behavior of the poll loop:
* <ol>
* <li><code>session.timeout.ms</code>: By increasing the session timeout, you can give the consumer more
* time to handle a batch of records returned from {@link #poll(long)}. The only drawback is that it
* will take longer for the server to detect hard consumer failures, which can cause a delay before
* a rebalance can be completed. However, clean shutdown with {@link #close()} is not impacted since
* the consumer will send an explicit message to the server to leave the group and cause an immediate
* rebalance.</li>
* <li><code>max.poll.records</code>: Processing time in the poll loop is typically proportional to the number
* of records processed, so it's natural to want to set a limit on the number of records handled at once.
* This setting provides that. By default, there is essentially no limit.</li>
* <li><code>max.poll.interval.ms</code>: By increasing the interval between expected polls, you can give
* the consumer more time to handle a batch of records returned from {@link #poll(long)}. The drawback
* is that increasing this value may delay a group rebalance since the consumer will only join the rebalance
* inside the call to poll.</li>
* <li><code>max.poll.records</code>: Use this setting to limit the total records returned from a single
* call to poll. This can make it easier to predict the maximum that must be handled within each poll
* interval.</li>
* </ol>
* <p>
* For use cases where message processing time varies unpredictably, neither of these options may be viable.
Expand All @@ -187,7 +186,6 @@
* props.put(&quot;group.id&quot;, &quot;test&quot;);
* props.put(&quot;enable.auto.commit&quot;, &quot;true&quot;);
* props.put(&quot;auto.commit.interval.ms&quot;, &quot;1000&quot;);
* props.put(&quot;session.timeout.ms&quot;, &quot;30000&quot;);
* props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
Expand All @@ -210,13 +208,6 @@
* In this example the client is subscribing to the topics <i>foo</i> and <i>bar</i> as part of a group of consumers
* called <i>test</i> as described above.
* <p>
* The broker will automatically detect failed processes in the <i>test</i> group by using a heartbeat mechanism. The
* consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. Note that
* the consumer is single-threaded, so periodic heartbeats can only be sent when {@link #poll(long)} is called. As long as
* the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned
* to it. If it stops heartbeating by failing to call {@link #poll(long)} for a period of time longer than <code>session.timeout.ms</code>
* then it will be considered dead and its partitions will be assigned to another process.
* <p>
* The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
* are saying that our record's key and value will just be simple strings.
*
Expand All @@ -242,7 +233,6 @@
* props.put(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
* props.put(&quot;group.id&quot;, &quot;test&quot;);
* props.put(&quot;enable.auto.commit&quot;, &quot;false&quot;);
* props.put(&quot;session.timeout.ms&quot;, &quot;30000&quot;);
* props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
Expand Down Expand Up @@ -645,6 +635,7 @@ private KafkaConsumer(ConsumerConfig config,
this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
this.coordinator = new ConsumerCoordinator(this.client,
config.getString(ConsumerConfig.GROUP_ID_CONFIG),
config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
assignors,
Expand All @@ -656,7 +647,7 @@ private KafkaConsumer(ConsumerConfig config,
retryBackoffMs,
new ConsumerCoordinator.DefaultOffsetCommitCallback(),
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors,
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
if (keyDeserializer == null) {
Expand Down Expand Up @@ -715,6 +706,9 @@ private KafkaConsumer(ConsumerConfig config,
Metrics metrics,
SubscriptionState subscriptions,
Metadata metadata,
boolean autoCommitEnabled,
int autoCommitIntervalMs,
int heartbeatIntervalMs,
long retryBackoffMs,
long requestTimeoutMs) {
this.clientId = clientId;
Expand Down Expand Up @@ -970,7 +964,6 @@ public ConsumerRecords<K, V> poll(long timeout) {
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
// Additionally, pollNoWakeup does not allow automatic commits to get triggered.
fetcher.sendFetches();
client.pollNoWakeup();

Expand All @@ -997,30 +990,23 @@ public ConsumerRecords<K, V> poll(long timeout) {
* @return The fetched records (may be empty)
*/
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
// ensure we have partitions assigned if we expect to
if (subscriptions.partitionsAutoAssigned())
coordinator.ensurePartitionAssignment();
coordinator.poll(time.milliseconds());

// fetch positions if we have partitions we're subscribed to that we
// don't know the offset for
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());

long now = time.milliseconds();

// execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
client.executeDelayedTasks(now);

// init any new fetches (won't resend pending fetches)
// if data is available already, return it immediately
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();

// if data is available already, e.g. from a previous network client poll() call to commit,
// then just return it immediately
if (!records.isEmpty())
return records;

// send any new fetches (won't resend pending fetches)
fetcher.sendFetches();
client.poll(timeout, now);

long now = time.milliseconds();
client.poll(Math.min(coordinator.timeToNextPoll(now), timeout), now);
return fetcher.fetchedRecords();
}

Expand Down

0 comments on commit 40b1dd3

Please sign in to comment.