diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
index 26ef48e0b410..5695be83d6cb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
@@ -28,7 +28,12 @@ public class CommitFailedException extends KafkaException {
private static final long serialVersionUID = 1L;
- public CommitFailedException(String message) {
- super(message);
+ public CommitFailedException() {
+ super("Commit cannot be completed since the group has already " +
+ "rebalanced and assigned the partitions to another member. This means that the time " +
+ "between subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+ "which typically implies that the poll loop is spending too much time message processing. " +
+ "You can address this either by increasing the session timeout or by reducing the maximum " +
+ "size of batches returned in poll() with max.poll.records.");
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index de10bed20aed..509c3a1d7fac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -48,24 +48,33 @@ public class ConsumerConfig extends AbstractConfig {
public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll().";
+ /** <code>max.poll.interval.ms</code> */
+ public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
+ private static final String MAX_POLL_INTERVAL_MS_DOC = "The maximum delay between invocations of poll() when using " +
+ "consumer group management. This places an upper bound on the amount of time that the consumer can be idle " +
+ "before fetching more records. If poll() is not called before expiration of this timeout, then the consumer " +
+ "is considered failed and the group will rebalance in order to reassign the partitions to another member. ";
+
/**
* session.timeout.ms
*/
public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
- private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's " +
- "group management facilities. When a consumer's heartbeat is not received within the session timeout, " +
- "the broker will mark the consumer as failed and rebalance the group. Since heartbeats are sent only " +
- "when poll() is invoked, a higher session timeout allows more time for message processing in the consumer's " +
- "poll loop at the cost of a longer time to detect hard failures. See also <code>" + MAX_POLL_RECORDS_CONFIG + "</code> for " +
- "another option to control the processing time in the poll loop. Note that the value must be in the " +
- "allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> " +
+ private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect consumer failures when using " +
+ "Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness " +
+ "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, " +
+ "then the broker will remove this consumer from the group and initiate a rebalance. Note that the value " +
+ "must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> " +
"and <code>group.max.session.timeout.ms</code>.";
/**
* heartbeat.interval.ms
*/
public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
- private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
+ private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer " +
+ "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the " +
+ "consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. " +
+ "The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher " +
+ "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
/**
* bootstrap.servers
@@ -196,7 +205,7 @@ public class ConsumerConfig extends AbstractConfig {
.define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
.define(SESSION_TIMEOUT_MS_CONFIG,
Type.INT,
- 30000,
+ 10000,
Importance.HIGH,
SESSION_TIMEOUT_MS_DOC)
.define(HEARTBEAT_INTERVAL_MS_CONFIG,
@@ -221,7 +230,7 @@ public class ConsumerConfig extends AbstractConfig {
Importance.MEDIUM,
ENABLE_AUTO_COMMIT_DOC)
.define(AUTO_COMMIT_INTERVAL_MS_CONFIG,
- Type.LONG,
+ Type.INT,
5000,
atLeast(0),
Importance.LOW,
@@ -311,7 +320,7 @@ public class ConsumerConfig extends AbstractConfig {
VALUE_DESERIALIZER_CLASS_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
- 40 * 1000,
+ 305000, // chosen to be higher than the default of max.poll.interval.ms
atLeast(0),
Importance.MEDIUM,
REQUEST_TIMEOUT_MS_DOC)
@@ -328,10 +337,16 @@ public class ConsumerConfig extends AbstractConfig {
INTERCEPTOR_CLASSES_DOC)
.define(MAX_POLL_RECORDS_CONFIG,
Type.INT,
- Integer.MAX_VALUE,
+ 500,
atLeast(1),
Importance.MEDIUM,
MAX_POLL_RECORDS_DOC)
+ .define(MAX_POLL_INTERVAL_MS_CONFIG,
+ Type.INT,
+ 300000,
+ atLeast(1),
+ Importance.MEDIUM,
+ MAX_POLL_INTERVAL_MS_DOC)
.define(EXCLUDE_INTERNAL_TOPICS_CONFIG,
Type.BOOLEAN,
DEFAULT_EXCLUDE_INTERNAL_TOPICS,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 522cfdee7bb3..ef913027361b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -137,32 +137,31 @@
* After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(long)} is
* invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer
* will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers,
- * the poll API sends periodic heartbeats to the server; when you stop calling poll (perhaps because an exception was thrown),
- * then no heartbeats will be sent. If a period of the configured session timeout elapses before the server
- * has received a heartbeat, then the consumer will be kicked out of the group and its partitions will be reassigned.
- * This is designed to prevent situations where the consumer has failed, yet continues to hold onto the partitions
- * it was assigned (thus preventing active consumers in the group from taking them). To stay in the group, you
- * have to prove you are still alive by calling poll.
+ * the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for
+ * a duration of <code>session.timeout.ms</code>, then the consumer will be considered dead and its partitions will
+ * be reassigned. It is also possible that the consumer could encounter a "livelock" situation where it is continuing
+ * to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions
+ * indefinitely in this case, we provide a liveness detection mechanism: basically if you don't call poll at least
+ * as frequently as the configured <code>max.poll.interval.ms</code>, then the client will proactively leave the group
+ * so that another consumer can take over its partitions. So to stay in the group, you must continue to call poll
*
* The implication of this design is that message processing time in the poll loop must be bounded so that
- * heartbeats can be sent before expiration of the session timeout. What typically happens when processing time
- * exceeds the session timeout is that the consumer won't be able to commit offsets for any of the processed records.
- * For example, this is indicated by a {@link CommitFailedException} thrown from {@link #commitSync()}. This
- * guarantees that only active members of the group are allowed to commit offsets. If the consumer
- * has been kicked out of the group, then its partitions will have been assigned to another member, which will be
- * committing its own offsets as it handles new records. This gives offset commits an isolation guarantee.
+ * you always ensure that poll() is called at least once every poll interval. If not, then the consumer leaves
+ * the group, which typically results in an offset commit failure when the processing of the polled records
+ * finally completes (this is indicated by a {@link CommitFailedException} thrown from {@link #commitSync()}).
+ * This is a safety mechanism which guarantees that only active members of the group are able to commit offsets.
+ * If the consumer has been kicked out of the group, then its partitions will have been assigned to another member,
+ * which will be committing its own offsets as it handles new records. This gives offset commits an isolation guarantee.
*
- * The consumer provides two configuration settings to control this behavior:
+ * The consumer provides two configuration settings to control the behavior of the poll loop:
*
- * <code>session.timeout.ms</code>: By increasing the session timeout, you can give the consumer more
- * time to handle a batch of records returned from {@link #poll(long)}. The only drawback is that it
- * will take longer for the server to detect hard consumer failures, which can cause a delay before
- * a rebalance can be completed. However, clean shutdown with {@link #close()} is not impacted since
- * the consumer will send an explicit message to the server to leave the group and cause an immediate
- * rebalance.
- * <code>max.poll.records</code>: Processing time in the poll loop is typically proportional to the number
- * of records processed, so it's natural to want to set a limit on the number of records handled at once.
- * This setting provides that. By default, there is essentially no limit.
+ * <code>max.poll.interval.ms</code>: By increasing the interval between expected polls, you can give
+ * the consumer more time to handle a batch of records returned from {@link #poll(long)}. The drawback
+ * is that increasing this value may delay a group rebalance since the consumer will only join the rebalance
+ * inside the call to poll.
+ * <code>max.poll.records</code>: Use this setting to limit the total records returned from a single
+ * call to poll. This can make it easier to predict the maximum that must be handled within each poll
+ * interval.
*
*
* For use cases where message processing time varies unpredictably, neither of these options may be viable.
@@ -187,7 +186,6 @@
* props.put("group.id", "test");
* props.put("enable.auto.commit", "true");
* props.put("auto.commit.interval.ms", "1000");
- * props.put("session.timeout.ms", "30000");
* props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
* props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
* KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
@@ -210,13 +208,6 @@
* In this example the client is subscribing to the topics foo and bar as part of a group of consumers
* called test as described above.
*
- * The broker will automatically detect failed processes in the test group by using a heartbeat mechanism. The
- * consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. Note that
- * the consumer is single-threaded, so periodic heartbeats can only be sent when {@link #poll(long)} is called. As long as
- * the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned
- * to it. If it stops heartbeating by failing to call {@link #poll(long)} for a period of time longer than <code>session.timeout.ms</code>
- * then it will be considered dead and its partitions will be assigned to another process.
- *
* The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
* are saying that our record's key and value will just be simple strings.
*
@@ -242,7 +233,6 @@
* props.put("bootstrap.servers", "localhost:9092");
* props.put("group.id", "test");
* props.put("enable.auto.commit", "false");
- * props.put("session.timeout.ms", "30000");
* props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
* props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
* KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
@@ -645,6 +635,7 @@ private KafkaConsumer(ConsumerConfig config,
this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
this.coordinator = new ConsumerCoordinator(this.client,
config.getString(ConsumerConfig.GROUP_ID_CONFIG),
+ config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
assignors,
@@ -656,7 +647,7 @@ private KafkaConsumer(ConsumerConfig config,
retryBackoffMs,
new ConsumerCoordinator.DefaultOffsetCommitCallback(),
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
- config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
+ config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors,
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
if (keyDeserializer == null) {
@@ -715,6 +706,9 @@ private KafkaConsumer(ConsumerConfig config,
Metrics metrics,
SubscriptionState subscriptions,
Metadata metadata,
+ boolean autoCommitEnabled,
+ int autoCommitIntervalMs,
+ int heartbeatIntervalMs,
long retryBackoffMs,
long requestTimeoutMs) {
this.clientId = clientId;
@@ -970,7 +964,6 @@ public ConsumerRecords<K, V> poll(long timeout) {
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
- // Additionally, pollNoWakeup does not allow automatic commits to get triggered.
fetcher.sendFetches();
client.pollNoWakeup();
@@ -997,30 +990,23 @@ public ConsumerRecords poll(long timeout) {
* @return The fetched records (may be empty)
*/
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
- // ensure we have partitions assigned if we expect to
- if (subscriptions.partitionsAutoAssigned())
- coordinator.ensurePartitionAssignment();
+ coordinator.poll(time.milliseconds());
// fetch positions if we have partitions we're subscribed to that we
// don't know the offset for
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
- long now = time.milliseconds();
-
- // execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
- client.executeDelayedTasks(now);
-
- // init any new fetches (won't resend pending fetches)
+ // if data is available already, return it immediately
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
-
- // if data is available already, e.g. from a previous network client poll() call to commit,
- // then just return it immediately
if (!records.isEmpty())
return records;
+ // send any new fetches (won't resend pending fetches)
fetcher.sendFetches();
- client.poll(timeout, now);
+
+ long now = time.milliseconds();
+ client.poll(Math.min(coordinator.timeToNextPoll(now), timeout), now);
return fetcher.fetchedRecords();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index e957856536e6..690df2600d55 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -20,6 +20,7 @@
import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
+import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -53,6 +54,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
/**
* AbstractCoordinator implements group management for a single group member by interacting with
@@ -77,26 +79,38 @@
* by the leader in {@link #performAssignment(String, String, Map)} and becomes available to members in
* {@link #onJoinComplete(int, String, String, ByteBuffer)}.
*
+ * Note on locking: this class shares state between the caller and a background thread which is
+ * used for sending heartbeats after the client has joined the group. All mutable state as well as
+ * state transitions are protected with the class's monitor. Generally this means acquiring the lock
+ * before reading or writing the state of the group (e.g. generation, memberId) and holding the lock
+ * when sending a request that affects the state of the group (e.g. JoinGroup, LeaveGroup).
*/
public abstract class AbstractCoordinator implements Closeable {
private static final Logger log = LoggerFactory.getLogger(AbstractCoordinator.class);
- private final Heartbeat heartbeat;
- private final HeartbeatTask heartbeatTask;
+ private enum MemberState {
+ UNJOINED, // the client is not part of a group
+ REBALANCING, // the client has begun rebalancing
+ STABLE, // the client has joined and is sending heartbeats
+ }
+
+ private final int rebalanceTimeoutMs;
private final int sessionTimeoutMs;
private final GroupCoordinatorMetrics sensors;
+ private final Heartbeat heartbeat;
protected final String groupId;
protected final ConsumerNetworkClient client;
protected final Time time;
protected final long retryBackoffMs;
- private boolean needsJoinPrepare = true;
+ private HeartbeatThread heartbeatThread = null;
private boolean rejoinNeeded = true;
- protected Node coordinator;
- protected String memberId;
- protected String protocol;
- protected int generation;
+ private boolean needsJoinPrepare = true;
+ private MemberState state = MemberState.UNJOINED;
+ private RequestFuture<ByteBuffer> joinFuture = null;
+ private Node coordinator = null;
+ private Generation generation = Generation.NO_GENERATION;
private RequestFuture<Void> findCoordinatorFuture = null;
@@ -105,6 +119,7 @@ public abstract class AbstractCoordinator implements Closeable {
*/
public AbstractCoordinator(ConsumerNetworkClient client,
String groupId,
+ int rebalanceTimeoutMs,
int sessionTimeoutMs,
int heartbeatIntervalMs,
Metrics metrics,
@@ -113,19 +128,16 @@ public AbstractCoordinator(ConsumerNetworkClient client,
long retryBackoffMs) {
this.client = client;
this.time = time;
- this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
- this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
this.groupId = groupId;
- this.coordinator = null;
+ this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.sessionTimeoutMs = sessionTimeoutMs;
- this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
- this.heartbeatTask = new HeartbeatTask();
+ this.heartbeat = new Heartbeat(sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs);
this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
this.retryBackoffMs = retryBackoffMs;
}
/**
- * Unique identifier for the class of protocols implements (e.g. "consumer" or "connect").
+ * Unique identifier for the class of supported protocols (e.g. "consumer" or "connect").
* @return Non-null protocol type name
*/
protected abstract String protocolType();
@@ -175,7 +187,7 @@ protected abstract void onJoinComplete(int generation,
/**
* Block until the coordinator for this group is known and is ready to receive requests.
*/
- public void ensureCoordinatorReady() {
+ public synchronized void ensureCoordinatorReady() {
while (coordinatorUnknown()) {
RequestFuture<Void> future = lookupCoordinator();
client.poll(future);
@@ -216,14 +228,44 @@ public void onFailure(RuntimeException e) {
* Check whether the group should be rejoined (e.g. if metadata changes)
* @return true if it should, false otherwise
*/
- protected boolean needRejoin() {
+ protected synchronized boolean needRejoin() {
return rejoinNeeded;
}
+ /**
+ * Check the status of the heartbeat thread (if it is active) and indicate the liveness
+ * of the client. This must be called periodically after joining with {@link #ensureActiveGroup()}
+ * to ensure that the member stays in the group. If an interval of time longer than the
+ * provided rebalance timeout expires without calling this method, then the client will proactively
+ * leave the group.
+ * @param now current time in milliseconds
+ * @throws RuntimeException for unexpected errors raised from the heartbeat thread
+ */
+ protected synchronized void pollHeartbeat(long now) {
+ if (heartbeatThread != null) {
+ if (heartbeatThread.hasFailed()) {
+ // set the heartbeat thread to null and raise an exception. If the user catches it,
+ // the next call to ensureActiveGroup() will spawn a new heartbeat thread.
+ RuntimeException cause = heartbeatThread.failureCause();
+ heartbeatThread = null;
+ throw cause;
+ }
+
+ heartbeat.poll(now);
+ }
+ }
+
+ protected synchronized long timeToNextHeartbeat(long now) {
+ // if we have not joined the group, we don't need to send heartbeats
+ if (state == MemberState.UNJOINED)
+ return Long.MAX_VALUE;
+ return heartbeat.timeToNextHeartbeat(now);
+ }
+
/**
* Ensure that the group is active (i.e. joined and synced)
*/
- public void ensureActiveGroup() {
+ public synchronized void ensureActiveGroup() {
// always ensure that the coordinator is ready because we may have been disconnected
// when sending heartbeats and does not necessarily require us to rejoin the group.
ensureCoordinatorReady();
@@ -231,11 +273,18 @@ public void ensureActiveGroup() {
if (!needRejoin())
return;
+ // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
+ // time if the client is woken up before a pending rebalance completes.
if (needsJoinPrepare) {
- onJoinPrepare(generation, memberId);
+ onJoinPrepare(generation.generationId, generation.memberId);
needsJoinPrepare = false;
}
+ if (heartbeatThread == null) {
+ heartbeatThread = new HeartbeatThread();
+ heartbeatThread.start();
+ }
+
while (needRejoin()) {
ensureCoordinatorReady();
@@ -246,23 +295,41 @@ public void ensureActiveGroup() {
continue;
}
- RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
- future.addListener(new RequestFutureListener<ByteBuffer>() {
- @Override
- public void onSuccess(ByteBuffer value) {
- // handle join completion in the callback so that the callback will be invoked
- // even if the consumer is woken up before finishing the rebalance
- onJoinComplete(generation, memberId, protocol, value);
- needsJoinPrepare = true;
- heartbeatTask.reset();
- }
+ // we store the join future in case we are woken up by the user after beginning the
+ // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
+ // to rejoin before the pending rebalance has completed.
+ if (joinFuture == null) {
+ state = MemberState.REBALANCING;
+ joinFuture = sendJoinGroupRequest();
+ joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
+ @Override
+ public void onSuccess(ByteBuffer value) {
+ // handle join completion in the callback so that the callback will be invoked
+ // even if the consumer is woken up before finishing the rebalance
+ synchronized (AbstractCoordinator.this) {
+ log.info("Successfully joined group {} with generation {}", groupId, generation.generationId);
+ joinFuture = null;
+ state = MemberState.STABLE;
+ needsJoinPrepare = true;
+ heartbeatThread.enable();
+ }
- @Override
- public void onFailure(RuntimeException e) {
- // we handle failures below after the request finishes. if the join completes
- // after having been woken up, the exception is ignored and we will rejoin
- }
- });
+ onJoinComplete(generation.generationId, generation.memberId, generation.protocol, value);
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ // we handle failures below after the request finishes. if the join completes
+ // after having been woken up, the exception is ignored and we will rejoin
+ synchronized (AbstractCoordinator.this) {
+ joinFuture = null;
+ state = MemberState.UNJOINED;
+ }
+ }
+ });
+ }
+
+ RequestFuture<ByteBuffer> future = joinFuture;
client.poll(future);
if (future.failed()) {
@@ -278,63 +345,6 @@ else if (!future.isRetriable())
}
}
- private class HeartbeatTask implements DelayedTask {
-
- private boolean requestInFlight = false;
-
- public void reset() {
- // start or restart the heartbeat task to be executed at the next chance
- long now = time.milliseconds();
- heartbeat.resetSessionTimeout(now);
- client.unschedule(this);
-
- if (!requestInFlight)
- client.schedule(this, now);
- }
-
- @Override
- public void run(final long now) {
- if (generation < 0 || needRejoin() || coordinatorUnknown()) {
- // no need to send the heartbeat we're not using auto-assignment or if we are
- // awaiting a rebalance
- return;
- }
-
- if (heartbeat.sessionTimeoutExpired(now)) {
- // we haven't received a successful heartbeat in one session interval
- // so mark the coordinator dead
- coordinatorDead();
- return;
- }
-
- if (!heartbeat.shouldHeartbeat(now)) {
- // we don't need to heartbeat now, so reschedule for when we do
- client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
- } else {
- heartbeat.sentHeartbeat(now);
- requestInFlight = true;
-
- RequestFuture<Void> future = sendHeartbeatRequest();
- future.addListener(new RequestFutureListener<Void>() {
- @Override
- public void onSuccess(Void value) {
- requestInFlight = false;
- long now = time.milliseconds();
- heartbeat.receiveHeartbeat(now);
- long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
- client.schedule(HeartbeatTask.this, nextHeartbeatTime);
- }
-
- @Override
- public void onFailure(RuntimeException e) {
- requestInFlight = false;
- client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
- }
- });
- }
- }
- }
-
/**
* Join the group and return the assignment for the next generation. This function handles both
* JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, Map)} if
@@ -350,7 +360,8 @@ private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
JoinGroupRequest request = new JoinGroupRequest(
groupId,
this.sessionTimeoutMs,
- this.memberId,
+ this.rebalanceTimeoutMs,
+ this.generation.memberId,
protocolType(),
metadata());
@@ -359,7 +370,6 @@ private RequestFuture sendJoinGroupRequest() {
.compose(new JoinGroupResponseHandler());
}
-
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
@@ -372,24 +382,32 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture fut
Errors error = Errors.forCode(joinResponse.errorCode());
if (error == Errors.NONE) {
log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
- AbstractCoordinator.this.memberId = joinResponse.memberId();
- AbstractCoordinator.this.generation = joinResponse.generationId();
- AbstractCoordinator.this.rejoinNeeded = false;
- AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
sensors.joinLatency.record(response.requestLatencyMs());
- if (joinResponse.isLeader()) {
- onJoinLeader(joinResponse).chain(future);
- } else {
- onJoinFollower().chain(future);
+
+ synchronized (AbstractCoordinator.this) {
+ if (state != MemberState.REBALANCING) {
+ // if the consumer was woken up before a rebalance completes, we may have already left
+ // the group. In this case, we do not want to continue with the sync group.
+ future.raise(new UnjoinedGroupException());
+ } else {
+ AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),
+ joinResponse.memberId(), joinResponse.groupProtocol());
+ AbstractCoordinator.this.rejoinNeeded = false;
+ if (joinResponse.isLeader()) {
+ onJoinLeader(joinResponse).chain(future);
+ } else {
+ onJoinFollower().chain(future);
+ }
+ }
}
} else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId,
- coordinator);
+ coordinator());
// backoff and retry
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
// reset the member id and retry immediately
- AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+ resetGeneration();
log.debug("Attempt to join group {} failed due to unknown member id.", groupId);
future.raise(Errors.UNKNOWN_MEMBER_ID);
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
@@ -415,8 +433,8 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture fut
private RequestFuture<ByteBuffer> onJoinFollower() {
// send follower's sync group with an empty assignment
- SyncGroupRequest request = new SyncGroupRequest(groupId, generation,
- memberId, Collections.emptyMap());
+ SyncGroupRequest request = new SyncGroupRequest(groupId, generation.generationId,
+ generation.memberId, Collections.emptyMap());
log.debug("Sending follower SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);
return sendSyncGroupRequest(request);
}
@@ -427,7 +445,7 @@ private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
joinResponse.members());
- SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
+ SyncGroupRequest request = new SyncGroupRequest(groupId, generation.generationId, generation.memberId, groupAssignment);
log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);
return sendSyncGroupRequest(request);
} catch (RuntimeException e) {
@@ -454,11 +472,11 @@ public void handle(SyncGroupResponse syncResponse,
RequestFuture<ByteBuffer> future) {
Errors error = Errors.forCode(syncResponse.errorCode());
if (error == Errors.NONE) {
- log.info("Successfully joined group {} with generation {}", groupId, generation);
sensors.syncLatency.record(response.requestLatencyMs());
future.complete(syncResponse.memberAssignment());
} else {
- AbstractCoordinator.this.rejoinNeeded = true;
+ requestRejoin();
+
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
@@ -467,7 +485,7 @@ public void handle(SyncGroupResponse syncResponse,
} else if (error == Errors.UNKNOWN_MEMBER_ID
|| error == Errors.ILLEGAL_GENERATION) {
log.debug("SyncGroup for group {} failed due to {}", groupId, error);
- AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+ resetGeneration();
future.raise(error);
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR_FOR_GROUP) {
@@ -499,43 +517,36 @@ private RequestFuture sendGroupCoordinatorRequest() {
log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
- .compose(new RequestFutureAdapter() {
- @Override
- public void onSuccess(ClientResponse response, RequestFuture future) {
- handleGroupMetadataResponse(response, future);
- }
- });
+ .compose(new GroupCoordinatorResponseHandler());
}
}
- private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture future) {
- log.debug("Received group coordinator response {}", resp);
+ private class GroupCoordinatorResponseHandler extends RequestFutureAdapter {
+
+ @Override
+ public void onSuccess(ClientResponse resp, RequestFuture future) {
+ log.debug("Received group coordinator response {}", resp);
- if (!coordinatorUnknown()) {
- // We already found the coordinator, so ignore the request
- future.complete(null);
- } else {
GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
// use MAX_VALUE - node.id as the coordinator id to mimic separate connections
// for the coordinator in the underlying network client layer
// TODO: this needs to be better handled in KAFKA-1935
Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
if (error == Errors.NONE) {
- this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
- groupCoordinatorResponse.node().host(),
- groupCoordinatorResponse.node().port());
-
- log.info("Discovered coordinator {} for group {}.", coordinator, groupId);
-
- client.tryConnect(coordinator);
-
- // start sending heartbeats only if we have a valid generation
- if (generation > 0)
- heartbeatTask.reset();
+ synchronized (AbstractCoordinator.this) {
+ AbstractCoordinator.this.coordinator = new Node(
+ Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
+ groupCoordinatorResponse.node().host(),
+ groupCoordinatorResponse.node().port());
+ log.info("Discovered coordinator {} for group {}.", coordinator, groupId);
+ client.tryConnect(coordinator);
+ heartbeat.resetTimeouts(time.milliseconds());
+ }
future.complete(null);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
+ log.debug("Group coordinator lookup for group {} failed: {}", groupId, error.message());
future.raise(error);
}
}
@@ -546,21 +557,25 @@ private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture 0) {
+ public synchronized void maybeLeaveGroup() {
+ if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation != Generation.NO_GENERATION) {
// this is a minimal effort attempt to leave the group. we do not
// attempt any resending if the request fails or times out.
- sendLeaveGroupRequest();
+ LeaveGroupRequest request = new LeaveGroupRequest(groupId, generation.memberId);
+ client.send(coordinator, ApiKeys.LEAVE_GROUP, request)
+ .compose(new LeaveGroupResponseHandler());
+ client.pollNoWakeup();
}
- this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
- this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
- rejoinNeeded = true;
- }
-
- private void sendLeaveGroupRequest() {
- LeaveGroupRequest request = new LeaveGroupRequest(groupId, memberId);
- RequestFuture future = client.send(coordinator, ApiKeys.LEAVE_GROUP, request)
- .compose(new LeaveGroupResponseHandler());
-
- future.addListener(new RequestFutureListener() {
- @Override
- public void onSuccess(Void value) {}
-
- @Override
- public void onFailure(RuntimeException e) {
- log.debug("LeaveGroup request for group {} failed with error", groupId, e);
- }
- });
-
- client.poll(future, 0);
+ resetGeneration();
}
private class LeaveGroupResponseHandler extends CoordinatorResponseHandler {
+
@Override
public LeaveGroupResponse parse(ClientResponse response) {
return new LeaveGroupResponse(response.responseBody());
@@ -620,25 +641,26 @@ public LeaveGroupResponse parse(ClientResponse response) {
@Override
public void handle(LeaveGroupResponse leaveResponse, RequestFuture future) {
- // process the response
- short errorCode = leaveResponse.errorCode();
- if (errorCode == Errors.NONE.code())
+ Errors error = Errors.forCode(leaveResponse.errorCode());
+ if (error == Errors.NONE) {
+ log.debug("LeaveGroup request for group {} returned successfully", groupId);
future.complete(null);
- else
- future.raise(Errors.forCode(errorCode));
+ } else {
+ log.debug("LeaveGroup request for group {} failed with error: {}", groupId, error.message());
+ future.raise(error);
+ }
}
}
- /**
- * Send a heartbeat request now (visible only for testing).
- */
- public RequestFuture sendHeartbeatRequest() {
- HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.memberId);
+ // visible for testing
+ synchronized RequestFuture<Void> sendHeartbeatRequest() {
+ HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation.generationId, this.generation.memberId);
return client.send(coordinator, ApiKeys.HEARTBEAT, req)
- .compose(new HeartbeatCompletionHandler());
+ .compose(new HeartbeatResponseHandler());
}
- private class HeartbeatCompletionHandler extends CoordinatorResponseHandler {
+ private class HeartbeatResponseHandler extends CoordinatorResponseHandler {
+
@Override
public HeartbeatResponse parse(ClientResponse response) {
return new HeartbeatResponse(response.responseBody());
@@ -654,21 +676,20 @@ public void handle(HeartbeatResponse heartbeatResponse, RequestFuture futu
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR_FOR_GROUP) {
log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.",
- groupId, coordinator);
+ groupId, coordinator());
coordinatorDead();
future.raise(error);
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId);
- AbstractCoordinator.this.rejoinNeeded = true;
+ requestRejoin();
future.raise(Errors.REBALANCE_IN_PROGRESS);
} else if (error == Errors.ILLEGAL_GENERATION) {
log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId);
- AbstractCoordinator.this.rejoinNeeded = true;
+ resetGeneration();
future.raise(Errors.ILLEGAL_GENERATION);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId);
- memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
- AbstractCoordinator.this.rejoinNeeded = true;
+ resetGeneration();
future.raise(Errors.UNKNOWN_MEMBER_ID);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
@@ -678,8 +699,7 @@ public void handle(HeartbeatResponse heartbeatResponse, RequestFuture futu
}
}
- protected abstract class CoordinatorResponseHandler
- extends RequestFutureAdapter {
+ protected abstract class CoordinatorResponseHandler extends RequestFutureAdapter {
protected ClientResponse response;
public abstract R parse(ClientResponse response);
@@ -758,9 +778,149 @@ public double measure(MetricConfig config, long now) {
};
metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago",
this.metricGrpName,
- "The number of seconds since the last controller heartbeat"),
+ "The number of seconds since the last controller heartbeat was sent"),
lastHeartbeat);
}
}
+ private class HeartbeatThread extends Thread {
+ private boolean enabled = false;
+ private boolean closed = false;
+ private AtomicReference<RuntimeException> failed = new AtomicReference<>(null);
+
+ public void enable() {
+ synchronized (AbstractCoordinator.this) {
+ this.enabled = true;
+ heartbeat.resetTimeouts(time.milliseconds());
+ AbstractCoordinator.this.notify();
+ }
+ }
+
+ public void disable() {
+ synchronized (AbstractCoordinator.this) {
+ this.enabled = false;
+ }
+ }
+
+ public void close() {
+ synchronized (AbstractCoordinator.this) {
+ this.closed = true;
+ AbstractCoordinator.this.notify();
+ }
+ }
+
+ private boolean hasFailed() {
+ return failed.get() != null;
+ }
+
+ private RuntimeException failureCause() {
+ return failed.get();
+ }
+
+ @Override
+ public void run() {
+ try {
+ RequestFuture<Void> findCoordinatorFuture = null;
+
+ while (true) {
+ synchronized (AbstractCoordinator.this) {
+ if (closed)
+ return;
+
+ if (!enabled) {
+ AbstractCoordinator.this.wait();
+ continue;
+ }
+
+ if (state != MemberState.STABLE) {
+ // the group is not stable (perhaps because we left the group or because the coordinator
+ // kicked us out), so disable heartbeats and wait for the main thread to rejoin.
+ disable();
+ continue;
+ }
+
+ client.pollNoWakeup();
+ long now = time.milliseconds();
+
+ if (coordinatorUnknown()) {
+ if (findCoordinatorFuture == null || findCoordinatorFuture.isDone())
+ findCoordinatorFuture = lookupCoordinator();
+ else
+ AbstractCoordinator.this.wait(retryBackoffMs);
+ } else if (heartbeat.sessionTimeoutExpired(now)) {
+ // the session timeout has expired without seeing a successful heartbeat, so we should
+ // probably make sure the coordinator is still healthy.
+ coordinatorDead();
+ } else if (heartbeat.pollTimeoutExpired(now)) {
+ // the poll timeout has expired, which means that the foreground thread has stalled
+ // in between calls to poll(), so we explicitly leave the group.
+ maybeLeaveGroup();
+ } else if (!heartbeat.shouldHeartbeat(now)) {
+ // poll again after waiting for the retry backoff in case the heartbeat failed or the
+ // coordinator disconnected
+ AbstractCoordinator.this.wait(retryBackoffMs);
+ } else {
+ heartbeat.sentHeartbeat(now);
+
+ sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
+ @Override
+ public void onSuccess(Void value) {
+ synchronized (AbstractCoordinator.this) {
+ heartbeat.receiveHeartbeat(time.milliseconds());
+ }
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ synchronized (AbstractCoordinator.this) {
+ if (e instanceof RebalanceInProgressException) {
+ // it is valid to continue heartbeating while the group is rebalancing. This
+ // ensures that the coordinator keeps the member in the group for as long
+ // as the duration of the rebalance timeout. If we stop sending heartbeats,
+ // however, then the session timeout may expire before we can rejoin.
+ heartbeat.receiveHeartbeat(time.milliseconds());
+ } else {
+ heartbeat.failHeartbeat();
+
+ // wake up the thread if it's sleeping to reschedule the heartbeat
+ AbstractCoordinator.this.notify();
+ }
+ }
+ }
+ });
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e);
+ this.failed.set(new RuntimeException(e));
+ } catch (RuntimeException e) {
+ log.error("Heartbeat thread for group {} failed due to unexpected error", groupId, e);
+ this.failed.set(e);
+ }
+ }
+
+ }
+
+ protected static class Generation {
+ public static final Generation NO_GENERATION = new Generation(
+ OffsetCommitRequest.DEFAULT_GENERATION_ID,
+ JoinGroupRequest.UNKNOWN_MEMBER_ID,
+ null);
+
+ public final int generationId;
+ public final String memberId;
+ public final String protocol;
+
+ public Generation(int generationId, String memberId, String protocol) {
+ this.generationId = generationId;
+ this.memberId = memberId;
+ this.protocol = protocol;
+ }
+ }
+
+ private static class UnjoinedGroupException extends RetriableException {
+
+ }
+
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 81a40f132a8c..5fee45afe831 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -18,12 +18,13 @@
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -54,6 +55,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
/**
* This class manages the coordination process with the consumer coordinator.
@@ -68,18 +70,24 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private final SubscriptionState subscriptions;
private final OffsetCommitCallback defaultOffsetCommitCallback;
private final boolean autoCommitEnabled;
- private final AutoCommitTask autoCommitTask;
+ private final int autoCommitIntervalMs;
private final ConsumerInterceptors, ?> interceptors;
private final boolean excludeInternalTopics;
+ // this collection must be thread-safe because it is modified from the response handler
+ // of offset commit requests, which may be invoked from the heartbeat thread
+ private final ConcurrentLinkedQueue<OffsetCommitCompletion> completedOffsetCommits;
+
private MetadataSnapshot metadataSnapshot;
private MetadataSnapshot assignmentSnapshot;
+ private long nextAutoCommitDeadline;
/**
* Initialize the coordination manager.
*/
public ConsumerCoordinator(ConsumerNetworkClient client,
String groupId,
+ int rebalanceTimeoutMs,
int sessionTimeoutMs,
int heartbeatIntervalMs,
List assignors,
@@ -91,11 +99,12 @@ public ConsumerCoordinator(ConsumerNetworkClient client,
long retryBackoffMs,
OffsetCommitCallback defaultOffsetCommitCallback,
boolean autoCommitEnabled,
- long autoCommitIntervalMs,
+ int autoCommitIntervalMs,
ConsumerInterceptors, ?> interceptors,
boolean excludeInternalTopics) {
super(client,
groupId,
+ rebalanceTimeoutMs,
sessionTimeoutMs,
heartbeatIntervalMs,
metrics,
@@ -103,26 +112,22 @@ public ConsumerCoordinator(ConsumerNetworkClient client,
time,
retryBackoffMs);
this.metadata = metadata;
-
- this.metadata.requestUpdate();
this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
this.subscriptions = subscriptions;
this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
this.autoCommitEnabled = autoCommitEnabled;
+ this.autoCommitIntervalMs = autoCommitIntervalMs;
this.assignors = assignors;
-
- addMetadataListener();
-
- if (autoCommitEnabled) {
- this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
- this.autoCommitTask.reschedule();
- } else {
- this.autoCommitTask = null;
- }
-
+ this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
this.interceptors = interceptors;
this.excludeInternalTopics = excludeInternalTopics;
+
+ if (autoCommitEnabled)
+ this.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
+
+ this.metadata.requestUpdate();
+ addMetadataListener();
}
@Override
@@ -210,8 +215,7 @@ protected void onJoinComplete(int generation,
assignor.onAssignment(assignment);
// reschedule the auto commit starting from now
- if (autoCommitEnabled)
- autoCommitTask.reschedule();
+ this.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
// execute the user's callback after rebalance
ConsumerRebalanceListener listener = subscriptions.listener();
@@ -227,6 +231,54 @@ protected void onJoinComplete(int generation,
}
}
+ /**
+ * Poll for coordinator events. This ensures that the coordinator is known and that the consumer
+ * has joined the group (if it is using group management). This also handles periodic offset commits
+ * if they are enabled.
+ *
+ * @param now current time in milliseconds
+ */
+ public void poll(long now) {
+ invokeCompletedOffsetCommitCallbacks();
+
+ if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
+ ensureCoordinatorReady();
+ now = time.milliseconds();
+ }
+
+ if (subscriptions.partitionsAutoAssigned() && needRejoin()) {
+ // due to a race condition between the initial metadata fetch and the initial rebalance, we need to ensure that
+ // the metadata is fresh before joining initially, and then request the metadata update. If metadata update arrives
+ // while the rebalance is still pending (for example, when the join group is still inflight), then we will lose
+ // track of the fact that we need to rebalance again to reflect the change to the topic subscription. Without
+ // ensuring that the metadata is fresh, any metadata update that changes the topic subscriptions and arrives with a
+ // rebalance in progress will essentially be ignored. See KAFKA-3949 for the complete description of the problem.
+ if (subscriptions.hasPatternSubscription())
+ client.ensureFreshMetadata();
+
+ ensureActiveGroup();
+ now = time.milliseconds();
+ }
+
+ pollHeartbeat(now);
+ maybeAutoCommitOffsetsAsync(now);
+ }
+
+ /**
+ * Return the time to the next needed invocation of {@link #poll(long)}.
+ * @param now current time in milliseconds
+ * @return the maximum time in milliseconds the caller should wait before the next invocation of poll()
+ */
+ public long timeToNextPoll(long now) {
+ if (!autoCommitEnabled)
+ return timeToNextHeartbeat(now);
+
+ if (now > nextAutoCommitDeadline)
+ return 0;
+
+ return Math.min(nextAutoCommitDeadline - now, timeToNextHeartbeat(now));
+ }
+
@Override
protected Map performAssignment(String leaderId,
String assignmentStrategy,
@@ -292,7 +344,7 @@ protected void onJoinPrepare(int generation, String memberId) {
}
@Override
- public boolean needRejoin() {
+ protected boolean needRejoin() {
return subscriptions.partitionsAutoAssigned() &&
(super.needRejoin() || subscriptions.partitionAssignmentNeeded());
}
@@ -336,24 +388,6 @@ public Map fetchCommittedOffsets(Set offsets, final OffsetCommitCallback callback) {
+ invokeCompletedOffsetCommitCallbacks();
+
if (!coordinatorUnknown()) {
doCommitOffsetsAsync(offsets, callback);
} else {
@@ -384,7 +430,7 @@ public void onSuccess(Void value) {
@Override
public void onFailure(RuntimeException e) {
- callback.onComplete(offsets, new RetriableCommitFailedException(e));
+ completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e)));
}
});
}
@@ -404,16 +450,18 @@ private void doCommitOffsetsAsync(final Map o
public void onSuccess(Void value) {
if (interceptors != null)
interceptors.onCommit(offsets);
- cb.onComplete(offsets, null);
+
+ completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
}
@Override
public void onFailure(RuntimeException e) {
- if (e instanceof RetriableException) {
- cb.onComplete(offsets, new RetriableCommitFailedException(e));
- } else {
- cb.onComplete(offsets, e);
- }
+ Exception commitException = e;
+
+ if (e instanceof RetriableException)
+ commitException = new RetriableCommitFailedException(e);
+
+ completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
}
});
}
@@ -427,6 +475,8 @@ public void onFailure(RuntimeException e) {
* @throws CommitFailedException if an unrecoverable error occurs before the commit can be completed
*/
public void commitOffsetsSync(Map offsets) {
+ invokeCompletedOffsetCommitCallbacks();
+
if (offsets.isEmpty())
return;
@@ -449,46 +499,25 @@ public void commitOffsetsSync(Map offsets) {
}
}
- private class AutoCommitTask implements DelayedTask {
- private final long interval;
-
- public AutoCommitTask(long interval) {
- this.interval = interval;
- }
-
- private void reschedule() {
- client.schedule(this, time.milliseconds() + interval);
- }
-
- private void reschedule(long at) {
- client.schedule(this, at);
- }
-
- public void run(final long now) {
+ private void maybeAutoCommitOffsetsAsync(long now) {
+ if (autoCommitEnabled) {
if (coordinatorUnknown()) {
- log.debug("Cannot auto-commit offsets for group {} since the coordinator is unknown", groupId);
- reschedule(now + retryBackoffMs);
- return;
- }
-
- if (needRejoin()) {
- // skip the commit when we're rejoining since we'll commit offsets synchronously
- // before the revocation callback is invoked
- reschedule(now + interval);
- return;
- }
-
- commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
- @Override
- public void onComplete(Map offsets, Exception exception) {
- if (exception == null) {
- reschedule(now + interval);
- } else {
- log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
- reschedule(now + interval);
+ this.nextAutoCommitDeadline = now + retryBackoffMs;
+ } else if (now >= nextAutoCommitDeadline) {
+ this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
+ commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
+ @Override
+ public void onComplete(Map offsets, Exception exception) {
+ if (exception != null) {
+ log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
+ if (exception instanceof RetriableException)
+ nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
+ } else {
+ log.debug("Completed autocommit of offsets {} for group {}", offsets, groupId);
+ }
}
- }
- });
+ });
+ }
}
}
@@ -506,6 +535,14 @@ private void maybeAutoCommitOffsetsSync() {
}
}
+ public static class DefaultOffsetCommitCallback implements OffsetCommitCallback {
+ @Override
+ public void onComplete(Map offsets, Exception exception) {
+ if (exception != null)
+ log.error("Offset commit failed.", exception);
+ }
+ }
+
/**
* Commit offsets for the specified list of topics and partitions. This is a non-blocking call
* which returns a request future that can be polled in the case of a synchronous commit or ignored in the
@@ -515,12 +552,13 @@ private void maybeAutoCommitOffsetsSync() {
* @return A request future whose value indicates whether the commit was successful or not
*/
private RequestFuture sendOffsetCommitRequest(final Map offsets) {
- if (coordinatorUnknown())
- return RequestFuture.coordinatorNotAvailable();
-
if (offsets.isEmpty())
return RequestFuture.voidSuccess();
+ Node coordinator = coordinator();
+ if (coordinator == null)
+ return RequestFuture.coordinatorNotAvailable();
+
// create the offset commit request
Map offsetData = new HashMap<>(offsets.size());
for (Map.Entry entry : offsets.entrySet()) {
@@ -529,9 +567,21 @@ private RequestFuture sendOffsetCommitRequest(final Map sendOffsetCommitRequest(final Map offsets, Exception exception) {
- if (exception != null)
- log.error("Offset commit failed.", exception);
- }
- }
-
private class OffsetCommitResponseHandler extends CoordinatorResponseHandler {
private final Map offsets;
@@ -607,13 +649,8 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture futu
|| error == Errors.REBALANCE_IN_PROGRESS) {
// need to re-join group
log.debug("Offset commit for group {} failed: {}", groupId, error.message());
- subscriptions.needReassignment();
- future.raise(new CommitFailedException("Commit cannot be completed since the group has already " +
- "rebalanced and assigned the partitions to another member. This means that the time " +
- "between subsequent calls to poll() was longer than the configured session.timeout.ms, " +
- "which typically implies that the poll loop is spending too much time message processing. " +
- "You can address this either by increasing the session timeout or by reducing the maximum " +
- "size of batches returned in poll() with max.poll.records."));
+ resetGeneration();
+ future.raise(new CommitFailedException());
return;
} else {
log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
@@ -639,7 +676,8 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture futu
* @return A request future containing the committed offsets.
*/
private RequestFuture