diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java index eec41c6d3b4f1..cba2b65cbba7d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager; @@ -241,7 +242,7 @@ public PollResult pollOnClose(long currentTimeMs) { * are sent, so blocking for longer than the heartbeat interval might mean the application thread is not * responsive to changes. * - *
<p/>
Similarly, we may have to unblock the application thread to send a `PollApplicationEvent` to make sure + *
<p/>
Similarly, we may have to unblock the application thread to send an {@link AsyncPollEvent} to make sure + * our poll timer will not expire while we are polling. * *
<p/>
In the event that heartbeats are currently being skipped, this still returns the next heartbeat diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 3915ff7c8df88..e806b21c46558 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -41,6 +41,7 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent; import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent; +import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent; @@ -59,7 +60,6 @@ import org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.PausePartitionsEvent; -import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent; import org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent; import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent; @@ -325,8 +325,7 @@ private StreamsRebalanceListenerInvoker streamsRebalanceListenerInvoker() { // Init value is needed to avoid NPE in case of exception raised in the constructor private Optional clientTelemetryReporter = Optional.empty(); - // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates - private boolean cachedSubscriptionHasAllFetchPositions; + private AsyncPollEvent inflightPoll; private final WakeupTrigger wakeupTrigger = new WakeupTrigger(); private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker; @@ -464,7 +463,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config, final Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, subscriptions, - requestManagersSupplier); + requestManagersSupplier + ); this.applicationEventHandler = applicationEventHandlerFactory.build( logContext, time, @@ -623,7 +623,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config, new RebalanceCallbackMetricsManager(metrics) ); ApiVersions apiVersions = new ApiVersions(); - Supplier networkClientDelegateSupplier = () -> new NetworkClientDelegate( + Supplier networkClientDelegateSupplier = NetworkClientDelegate.supplier( time, config, logContext, @@ -834,23 +834,19 @@ public ConsumerRecords poll(final Duration timeout) { throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); } - do { - PollEvent event = new PollEvent(timer.currentTimeMs()); - // Make sure to let the background thread know that we are still polling. 
- // This will trigger async auto-commits of consumed positions when hitting - // the interval time or reconciling new assignments - applicationEventHandler.add(event); - // Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests - // retrieve the positions to commit before proceeding with fetching new records - ConsumerUtils.getResult(event.reconcileAndAutoCommit(), defaultApiTimeoutMs.toMillis()); + // This distinguishes the first pass of the inner do/while loop from subsequent passes for the + // inflight poll event logic. + boolean firstPass = true; + do { // We must not allow wake-ups between polling for fetches and returning the records. // If the polled fetches are not empty the consumed position has already been updated in the polling // of the fetches. A wakeup between returned fetches and returning records would lead to never // returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches. wakeupTrigger.maybeTriggerWakeup(); - updateAssignmentMetadataIfNeeded(timer); + checkInflightPoll(timer, firstPass); + firstPass = false; final Fetch fetch = pollForFetches(timer); if (!fetch.isEmpty()) { // before returning the fetched records, we can send off the next round of fetches @@ -878,6 +874,107 @@ public ConsumerRecords poll(final Duration timeout) { } } + /** + * {@code checkInflightPoll()} manages the lifetime of the {@link AsyncPollEvent} processing. If it is + * called when no event is currently processing, it will start a new event processing asynchronously. A check + * is made during each invocation to see if the inflight event has completed. If it has, it will be + * processed accordingly. + */ + private void checkInflightPoll(Timer timer, boolean firstPass) { + if (firstPass && inflightPoll != null) { + // Handle the case where there's a remaining inflight poll from the *previous* invocation + // of AsyncKafkaConsumer.poll(). + maybeClearPreviousInflightPoll(); + } + + boolean newlySubmittedEvent = false; + + if (inflightPoll == null) { + inflightPoll = new AsyncPollEvent(calculateDeadlineMs(timer), time.milliseconds()); + newlySubmittedEvent = true; + log.trace("Inflight event {} submitted", inflightPoll); + applicationEventHandler.add(inflightPoll); + } + + try { + // Note: this is calling user-supplied code, so make sure that any errors thrown here are caught and + // the inflight event is cleared. + offsetCommitCallbackInvoker.executeCallbacks(); + processBackgroundEvents(); + } catch (Throwable t) { + // If an exception was thrown during execution of offset commit callbacks or background events, + // bubble it up to the user but make sure to clear out the inflight request because the error effectively + // renders it complete. + log.trace("Inflight event {} failed due to {}, clearing", inflightPoll, String.valueOf(t)); + inflightPoll = null; + throw ConsumerUtils.maybeWrapAsKafkaException(t); + } finally { + timer.update(); + } + + if (inflightPoll != null) { + maybeClearCurrentInflightPoll(newlySubmittedEvent); + } + } + + private void maybeClearPreviousInflightPoll() { + if (inflightPoll.isComplete()) { + Optional errorOpt = inflightPoll.error(); + + if (errorOpt.isPresent()) { + // If the previous inflight event is complete, check if it resulted in an error. If there was + // an error, throw it without delay. 
+ KafkaException error = errorOpt.get(); + log.trace("Previous inflight event {} completed with an error ({}), clearing", inflightPoll, error); + inflightPoll = null; + throw error; + } else { + // Successful case... + if (fetchBuffer.isEmpty()) { + // If it completed without error, but without populating the fetch buffer, clear the event + // so that a new event will be enqueued below. + log.trace("Previous inflight event {} completed without filling the buffer, clearing", inflightPoll); + inflightPoll = null; + } else { + // However, if the event completed, and it populated the buffer, *don't* create a new event. + // This is to prevent an edge case of starvation when poll() is called with a timeout of 0. + // If a new event was created on *every* poll, each time the event would have to complete the + // validate positions stage before the data in the fetch buffer is used. Because there is + // no blocking, and effectively a 0 wait, the data in the fetch buffer is continuously ignored + // leading to no data ever being returned from poll(). + log.trace("Previous inflight event {} completed and filled the buffer, not clearing", inflightPoll); + } + } + } else if (inflightPoll.isExpired(time) && inflightPoll.isValidatePositionsComplete()) { + // The inflight event validated positions, but it has expired. + log.trace("Previous inflight event {} expired without completing, clearing", inflightPoll); + inflightPoll = null; + } + } + + private void maybeClearCurrentInflightPoll(boolean newlySubmittedEvent) { + if (inflightPoll.isComplete()) { + Optional errorOpt = inflightPoll.error(); + + if (errorOpt.isPresent()) { + // If the inflight event completed with an error, throw it without delay. + KafkaException error = errorOpt.get(); + log.trace("Inflight event {} completed with an error ({}), clearing", inflightPoll, error); + inflightPoll = null; + throw error; + } else { + log.trace("Inflight event {} completed without error, clearing", inflightPoll); + inflightPoll = null; + } + } else if (!newlySubmittedEvent) { + if (inflightPoll.isExpired(time) && inflightPoll.isValidatePositionsComplete()) { + // The inflight event validated positions, but it has expired. + log.trace("Inflight event {} expired without completing, clearing", inflightPoll); + inflightPoll = null; + } + } + } + /** * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and * partitions. @@ -1773,16 +1870,27 @@ private Fetch pollForFetches(Timer timer) { return fetch; } - // send any new fetches (won't resend pending fetches) - sendFetches(timer); - - // We do not want to be stuck blocking in poll if we are missing some positions - // since the offset lookup may be backing off after a failure + // With the non-blocking poll design, it's possible that at this point the background thread is + // concurrently working to update positions. Therefore, a _copy_ of the current assignment is retrieved + // and iterated looking for any partitions with invalid positions. This is done to avoid being stuck + // in poll for an unnecessarily long amount of time if we are missing some positions since the offset + // lookup may be backing off after a failure. + if (pollTimeout > retryBackoffMs) { + Set partitions = subscriptions.assignedPartitions(); - // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call - // updateAssignmentMetadataIfNeeded before this method. 
- if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) { - pollTimeout = retryBackoffMs; + if (partitions.isEmpty()) { + // If there aren't any assigned partitions, this could mean that this consumer's group membership + // has not been established or assignments have been removed and not yet reassigned. In either case, + // reduce the poll time for the fetch buffer wait. + pollTimeout = retryBackoffMs; + } else { + for (TopicPartition tp : partitions) { + if (!subscriptions.hasValidPosition(tp)) { + pollTimeout = retryBackoffMs; + break; + } + } + } } log.trace("Polling for fetches with timeout {}", pollTimeout); @@ -1811,19 +1919,19 @@ private Fetch pollForFetches(Timer timer) { * of the {@link #fetchBuffer}, converting it to a well-formed {@link CompletedFetch}, validating that it and * the internal {@link SubscriptionState state} are correct, and then converting it all into a {@link Fetch} * for returning. - * - *
<p/>
- * - * This method will {@link ConsumerNetworkThread#wakeup() wake up the network thread} before returning. This is - * done as an optimization so that the next round of data can be pre-fetched. */ private Fetch collectFetch() { - final Fetch fetch = fetchCollector.collectFetch(fetchBuffer); - - // Notify the network thread to wake up and start the next round of fetching. - applicationEventHandler.wakeupNetworkThread(); + // With the non-blocking async poll, it's critical that the application thread wait until the background + // thread has completed the stage of validating positions. This prevents a race condition where both + // threads may attempt to update the SubscriptionState.position() for a given partition. So if the background + // thread has not completed that stage for the inflight event, don't attempt to collect data from the fetch + // buffer. If the inflight event was nulled out by checkInflightPoll(), that implies that it is safe to + // attempt to collect data from the fetch buffer. + if (inflightPoll != null && !inflightPoll.isValidatePositionsComplete()) { + return Fetch.empty(); + } - return fetch; + return fetchCollector.collectFetch(fetchBuffer); } /** @@ -1836,11 +1944,10 @@ private Fetch collectFetch() { * defined */ private boolean updateFetchPositions(final Timer timer) { - cachedSubscriptionHasAllFetchPositions = false; try { CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer)); wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future()); - cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent); + applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent); } catch (TimeoutException e) { return false; } finally { @@ -1858,41 +1965,6 @@ private boolean isCommittedOffsetsManagementEnabled() { return groupMetadata.get().isPresent(); } - /** - * This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests}. - * - *
<p/>
- * - * This method takes the following steps to maintain compatibility with the {@link ClassicKafkaConsumer} method - * of the same name: - * - *
<ul>…</ul>
- * - * @param timer Timer used to bound how long the consumer waits for the requests to be created, which in practice - * is used to avoid using {@link Long#MAX_VALUE} to wait "forever" - */ - private void sendFetches(Timer timer) { - try { - applicationEventHandler.addAndGet(new CreateFetchRequestsEvent(calculateDeadlineMs(timer))); - } catch (TimeoutException swallow) { - // Can be ignored, per above comments. - } - } - /** * This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests} for the * pre-fetch case, i.e. right before {@link #poll(Duration)} exits. In the pre-fetch case, the application thread diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index d2d178a88c38b..67656cf327b5b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -20,9 +20,9 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; -import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; +import org.apache.kafka.clients.consumer.internals.events.MetadataErrorNotifiableEvent; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; import org.apache.kafka.common.internals.IdempotentCloser; import org.apache.kafka.common.requests.AbstractRequest; @@ -40,6 +40,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.function.Supplier; @@ -193,10 +194,13 @@ private void processApplicationEvents() { try { if (event instanceof CompletableEvent) { applicationEventReaper.add((CompletableEvent) event); - // Check if there are any metadata errors and fail the CompletableEvent if an error is present. - // This call is meant to handle "immediately completed events" which may not enter the awaiting state, - // so metadata errors need to be checked and handled right away. - maybeFailOnMetadataError(List.of((CompletableEvent) event)); + } + // Check if there are any metadata errors and fail the event if an error is present. + // This call is meant to handle "immediately completed events" which may not enter the + // awaiting state, so metadata errors need to be checked and handled right away. + if (event instanceof MetadataErrorNotifiableEvent) { + if (maybeFailOnMetadataError(List.of(event))) + continue; } applicationEventProcessor.process(event); } catch (Throwable t) { @@ -368,18 +372,26 @@ void cleanup() { /** * If there is a metadata error, complete all uncompleted events that require subscription metadata. 
*/ - private void maybeFailOnMetadataError(List> events) { - List> subscriptionMetadataEvent = new ArrayList<>(); + private boolean maybeFailOnMetadataError(List events) { + List filteredEvents = new ArrayList<>(); - for (CompletableEvent ce : events) { - if (ce instanceof CompletableApplicationEvent && ((CompletableApplicationEvent) ce).requireSubscriptionMetadata()) - subscriptionMetadataEvent.add((CompletableApplicationEvent) ce); + for (Object obj : events) { + if (obj instanceof MetadataErrorNotifiableEvent) { + filteredEvents.add((MetadataErrorNotifiableEvent) obj); + } } - if (subscriptionMetadataEvent.isEmpty()) - return; - networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError -> - subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError)) - ); + // Don't get-and-clear the metadata error if there are no events that will be notified. + if (filteredEvents.isEmpty()) + return false; + + Optional metadataError = networkClientDelegate.getAndClearMetadataError(); + + if (metadataError.isPresent()) { + filteredEvents.forEach(e -> e.onMetadataError(metadataError.get())); + return true; + } else { + return false; + } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java index c52b5453e21d9..e7139657d5abf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java @@ -145,6 +145,14 @@ private PollResult pollInternal(FetchRequestPreparer fetchRequestPreparer, try { Map fetchRequests = fetchRequestPreparer.prepare(); + if (fetchRequests.isEmpty()) { + // If there's nothing to fetch, wake up the FetchBuffer so it doesn't needlessly wait for a wakeup + // that won't come until the data in the fetch buffer is consumed. + fetchBuffer.wakeup(); + pendingFetchRequestFuture.complete(null); + return PollResult.EMPTY; + } + List requests = fetchRequests.entrySet().stream().map(entry -> { final Node fetchTarget = entry.getKey(); final FetchSessionHandler.FetchRequestData data = entry.getValue(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index e85f244c80472..762718b2035d8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -471,4 +471,33 @@ protected NetworkClientDelegate create() { } }; } + + /** + * Creates a {@link Supplier} for deferred creation during invocation by + * {@link ConsumerNetworkThread}. 
+ */ + public static Supplier supplier(final Time time, + final ConsumerConfig config, + final LogContext logContext, + final KafkaClient client, + final Metadata metadata, + final BackgroundEventHandler backgroundEventHandler, + final boolean notifyMetadataErrorsViaErrorQueue, + final AsyncConsumerMetrics asyncConsumerMetrics) { + return new CachedSupplier<>() { + @Override + protected NetworkClientDelegate create() { + return new NetworkClientDelegate( + time, + config, + logContext, + client, + metadata, + backgroundEventHandler, + notifyMetadataErrorsViaErrorQueue, + asyncConsumerMetrics + ); + } + }; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 501821a231028..4f41886e9ce4f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -38,13 +38,13 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.EventProcessor; -import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeSyncEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent; import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent; +import org.apache.kafka.clients.consumer.internals.events.SharePollEvent; import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent; import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent; @@ -385,7 +385,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) { backgroundEventQueue, time, asyncConsumerMetrics); final Supplier networkClientDelegateSupplier = - () -> new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, true, asyncConsumerMetrics); + NetworkClientDelegate.supplier(time, config, logContext, client, metadata, backgroundEventHandler, true, asyncConsumerMetrics); GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig( config, @@ -586,7 +586,7 @@ public synchronized ConsumerRecords poll(final Duration timeout) { do { // Make sure the network thread can tell the application is actively polling - applicationEventHandler.add(new PollEvent(timer.currentTimeMs())); + applicationEventHandler.add(new SharePollEvent(timer.currentTimeMs())); processBackgroundEvents(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java index 0441566462a0c..eee77b7ae9a09 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager; @@ -426,7 +427,7 @@ public StreamsMembershipManager membershipManager() { * are sent, so blocking for longer than the heartbeat interval might mean the application thread is not * responsive to changes. * - *
<p/>
Similarly, we may have to unblock the application thread to send a `PollApplicationEvent` to make sure + *
<p/>
Similarly, we may have to unblock the application thread to send an {@link AsyncPollEvent} to make sure + * our poll timer will not expire while we are polling. * *
<p/>
In the event that heartbeats are currently being skipped, this still returns the next heartbeat diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java index cb23e6aaf2826..2db2b16b1d268 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java @@ -21,14 +21,14 @@ import java.util.List; import java.util.Map; -public abstract class AbstractTopicMetadataEvent extends CompletableApplicationEvent>> { +public abstract class AbstractTopicMetadataEvent extends CompletableApplicationEvent>> implements MetadataErrorNotifiableEvent { protected AbstractTopicMetadataEvent(final Type type, final long deadlineMs) { super(type, deadlineMs); } @Override - public boolean requireSubscriptionMetadata() { - return true; + public void onMetadataError(Exception metadataError) { + future().completeExceptionally(metadataError); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index f3f0e161015b4..79ca558123a7c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -28,14 +28,14 @@ public abstract class ApplicationEvent { public enum Type { - COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, + COMMIT_ASYNC, COMMIT_SYNC, ASYNC_POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, TOPIC_METADATA, ALL_TOPICS_METADATA, TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE, UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE, STOP_FIND_COORDINATOR_ON_CLOSE, PAUSE_PARTITIONS, RESUME_PARTITIONS, CURRENT_LAG, - SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC, + SHARE_POLL, SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC, SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE, SHARE_ACKNOWLEDGE_ON_CLOSE, SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK_REGISTRATION, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 2ac1be587a6f6..568518d4c4864 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -18,13 +18,17 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.internals.AbstractMembershipManager; import org.apache.kafka.clients.consumer.internals.Acknowledgements; import org.apache.kafka.clients.consumer.internals.CachedSupplier; import org.apache.kafka.clients.consumer.internals.CommitRequestManager; +import 
org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; +import org.apache.kafka.clients.consumer.internals.ConsumerUtils; import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal; import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager; +import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.IsolationLevel; @@ -45,6 +49,7 @@ import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.function.BiConsumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -53,6 +58,7 @@ * An {@link EventProcessor} that is created and executes in the {@link ConsumerNetworkThread network thread} * which processes {@link ApplicationEvent application events} generated by the application thread. */ +@SuppressWarnings({"ClassFanOutComplexity"}) public class ApplicationEventProcessor implements EventProcessor { private final Logger log; @@ -76,6 +82,14 @@ public ApplicationEventProcessor(final LogContext logContext, @Override public void process(ApplicationEvent event) { switch (event.type()) { + case ASYNC_POLL: + process((AsyncPollEvent) event); + return; + + case SHARE_POLL: + process((SharePollEvent) event); + return; + case COMMIT_ASYNC: process((AsyncCommitEvent) event); return; @@ -84,10 +98,6 @@ public void process(ApplicationEvent event) { process((SyncCommitEvent) event); return; - case POLL: - process((PollEvent) event); - return; - case FETCH_COMMITTED_OFFSETS: process((FetchCommittedOffsetsEvent) event); return; @@ -217,35 +227,13 @@ public void process(ApplicationEvent event) { } } - private void process(final PollEvent event) { - // Trigger a reconciliation that can safely commit offsets if needed to rebalance, - // as we're processing before any new fetching starts in the app thread + private void process(final SharePollEvent event) { requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager -> consumerMembershipManager.maybeReconcile(true)); - if (requestManagers.commitRequestManager.isPresent()) { - CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get(); - commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs()); - // all commit request generation points have been passed, - // so it's safe to notify the app thread could proceed and start fetching - event.markReconcileAndAutoCommitComplete(); - requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> { - hrm.membershipManager().onConsumerPoll(); - hrm.resetPollTimer(event.pollTimeMs()); - }); - requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> { - hrm.membershipManager().onConsumerPoll(); - hrm.resetPollTimer(event.pollTimeMs()); - }); - } else { - // safe to unblock - no auto-commit risk here: - // 1. commitRequestManager is not present - // 2. 
shareConsumer has no auto-commit mechanism - event.markReconcileAndAutoCommitComplete(); - requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> { - hrm.membershipManager().onConsumerPoll(); - hrm.resetPollTimer(event.pollTimeMs()); - }); - } + requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> { + hrm.membershipManager().onConsumerPoll(); + hrm.resetPollTimer(event.pollTimeMs()); + }); } private void process(final CreateFetchRequestsEvent event) { @@ -352,7 +340,7 @@ private void process(final TopicSubscriptionChangeEvent event) { if (subscriptions.subscribe(event.topics(), event.listener())) { this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics(); } - requestManagers.streamsMembershipManager.get().onSubscriptionUpdated(); + requestManagers.streamsGroupHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated(); event.future().complete(null); } catch (Exception e) { event.future().completeExceptionally(e); @@ -375,7 +363,10 @@ private void process(final TopicPatternSubscriptionChangeEvent event) { try { subscriptions.subscribe(event.pattern(), event.listener()); metadata.requestUpdateForNewTopics(); - updatePatternSubscription(metadata.fetch()); + requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> { + ConsumerMembershipManager membershipManager = hrm.membershipManager(); + updatePatternSubscription(membershipManager::onSubscriptionUpdated, metadata.fetch()); + }); event.future().complete(null); } catch (Exception e) { event.future().completeExceptionally(e); @@ -409,13 +400,7 @@ private void process(final TopicRe2JPatternSubscriptionChangeEvent event) { * This will make the consumer send the updated subscription on the next poll. */ private void process(final UpdatePatternSubscriptionEvent event) { - if (!subscriptions.hasPatternSubscription()) { - return; - } - if (this.metadataVersionSnapshot < metadata.updateVersion()) { - this.metadataVersionSnapshot = metadata.updateVersion(); - updatePatternSubscription(metadata.fetch()); - } + requestManagers.consumerMembershipManager.ifPresent(mm -> maybeUpdatePatternSubscription(mm::onSubscriptionUpdated)); event.future().complete(null); } @@ -726,6 +711,69 @@ private void process(final StreamsOnAllTasksLostCallbackCompletedEvent event) { requestManagers.streamsMembershipManager.get().onAllTasksLostCallbackCompleted(event); } + private void process(final AsyncPollEvent event) { + // Trigger a reconciliation that can safely commit offsets if needed to rebalance, + // as we're processing before any new fetching starts + requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager -> + consumerMembershipManager.maybeReconcile(true)); + + if (requestManagers.commitRequestManager.isPresent()) { + CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get(); + commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs()); + + requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> { + ConsumerMembershipManager membershipManager = hrm.membershipManager(); + maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated); + membershipManager.onConsumerPoll(); + hrm.resetPollTimer(event.pollTimeMs()); + }); + requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> { + StreamsMembershipManager membershipManager = hrm.membershipManager(); + maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated); + membershipManager.onConsumerPoll(); + hrm.resetPollTimer(event.pollTimeMs()); + }); + } + + 
CompletableFuture updatePositionsFuture = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs()); + event.markValidatePositionsComplete(); + + updatePositionsFuture.whenComplete((__, updatePositionsError) -> { + if (maybeCompleteAsyncPollEventExceptionally(event, updatePositionsError)) + return; + + requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, fetchError) -> { + if (maybeCompleteAsyncPollEventExceptionally(event, fetchError)) + return; + + event.completeSuccessfully(); + }); + }); + } + + /** + * If there's an error to report to the user, the current event will be completed and this method will + * return {@code true}. Otherwise, it will return {@code false}. + */ + private boolean maybeCompleteAsyncPollEventExceptionally(AsyncPollEvent event, Throwable t) { + if (t == null) + return false; + + if (t instanceof org.apache.kafka.common.errors.TimeoutException || t instanceof java.util.concurrent.TimeoutException) { + log.trace("Ignoring timeout for {}: {}", event, t.getMessage()); + return false; + } + + if (t instanceof CompletionException) { + t = t.getCause(); + } + + KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t); + event.completeExceptionally(e); + log.trace("Failing event processing for {}", event, e); + return true; + } + private BiConsumer complete(final CompletableFuture b) { return (value, exception) -> { if (exception != null) @@ -757,6 +805,16 @@ protected ApplicationEventProcessor create() { }; } + private void maybeUpdatePatternSubscription(MembershipManagerShim membershipManager) { + if (!subscriptions.hasPatternSubscription()) { + return; + } + if (this.metadataVersionSnapshot < metadata.updateVersion()) { + this.metadataVersionSnapshot = metadata.updateVersion(); + updatePatternSubscription(membershipManager, metadata.fetch()); + } + } + /** * This function evaluates the regex that the consumer subscribed to * against the list of topic names from metadata, and updates @@ -764,11 +822,7 @@ protected ApplicationEventProcessor create() { * * @param cluster Cluster from which we get the topics */ - private void updatePatternSubscription(Cluster cluster) { - if (requestManagers.consumerHeartbeatRequestManager.isEmpty()) { - log.warn("Group membership manager not present when processing a subscribe event"); - return; - } + private void updatePatternSubscription(MembershipManagerShim membershipManager, Cluster cluster) { final Set topicsToSubscribe = cluster.topics().stream() .filter(subscriptions::matchesSubscribedPattern) .collect(Collectors.toSet()); @@ -779,11 +833,21 @@ private void updatePatternSubscription(Cluster cluster) { // Join the group if not already part of it, or just send the updated subscription // to the broker on the next poll. Note that this is done even if no topics matched // the regex, to ensure the member joins the group if needed (with empty subscription). - requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated(); + membershipManager.onSubscriptionUpdated(); } // Visible for testing int metadataVersionSnapshot() { return metadataVersionSnapshot; } + + /** + * Ideally the {@link AbstractMembershipManager#onSubscriptionUpdated()} API could be invoked directly, but + * unfortunately {@link StreamsMembershipManager} doesn't extend from {@link AbstractMembershipManager}, so + * that method is not directly available. This functional interface acts as a shim to support both. 
+ */ + private interface MembershipManagerShim { + + void onSubscriptionUpdated(); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java new file mode 100644 index 0000000000000..068193ca498d5 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; +import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer; +import org.apache.kafka.clients.consumer.internals.ConsumerUtils; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.utils.Time; + +import java.time.Duration; +import java.util.Optional; + +/** + * This class represents the non-blocking event that executes logic functionally equivalent to the following: + * + *
<ul>
+ *     <li>Polling</li>
+ *     <li>{@link CheckAndUpdatePositionsEvent}</li>
+ *     <li>{@link CreateFetchRequestsEvent}</li>
+ * </ul>
+ *
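+ * <p>
+ * A rough sketch of the intended application-thread usage (simplified from
+ * {@code AsyncKafkaConsumer.checkInflightPoll()}; not a verbatim excerpt):
+ *
+ * <pre>{@code
+ * if (inflightPoll == null) {
+ *     // Submit a new event without blocking on its result.
+ *     inflightPoll = new AsyncPollEvent(calculateDeadlineMs(timer), time.milliseconds());
+ *     applicationEventHandler.add(inflightPoll);
+ * }
+ *
+ * if (inflightPoll.isComplete()) {
+ *     Optional<KafkaException> error = inflightPoll.error();
+ *     inflightPoll = null;
+ *     // Surface any failure captured on the background thread.
+ *     error.ifPresent(e -> { throw e; });
+ * }
+ * }</pre>
+ *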
+ * + * {@link AsyncKafkaConsumer#poll(Duration)} is implemented using a non-blocking design to ensure performance is + * at the same level as {@link ClassicKafkaConsumer#poll(Duration)}. The event is submitted in {@code poll()}, but + * there are no blocking waits for the "result" of the event. Checks are made for the result at certain points, but + * they do not block. The logic for the previously-mentioned events is executed sequentially on the background thread. + */ +public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNotifiableEvent { + + private final long deadlineMs; + private final long pollTimeMs; + private volatile KafkaException error; + private volatile boolean isComplete; + private volatile boolean isValidatePositionsComplete; + + /** + * Creates a new event to signify a multi-stage processing of {@link Consumer#poll(Duration)} logic. + * + * @param deadlineMs Time, in milliseconds, at which point the event must be completed; based on the + * {@link Duration} passed to {@link Consumer#poll(Duration)} + * @param pollTimeMs Time, in milliseconds, at which point the event was created + */ + public AsyncPollEvent(long deadlineMs, long pollTimeMs) { + super(Type.ASYNC_POLL); + this.deadlineMs = deadlineMs; + this.pollTimeMs = pollTimeMs; + } + + public long deadlineMs() { + return deadlineMs; + } + + public long pollTimeMs() { + return pollTimeMs; + } + + public Optional error() { + return Optional.ofNullable(error); + } + + public boolean isExpired(Time time) { + return time.milliseconds() >= deadlineMs(); + } + + public boolean isValidatePositionsComplete() { + return isValidatePositionsComplete; + } + + public void markValidatePositionsComplete() { + this.isValidatePositionsComplete = true; + } + + public boolean isComplete() { + return isComplete; + } + + public void completeSuccessfully() { + isComplete = true; + } + + public void completeExceptionally(KafkaException e) { + error = e; + isComplete = true; + } + + @Override + public void onMetadataError(Exception metadataError) { + completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataError)); + } + + @Override + protected String toStringBase() { + return super.toStringBase() + + ", deadlineMs=" + deadlineMs + + ", pollTimeMs=" + pollTimeMs + + ", error=" + error + + ", isComplete=" + isComplete + + ", isValidatePositionsComplete=" + isValidatePositionsComplete; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java index 5f1ced33e3a09..4fd834eaf0980 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.TopicPartition; @@ -30,7 +31,7 @@ * The event completes with a boolean indicating if all assigned partitions have valid fetch positions * (based on {@link SubscriptionState#hasAllFetchPositions()}). 
*/ -public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent { +public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent implements MetadataErrorNotifiableEvent { public CheckAndUpdatePositionsEvent(long deadlineMs) { super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs); @@ -39,11 +40,11 @@ public CheckAndUpdatePositionsEvent(long deadlineMs) { /** * Indicates that this event requires subscription metadata to be present * for its execution. This is used to ensure that metadata errors are - * handled correctly during the {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#poll(Duration) poll} - * or {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#position(TopicPartition) position} process. + * handled correctly during the {@link Consumer#poll(Duration) poll} + * or {@link Consumer#position(TopicPartition) position} process. */ @Override - public boolean requireSubscriptionMetadata() { - return true; + public void onMetadataError(Exception metadataError) { + future().completeExceptionally(metadataError); } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java index 51b2d1ffbdb90..8cd17d19feb66 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java @@ -52,8 +52,4 @@ public long deadlineMs() { protected String toStringBase() { return super.toStringBase() + ", future=" + future + ", deadlineMs=" + deadlineMs; } - - public boolean requireSubscriptionMetadata() { - return false; - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java index 605a2ff30c24a..bce78e4aa2075 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java @@ -32,7 +32,7 @@ * {@link OffsetAndTimestamp} found (offset of the first message whose timestamp is greater than * or equals to the target timestamp) */ -public class ListOffsetsEvent extends CompletableApplicationEvent> { +public class ListOffsetsEvent extends CompletableApplicationEvent> implements MetadataErrorNotifiableEvent { private final Map timestampsToSearch; private final boolean requireTimestamps; @@ -65,8 +65,8 @@ public boolean requireTimestamps() { } @Override - public boolean requireSubscriptionMetadata() { - return true; + public void onMetadataError(Exception metadataError) { + future().completeExceptionally(metadataError); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/MetadataErrorNotifiableEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/MetadataErrorNotifiableEvent.java new file mode 100644 index 0000000000000..be8ec46796087 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/MetadataErrorNotifiableEvent.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate; + +/** + * This interface is used for events that need to be notified when the + * {@link NetworkClientDelegate#getAndClearMetadataError()} has an error. + */ +public interface MetadataErrorNotifiableEvent { + + /** + * The background thread detects metadata errors on every call to {@link NetworkClientDelegate#poll(long, long)}. + * {@link NetworkClientDelegate} calls {@link Metadata#maybeThrowAnyException()} and stores the result. + * The presence of a metadata error is checked in the {@link ConsumerNetworkThread}'s loop by calling + * {@link NetworkClientDelegate#getAndClearMetadataError()}. There are two places in the loop in which the + * metadata error is checked: + * + *
<ul>
+ *     <li>
+ *         At the very top of the {@link ConsumerNetworkThread}'s loop, the {@link ApplicationEventHandler}'s
+ *         queue is drained. Before processing each event via
+ *         {@link ApplicationEventProcessor#process(ApplicationEvent)}, if a metadata error occurred, this method
+ *         will be invoked on the event if it implements this interface.
+ *
+ *         <p>
+ *         Note: an event on which this method is invoked will not be passed to the
+ *         {@link ApplicationEventProcessor#process(ApplicationEvent)} method.
+ *     </li>
+ *     <li>
+ *         At the very bottom of the {@link ConsumerNetworkThread}'s loop, the {@link CompletableEventReaper}
+ *         is executed and any outstanding events are returned. If a metadata error occurred, this method
+ *         will be invoked on all unexpired events that implement this interface.
+ *     </li>
+ * </ul>
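+ *
+ * <p>
+ * For the {@link CompletableApplicationEvent} subclasses in this patch (for example
+ * {@link CheckAndUpdatePositionsEvent} and {@link ListOffsetsEvent}), the implementation simply fails the
+ * event's future:
+ *
+ * <pre>{@code
+ * public void onMetadataError(Exception metadataError) {
+ *     future().completeExceptionally(metadataError);
+ * }
+ * }</pre>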
+ * + * @param metadataError Error that originally came from {@link Metadata#maybeThrowAnyException()} + */ + void onMetadataError(Exception metadataError); +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java similarity index 54% rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java index 37df5d9ddc210..2db7b18173c01 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java @@ -16,28 +16,12 @@ */ package org.apache.kafka.clients.consumer.internals.events; -import java.util.concurrent.CompletableFuture; - -public class PollEvent extends ApplicationEvent { +public class SharePollEvent extends ApplicationEvent { private final long pollTimeMs; - /** - * A future that represents the completion of reconciliation and auto-commit - * processing. - * This future is completed when all commit request generation points have - * been passed, including: - *
<ul>
-     *     <li>auto-commit on rebalance</li>
-     *     <li>auto-commit on the interval</li>
-     * </ul>
- * Once completed, it signals that it's safe for the consumer to proceed with - * fetching new records. - */ - private final CompletableFuture reconcileAndAutoCommit = new CompletableFuture<>(); - - public PollEvent(final long pollTimeMs) { - super(Type.POLL); + public SharePollEvent(final long pollTimeMs) { + super(Type.SHARE_POLL); this.pollTimeMs = pollTimeMs; } @@ -45,14 +29,6 @@ public long pollTimeMs() { return pollTimeMs; } - public CompletableFuture reconcileAndAutoCommit() { - return reconcileAndAutoCommit; - } - - public void markReconcileAndAutoCommitComplete() { - reconcileAndAutoCommit.complete(null); - } - @Override public String toStringBase() { return super.toStringBase() + ", pollTimeMs=" + pollTimeMs; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 78ff15cee5f8e..f73e22ed69f08 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -2685,9 +2685,9 @@ public void testCurrentLag(GroupProtocol groupProtocol) throws InterruptedExcept } // poll once again, which should send the list-offset request consumer.seek(tp0, 50L); - consumer.poll(Duration.ofMillis(0)); // requests: list-offset, fetch TestUtils.waitForCondition(() -> { + consumer.poll(Duration.ofMillis(0)); boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS); boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH); return hasListOffsetRequest && hasFetchRequest; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java index 402697227ee80..891e15846f3e8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java @@ -19,8 +19,8 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; -import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.LogContext; @@ -61,7 +61,7 @@ public void testRecordApplicationEventQueueSize(String groupName) { asyncConsumerMetrics )) { // add event - applicationEventHandler.add(new PollEvent(time.milliseconds())); + applicationEventHandler.add(new AsyncPollEvent(time.milliseconds() + 10, time.milliseconds())); verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index bccd4ebeb8149..6517e25ff6baa 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -35,21 +35,19 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent; import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent; +import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; -import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent; import org.apache.kafka.clients.consumer.internals.events.CommitEvent; import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; -import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent; import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; -import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent; import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent; import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent; @@ -57,7 +55,6 @@ import org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; -import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -154,6 +151,7 @@ import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -417,14 +415,13 @@ public void testWakeupBeforeCallingPoll() { final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(tp)); consumer.wakeup(); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO)); assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); } @@ -439,12 +436,11 @@ public void 
testWakeupAfterEmptyFetch() { consumer.wakeup(); return Fetch.empty(); }).doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(tp)); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1))); assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); } @@ -463,12 +459,11 @@ public void testWakeupAfterNonEmptyFetch() { consumer.wakeup(); return Fetch.forPartition(tp, records, true, new OffsetAndMetadata(4, Optional.of(0), "")); }).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(tp)); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); // since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1))); // the previously ignored wake-up should not be ignored in the next call @@ -482,7 +477,6 @@ public void testCommitInRebalanceCallback() { final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); SortedSet sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); sortedPartitions.add(tp); CompletableBackgroundEvent e = new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, sortedPartitions); @@ -505,7 +499,7 @@ public void onPartitionsAssigned(final Collection partitions) { completeTopicSubscriptionChangeEventSuccessfully(); consumer.subscribe(Collections.singletonList(topicName), listener); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); consumer.poll(Duration.ZERO); assertTrue(callbackExecuted.get()); } @@ -522,12 +516,11 @@ public void testClearWakeupTriggerAfterPoll() { ); doReturn(Fetch.forPartition(tp, records, true, new OffsetAndMetadata(4, Optional.of(0), ""))) .when(fetchCollector).collectFetch(any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(tp)); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); consumer.poll(Duration.ZERO); assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); @@ -668,12 +661,11 @@ public void testEnsurePollExecutedCommitAsyncCallbacks() { MockCommitCallback callback = new MockCommitCallback(); completeCommitAsyncApplicationEventSuccessfully(); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); completeAssignmentChangeEventSuccessfully(); 
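
The three wakeup tests above pin down the contract that `Consumer.wakeup()` keeps across the new poll path: a wakeup that lands before or during an empty fetch surfaces as a `WakeupException` from that `poll()` call and is then cleared, while a wakeup that races with a non-empty fetch is deferred to the next call. A minimal sketch of the application-side pattern that relies on this contract (illustrative only; the consumer wiring is assumed, nothing here is taken from the patch):

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

// Illustrative shutdown pattern: wakeup() is the one thread-safe Consumer
// method, so another thread can use it to break the poll loop below.
public final class PollLoopSketch {

    public static void run(final Consumer<String, String> consumer) {
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
        try {
            while (true) {
                // Throws WakeupException if a wakeup fired and no records were
                // fetched; a wakeup racing a non-empty fetch surfaces on the
                // next poll() instead, so records are not silently dropped.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> System.out.println(record.value()));
            }
        } catch (WakeupException e) {
            // Expected on shutdown; the wakeup flag is cleared once thrown.
        } finally {
            consumer.close();
        }
    }
}
```
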
consumer.assign(Collections.singleton(new TopicPartition("foo", 0))); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO), callback); } @@ -1195,19 +1187,6 @@ public void testNoInterceptorCommitAsyncFailed() { assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get()); } - @Test - public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() { - consumer = newConsumer(); - testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(); - } - - @Test - public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() { - // Create consumer without group id so committed offsets are not used for updating positions - consumer = newConsumerWithoutGroupId(); - testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(); - } - @Test public void testSubscribeGeneratesEvent() { consumer = newConsumer(); @@ -1474,7 +1453,7 @@ public void testListenerCallbacksInvoke(List consumer.poll(Duration.ZERO)); } @@ -1544,7 +1522,7 @@ public void testBackgroundError() { backgroundEventQueue.add(errorEvent); completeAssignmentChangeEventSuccessfully(); consumer.assign(singletonList(new TopicPartition("topic", 0))); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); assertEquals(expectedException.getMessage(), exception.getMessage()); @@ -1563,7 +1541,7 @@ public void testMultipleBackgroundErrors() { backgroundEventQueue.add(errorEvent2); completeAssignmentChangeEventSuccessfully(); consumer.assign(singletonList(new TopicPartition("topic", 0))); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); assertEquals(expectedException1.getMessage(), exception.getMessage()); @@ -1642,14 +1620,12 @@ public void testEnsurePollEventSentOnConsumerPoll() { doAnswer(invocation -> Fetch.forPartition(tp, records, true, new OffsetAndMetadata(3, Optional.of(0), ""))) .when(fetchCollector) .collectFetch(Mockito.any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); completeTopicSubscriptionChangeEventSuccessfully(); consumer.subscribe(singletonList("topic1")); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); consumer.poll(Duration.ofMillis(100)); - verify(applicationEventHandler).add(any(PollEvent.class)); - verify(applicationEventHandler).add(any(CreateFetchRequestsEvent.class)); + verify(applicationEventHandler, atLeastOnce()).add(any(AsyncPollEvent.class)); } private Properties requiredConsumerConfigAndGroupId(final String groupId) { @@ -1658,20 +1634,6 @@ private Properties requiredConsumerConfigAndGroupId(final String groupId) { return props; } - private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout() { - completeFetchedCommittedOffsetApplicationEventExceptionally(new TimeoutException()); - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); - - completeAssignmentChangeEventSuccessfully(); - consumer.assign(singleton(new TopicPartition("t1", 1))); - markReconcileAndAutoCommitCompleteForPollEvent(); 
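
These tests no longer reach into a `reconcileAndAutoCommit` future; they drive the new event solely through the `completeAsyncPollEventSuccessfully()` helper defined at the bottom of this file. For orientation, here is a hedged sketch of the surface `AsyncPollEvent` appears to expose, inferred from the constructor calls and assertions in this patch rather than copied from the real class (the field layout and the `completeExceptionally` name are assumptions):

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.common.KafkaException;

// Inferred shape of AsyncPollEvent as these tests use it; not the real class.
public class AsyncPollEventSketch {

    private final long deadlineMs;  // first constructor argument, e.g. calculateDeadlineMs(now, timeout)
    private final long pollTimeMs;  // second constructor argument: the time poll() was called
    private final AtomicReference<KafkaException> error = new AtomicReference<>();
    private volatile boolean complete;

    public AsyncPollEventSketch(final long deadlineMs, final long pollTimeMs) {
        this.deadlineMs = deadlineMs;
        this.pollTimeMs = pollTimeMs;
    }

    public long deadlineMs() { return deadlineMs; }

    public long pollTimeMs() { return pollTimeMs; }

    // The background thread marks the poll pipeline as finished cleanly; the
    // tests stub this via completeAsyncPollEventSuccessfully().
    public void completeSuccessfully() { complete = true; }

    // Name assumed: some error-carrying completion must exist, because a failed
    // pipeline leaves isComplete() true and error() non-empty (see the timeout
    // tests in ApplicationEventProcessorTest further down).
    public void completeExceptionally(final KafkaException e) {
        error.set(e);
        complete = true;
    }

    public boolean isComplete() { return complete; }

    public Optional<KafkaException> error() { return Optional.ofNullable(error.get()); }
}
```
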
- consumer.poll(Duration.ZERO); - - verify(applicationEventHandler, atLeast(1)) - .addAndGet(ArgumentMatchers.isA(CheckAndUpdatePositionsEvent.class)); - } - @Test public void testLongPollWaitIsLimited() { consumer = newConsumer(); @@ -1700,9 +1662,8 @@ public void testLongPollWaitIsLimited() { }).doAnswer(invocation -> Fetch.forPartition(tp, records, true, nextOffsetAndMetadata) ).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); // And then poll for up to 10000ms, which should return 2 records without timing out ConsumerRecords returnedRecords = consumer.poll(Duration.ofMillis(10000)); assertEquals(2, returnedRecords.count()); @@ -1798,7 +1759,6 @@ public void testPollThrowsInterruptExceptionIfInterrupted() { final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(tp)); @@ -1806,7 +1766,7 @@ public void testPollThrowsInterruptExceptionIfInterrupted() { // interrupt the thread and call poll try { Thread.currentThread().interrupt(); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO)); } finally { // clear interrupted state again since this thread may be reused by JUnit @@ -1837,8 +1797,7 @@ void testReaperInvokedInPoll() { doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); completeTopicSubscriptionChangeEventSuccessfully(); consumer.subscribe(Collections.singletonList("topic")); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); consumer.poll(Duration.ZERO); verify(backgroundEventReaper).reap(time.milliseconds()); } @@ -1890,28 +1849,6 @@ public void testSeekToEnd() { assertEquals(AutoOffsetResetStrategy.LATEST, resetOffsetEvent.offsetResetStrategy()); } - @Test - public void testUpdatePatternSubscriptionEventGeneratedOnlyIfPatternUsed() { - consumer = newConsumer(); - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); - doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); - completeAssignmentChangeEventSuccessfully(); - completeTopicPatternSubscriptionChangeEventSuccessfully(); - completeUnsubscribeApplicationEventSuccessfully(); - - consumer.assign(singleton(new TopicPartition("topic1", 0))); - markReconcileAndAutoCommitCompleteForPollEvent(); - consumer.poll(Duration.ZERO); - verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class)); - - consumer.unsubscribe(); - - consumer.subscribe(Pattern.compile("t*")); - consumer.poll(Duration.ZERO); - verify(applicationEventHandler).addAndGet(any(UpdatePatternSubscriptionEvent.class)); - } - @Test public void testSubscribeToRe2JPatternValidation() { consumer = newConsumer(); @@ -2276,11 +2213,11 @@ public 
void testCloseWrapsStreamsRebalanceListenerException() { } } - private void markReconcileAndAutoCommitCompleteForPollEvent() { + private void completeAsyncPollEventSuccessfully() { doAnswer(invocation -> { - PollEvent event = invocation.getArgument(0); - event.markReconcileAndAutoCommitComplete(); + AsyncPollEvent event = invocation.getArgument(0); + event.completeSuccessfully(); return null; - }).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class)); + }).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class)); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java index 35ccb17dfab43..88004ebbcd74b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java @@ -18,8 +18,8 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; -import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.metrics.Metrics; @@ -258,7 +258,7 @@ public void testRunOnceRecordApplicationEventQueueSizeAndApplicationEventQueueTi )) { consumerNetworkThread.initializeResources(); - PollEvent event = new PollEvent(0); + AsyncPollEvent event = new AsyncPollEvent(10, 0); event.setEnqueuedMs(time.milliseconds()); applicationEventQueue.add(event); asyncConsumerMetrics.recordApplicationEventQueueSize(1); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java index 1765563adcdfa..9a6da634c5008 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java @@ -24,12 +24,12 @@ import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; -import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent; import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent; +import org.apache.kafka.clients.consumer.internals.events.SharePollEvent; import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent; import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent; @@ -702,7 +702,7 @@ public 
void testEnsurePollEventSentOnConsumerPoll() { consumer.subscribe(subscriptionTopic); consumer.poll(Duration.ofMillis(100)); - verify(applicationEventHandler).add(any(PollEvent.class)); + verify(applicationEventHandler).add(any(SharePollEvent.class)); verify(applicationEventHandler).addAndGet(any(ShareSubscriptionChangeEvent.class)); completeShareAcknowledgeOnCloseApplicationEventSuccessfully(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index dde3f567132fc..61b53f9c19d57 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -49,6 +49,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.verification.VerificationMode; import java.util.Collection; import java.util.Collections; @@ -65,6 +66,7 @@ import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -88,6 +90,7 @@ public class ApplicationEventProcessorTest { private final ConsumerHeartbeatRequestManager heartbeatRequestManager = mock(ConsumerHeartbeatRequestManager.class); private final ConsumerMembershipManager membershipManager = mock(ConsumerMembershipManager.class); private final OffsetsRequestManager offsetsRequestManager = mock(OffsetsRequestManager.class); + private final FetchRequestManager fetchRequestManager = mock(FetchRequestManager.class); private SubscriptionState subscriptionState = mock(SubscriptionState.class); private final ConsumerMetadata metadata = mock(ConsumerMetadata.class); private final StreamsGroupHeartbeatRequestManager streamsGroupHeartbeatRequestManager = mock(StreamsGroupHeartbeatRequestManager.class); @@ -99,7 +102,7 @@ private void setupProcessor(boolean withGroupId) { new LogContext(), offsetsRequestManager, mock(TopicMetadataRequestManager.class), - mock(FetchRequestManager.class), + fetchRequestManager, withGroupId ? Optional.of(mock(CoordinatorRequestManager.class)) : Optional.empty(), withGroupId ? Optional.of(commitRequestManager) : Optional.empty(), withGroupId ? 
Optional.of(heartbeatRequestManager) : Optional.empty(), @@ -171,7 +174,7 @@ public void testApplicationEventIsProcessed(ApplicationEvent e) { private static Stream applicationEvents() { return Stream.of( - Arguments.of(new PollEvent(100)), + Arguments.of(new AsyncPollEvent(calculateDeadlineMs(12345, 100), 100)), Arguments.of(new CreateFetchRequestsEvent(calculateDeadlineMs(12345, 100))), Arguments.of(new CheckAndUpdatePositionsEvent(500)), Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)), @@ -264,16 +267,20 @@ public void testSeekUnvalidatedEventWithException() { } @Test - public void testPollEvent() { - PollEvent event = new PollEvent(12345); + public void testAsyncPollEvent() { + AsyncPollEvent event = new AsyncPollEvent(12346, 12345); setupProcessor(true); when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager); + when(offsetsRequestManager.updateFetchPositions(event.deadlineMs())).thenReturn(CompletableFuture.completedFuture(true)); + when(fetchRequestManager.createFetchRequests()).thenReturn(CompletableFuture.completedFuture(null)); processor.process(event); - assertTrue(event.reconcileAndAutoCommit().isDone()); - verify(commitRequestManager).updateTimerAndMaybeCommit(12345); + assertTrue(event.isComplete()); + verify(commitRequestManager).updateTimerAndMaybeCommit(event.pollTimeMs()); verify(membershipManager).onConsumerPoll(); - verify(heartbeatRequestManager).resetPollTimer(12345); + verify(heartbeatRequestManager).resetPollTimer(event.pollTimeMs()); + verify(offsetsRequestManager).updateFetchPositions(event.deadlineMs()); + verify(fetchRequestManager).createFetchRequests(); } @Test @@ -655,6 +662,72 @@ public void testStreamsOnAllTasksLostCallbackCompletedEventWithoutStreamsMembers } } + @Test + public void testUpdatePatternSubscriptionInvokedWhenMetadataUpdated() { + when(subscriptionState.hasPatternSubscription()).thenReturn(true); + when(subscriptionState.matchesSubscribedPattern(any(String.class))).thenReturn(true); + when(metadata.updateVersion()).thenReturn(1, 2); + testUpdatePatternSubscription(times(1)); + } + + @Test + public void testUpdatePatternSubscriptionNotInvokedWhenNotUsingPatternSubscription() { + when(subscriptionState.hasPatternSubscription()).thenReturn(false); + when(metadata.updateVersion()).thenReturn(1, 2); + testUpdatePatternSubscription(never()); + } + + @Test + public void testUpdatePatternSubscriptionNotInvokedWhenMetadataNotUpdated() { + when(subscriptionState.hasPatternSubscription()).thenReturn(true); + when(subscriptionState.matchesSubscribedPattern(any(String.class))).thenReturn(true); + when(metadata.updateVersion()).thenReturn(1, 1); + testUpdatePatternSubscription(never()); + } + + private void testUpdatePatternSubscription(VerificationMode verificationMode) { + String topic = "test-topic"; + Cluster cluster = mock(Cluster.class); + + when(metadata.fetch()).thenReturn(cluster); + when(cluster.topics()).thenReturn(Set.of(topic)); + + when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager); + when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(CompletableFuture.completedFuture(true)); + + setupProcessor(true); + processor.process(new AsyncPollEvent(110, 100)); + verify(subscriptionState, verificationMode).matchesSubscribedPattern(topic); + verify(membershipManager, verificationMode).onSubscriptionUpdated(); + } + + @Test + public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() { + setupProcessor(true); + 
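
`testAsyncPollEvent` above fixes the order of operations the processor runs for one `AsyncPollEvent`. A hedged reconstruction of that pipeline, assembled from the mock verifications here and in the timeout tests below rather than from the production code (the manager fields mirror the mocks declared in this test class, and the error-wrapping call is an assumption):

```java
// Not the actual ApplicationEventProcessor code; a sketch of the sequence the
// verifications pin down. The *RequestManager fields correspond to the mocks
// declared at the top of this test class.
private void processAsyncPollEventSketch(final AsyncPollEvent event) {
    // Group-membership bookkeeping runs synchronously first (only when the
    // consumer has a group id; with setupProcessor(false) these managers are
    // absent and the steps below are skipped).
    commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
    heartbeatRequestManager.membershipManager().onConsumerPoll();
    heartbeatRequestManager.resetPollTimer(event.pollTimeMs());

    // Fetch positions are then updated and fetch requests created
    // asynchronously. The event completes either way, so poll() cannot hang on
    // a failed step: isComplete() turns true and error() becomes non-empty.
    offsetsRequestManager.updateFetchPositions(event.deadlineMs())
        .thenCompose(hasAllFetchPositions -> fetchRequestManager.createFetchRequests())
        .whenComplete((ignored, error) -> {
            if (error != null)
                event.completeExceptionally(maybeWrapAsKafkaException(error)); // wrapper name assumed
            else
                event.completeSuccessfully();
        });
}
```
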
testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout();
+    }
+
+    @Test
+    public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
+        // Create a consumer without a group id so committed offsets are not used for updating positions
+        setupProcessor(false);
+        testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout();
+    }
+
+    private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout() {
+        when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(
+            CompletableFuture.failedFuture(new Throwable("Intentional failure"))
+        );
+        when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+
+        // Verify that the poll completes even when updating fetch positions fails.
+        AsyncPollEvent event = new AsyncPollEvent(110, 100);
+        processor.process(event);
+        verify(offsetsRequestManager).updateFetchPositions(anyLong());
+        assertTrue(event.isComplete());
+        assertFalse(event.error().isEmpty());
+    }
+
     private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
         return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
     }
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index bfcc0bb0d4fca..f7d8ab2c99c51 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1356,7 +1356,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
     sendRecords(producer, 1, tp)
     removeAllClientAcls()
 
-    val consumer = createConsumer()
+    // Remove the group.id configuration since this test self-assigns partitions.
+    val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
     consumer.assign(java.util.List.of(tp))
     assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer))
   }
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index a480a3b5ffbad..c25ed9e9bfba9 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer
-import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
+import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, ConsumerRecords, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig, SslConfigs, TopicConfig}
@@ -568,8 +568,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     try {
       consumer.assign(util.Set.of(tp))
       consumer.seekToBeginning(util.Set.of(tp))
-      val records = consumer.poll(time.Duration.ofSeconds(3))
-      assertEquals(expectedNumber, records.count())
+      def verifyRecordCount(records: ConsumerRecords[Array[Byte], Array[Byte]]): Boolean = {
+        expectedNumber == records.count()
+      }
+      TestUtils.pollRecordsUntilTrue(
+        consumer,
+        verifyRecordCount,
+        s"Consumer.poll() did not return the expected number of records ($expectedNumber) within the timeout",
+        pollTimeoutMs = 3000
+      )
     } finally consumer.close()
   }
@@ -4637,7 +4644,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     prepareRecords(testTopicName)
 
     // Producer sends messages
-    for (i <- 1 to 20) {
+    val numRecords = 20
+
+    for (i <- 1 to numRecords) {
       TestUtils.waitUntilTrue(() => {
         val producerRecord = producer.send(
           new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))
@@ -4646,19 +4655,29 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       }, "Fail to produce record to topic")
     }
 
+    val consumerConfig = new Properties()
+    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
     val streams = createStreamsGroup(
+      configOverrides = consumerConfig,
       inputTopics = Set(testTopicName),
       changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId,
     )
 
     try {
-      TestUtils.waitUntilTrue(() => {
-        streams.poll(JDuration.ofMillis(100L))
-        !streams.assignment().isEmpty
-      }, "Consumer not assigned to partitions")
+      var counter = 0
+
+      def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]): Boolean = {
+        counter += records.count()
+        counter >= numRecords
+      }
+      TestUtils.pollRecordsUntilTrue(
+        streams,
+        verifyRecordCount,
+        "Streams consumer did not consume the expected number of records within the timeout"
+      )
 
-      streams.poll(JDuration.ofMillis(1000L))
       streams.commitSync()
 
       TestUtils.waitUntilTrue(() => {
@@ -4698,7 +4717,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     prepareTopics(List(testTopicName), testNumPartitions)
     prepareRecords(testTopicName)
     // Producer sends messages
-    for (i <- 1 to 20) {
+    val numRecords = 20
+
+    for (i <- 1 to numRecords) {
       TestUtils.waitUntilTrue(() => {
         val producerRecord = producer.send(
           new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))
@@ -4707,19 +4728,29 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       }, "Fail to produce record to topic")
     }
 
+    val consumerConfig = new Properties()
+    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
     val streams = createStreamsGroup(
+      configOverrides = consumerConfig,
       inputTopics = Set(testTopicName),
       changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId,
     )
 
     try {
-      TestUtils.waitUntilTrue(() => {
-        streams.poll(JDuration.ofMillis(100L))
-        !streams.assignment().isEmpty
-      }, "Consumer not assigned to partitions")
+      var counter = 0
+
+      def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]): Boolean = {
+        counter += records.count()
+        counter >= numRecords
+      }
+      TestUtils.pollRecordsUntilTrue(
+        streams,
+        verifyRecordCount,
+        "Streams consumer did not consume the expected number of records within the timeout"
+      )
 
-      streams.poll(JDuration.ofMillis(1000L))
       streams.commitSync()
 
       // List streams group offsets
@@ -4776,7 +4807,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     prepareRecords(testTopicName)
 
     // Producer sends messages
-    for (i <- 1 to 20) {
+    val numRecords = 20
+
+    for (i <- 1 to numRecords) {
       TestUtils.waitUntilTrue(() => {
         val producerRecord = producer.send(
           new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))
@@ -4785,19 +4818,29 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       }, "Fail to produce record to topic")
     }
 
+    val consumerConfig = new Properties()
+    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
     val streams = createStreamsGroup(
+      configOverrides = consumerConfig,
       inputTopics = Set(testTopicName),
       changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId,
     )
 
     try {
-      TestUtils.waitUntilTrue(() => {
-        streams.poll(JDuration.ofMillis(100L))
-        !streams.assignment().isEmpty
-      }, "Consumer not assigned to partitions")
+      var counter = 0
+
+      def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]): Boolean = {
+        counter += records.count()
+        counter >= numRecords
+      }
+      TestUtils.pollRecordsUntilTrue(
+        streams,
+        verifyRecordCount,
+        "Streams consumer did not consume the expected number of records within the timeout"
+      )
 
-      streams.poll(JDuration.ofMillis(1000L))
       streams.commitSync()
 
       // List streams group offsets
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index c08c43081e6a2..382f548ed5231 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -145,13 +145,27 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
   }
 
   private def verifyConsumerWithAuthenticationFailure(consumer: Consumer[Array[Byte], Array[Byte]]): Unit = {
-    verifyAuthenticationException(consumer.poll(Duration.ofMillis(1000)))
+    val startMs = System.currentTimeMillis
+    TestUtils.pollUntilException(
+      consumer,
+      _ => true,
+      "Consumer.poll() did not throw an exception within the timeout",
+      pollTimeoutMs = 1000
+    )
+    val elapsedMs = System.currentTimeMillis - startMs
+    assertTrue(elapsedMs <= 5000, s"Poll took too long, elapsed=$elapsedMs")
     verifyAuthenticationException(consumer.partitionsFor(topic))
 
     createClientCredential()
     val producer = createProducer()
     verifyWithRetry(sendOneRecord(producer))()
-    verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))(_.count == 1)
+    TestUtils.waitUntilTrue(() => {
+      try {
+        consumer.poll(Duration.ofMillis(1000)).count() == 1
+      } catch {
+        case _: Throwable => false
+      }
+    }, msg = "Consumer.poll() did not read the expected number of records within the timeout")
   }
 
   @Test
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 575c612bf26a1..fe7d8fb441b00 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -19,7 +19,6 @@ package kafka.server
 
 import java.net.InetSocketAddress
-import java.time.Duration
 import java.util.Properties
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import javax.security.auth.login.LoginContext
@@ -185,7 +184,12 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
     consumer.assign(java.util.List.of(tp))
     val startMs = System.currentTimeMillis()
-    assertThrows(classOf[SaslAuthenticationException], () => consumer.poll(Duration.ofMillis(50)))
+    TestUtils.pollUntilException(
+      consumer,
+      t => t.isInstanceOf[SaslAuthenticationException],
+      "Consumer.poll() did not trigger a SaslAuthenticationException within timeout",
+      pollTimeoutMs = 50
+    )
     val endMs = System.currentTimeMillis()
     require(endMs - startMs < failedAuthenticationDelayMs, "Failed
authentication must not be delayed on the client") consumer.close() diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 8b0affae9eab9..63d0c3e49a961 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -690,6 +690,21 @@ object TestUtils extends Logging { }, msg = msg, pause = 0L, waitTimeMs = waitTimeMs) } + def pollUntilException(consumer: Consumer[_, _], + action: Throwable => Boolean, + msg: => String, + waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, + pollTimeoutMs: Long = 100): Unit = { + waitUntilTrue(() => { + try { + consumer.poll(Duration.ofMillis(pollTimeoutMs)) + false + } catch { + case t: Throwable => action(t) + } + }, msg = msg, pause = 0L, waitTimeMs = waitTimeMs) + } + def pollRecordsUntilTrue[K, V](consumer: Consumer[K, V], action: ConsumerRecords[K, V] => Boolean, msg: => String,