Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10199: Do not process when in PARTITIONS_REVOKED #14265

Merged

Conversation

cadonna
Copy link
Contributor

@cadonna cadonna commented Aug 21, 2023

When a Streams application is subscribed with a pattern to input topics and an input topic is deleted, the stream thread transists to PARTITIONS_REVOKED and a rebalance is triggered. This happens inside the poll call. Sometimes, the poll call returns before a new assignment is received. That means, Streams executes the poll loop in state PARTITIONS_REVOKED.

With the state updater enabled processing is also executed in states other than RUNNING and so processing is also executed when the stream thread is in state PARTITION_REVOKED. However, that triggers an IllegalStateException with error message:
No current assignment for partition TEST-TOPIC-A-0 which is a fatal error.

This commit prevents processing when the stream thread is in state PARTITIONS_REVOKED.

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@cadonna cadonna force-pushed the prevent_processing_in_partition_revoked branch from 3a5d400 to 4e0d7d7 Compare August 21, 2023 08:33
@cadonna cadonna requested a review from lucasbru August 21, 2023 08:33
@cadonna
Copy link
Contributor Author

cadonna commented Aug 21, 2023

The error I encountered in intergration test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted() was:

[2023-08-02 10:37:16,649] ERROR stream-thread [RegexSourceIntegrationTesttestRegexMatchesTopicsAWhenDeleted-7bdb51e3-7738-46fc-97dc-855c060adefd-StreamThread-1] Failed to process stream task 0_0 due to the following error: (org.apache.kafka.streams.processor.internals.TaskExecutor:124)
java.lang.IllegalStateException: No current assignment for partition TEST-TOPIC-A-0
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.partitionLag(SubscriptionState.java:541)
	at org.apache.kafka.clients.consumer.KafkaConsumer.currentLag(KafkaConsumer.java:2292)
	at org.apache.kafka.streams.processor.internals.PartitionGroup.readyToProcess(PartitionGroup.java:143)
	at org.apache.kafka.streams.processor.internals.StreamTask.isProcessable(StreamTask.java:704)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:724)
	at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
	at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1790)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:806)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:616)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:578)
[2023-08-02 10:37:16,650] ERROR stream-client [RegexSourceIntegrationTesttestRegexMatchesTopicsAWhenDeleted-7bdb51e3-7738-46fc-97dc-855c060adefd] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.  (org.apache.kafka.streams.KafkaStreams:537)
org.apache.kafka.streams.errors.StreamsException: java.lang.IllegalStateException: No current assignment for partition TEST-TOPIC-A-0
	at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:125)
	at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1790)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:806)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:616)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:578)
Caused by: java.lang.IllegalStateException: No current assignment for partition TEST-TOPIC-A-0
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.partitionLag(SubscriptionState.java:541)
	at org.apache.kafka.clients.consumer.KafkaConsumer.currentLag(KafkaConsumer.java:2292)
	at org.apache.kafka.streams.processor.internals.PartitionGroup.readyToProcess(PartitionGroup.java:143)
	at org.apache.kafka.streams.processor.internals.StreamTask.isProcessable(StreamTask.java:704)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:724)
	at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
	... 5 more

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm, thanks!

When a Streams application is subscribed with a pattern to
input topics and an input topic is deleted, the stream thread
transists to PARTITIONS_REVOKED and a rebalance is triggered.
This happens inside the poll call. Sometimes, the poll call
returns before a new assignment is received. That means, Streams
executes the poll loop in state PARTITIONS_REVOKED.

With the state updater enabled processing is also executed in states
other than RUNNING and so processing is also executed when the
stream thread is in state PARTITION_REVOKED. However, that triggers
an IllegalStateException with error message:
No current assignment for partition TEST-TOPIC-A-0
which is a fatal error.

This commit prevents processing when the stream thread is in state
PARTITIONS_REVOKED.
@cadonna cadonna force-pushed the prevent_processing_in_partition_revoked branch from bdfa9bf to 848c643 Compare September 26, 2023 08:38
@cadonna cadonna merged commit 65efb98 into apache:trunk Sep 26, 2023
1 check failed
@cadonna
Copy link
Contributor Author

cadonna commented Sep 26, 2023

Build failures are unrelated.

chbinousamy added a commit to chbinousamy/kafka that referenced this pull request Sep 26, 2023
* KAFKA-10199: Fix restoration behavior for paused tasks (apache#14437)

State updater can get into a busy loop when all tasks are paused, because changelogReader will never return that all changelogs have been read completely. Fix this, by awaiting if updatingTasks is empty.

Related and included: if we are restoring and all tasks are paused, we should return immediately from StoreChangelogReader.

Reviewer: Bruno Cadonna <cadonna@apache.org>

* KAFKA-10199: Do not process when in PARTITIONS_REVOKED (apache#14265)

When a Streams application is subscribed with a pattern to
input topics and an input topic is deleted, the stream thread
transists to PARTITIONS_REVOKED and a rebalance is triggered.
This happens inside the poll call. Sometimes, the poll call
returns before a new assignment is received. That means, Streams
executes the poll loop in state PARTITIONS_REVOKED.

With the state updater enabled processing is also executed in states
other than RUNNING and so processing is also executed when the
stream thread is in state PARTITION_REVOKED. However, that triggers
an IllegalStateException with error message:
No current assignment for partition TEST-TOPIC-A-0
which is a fatal error.

This commit prevents processing when the stream thread is in state
PARTITIONS_REVOKED.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>

* KAFKA-15326: [8/N] Move consumer interaction out of processing methods (apache#14226)

The process method inside the tasks needs to be called from within
the processing threads. However, it currently interacts with the
consumer in two ways:

* It resumes processing when the PartitionGroup buffers are empty
* It fetches the lag from the consumer

We introduce updateLags() and 
resumePollingForPartitionsWithAvailableSpace() methods that call into
the task from the polling thread, in order to set up the consumer
correctly for the next poll, and extract metadata from the consumer
after the poll.

Reviewer: Bruno Cadonna <bruno@confluent.io>

---------

Co-authored-by: Lucas Brutschy <lbrutschy@confluent.io>
Co-authored-by: Bruno Cadonna <cadonna@apache.org>
rreddy-22 pushed a commit to rreddy-22/kafka-rreddy that referenced this pull request Oct 3, 2023
When a Streams application is subscribed with a pattern to
input topics and an input topic is deleted, the stream thread
transists to PARTITIONS_REVOKED and a rebalance is triggered.
This happens inside the poll call. Sometimes, the poll call
returns before a new assignment is received. That means, Streams
executes the poll loop in state PARTITIONS_REVOKED.

With the state updater enabled processing is also executed in states
other than RUNNING and so processing is also executed when the
stream thread is in state PARTITION_REVOKED. However, that triggers
an IllegalStateException with error message:
No current assignment for partition TEST-TOPIC-A-0
which is a fatal error.

This commit prevents processing when the stream thread is in state
PARTITIONS_REVOKED.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
k-wall pushed a commit to k-wall/kafka that referenced this pull request Nov 21, 2023
When a Streams application is subscribed with a pattern to
input topics and an input topic is deleted, the stream thread
transists to PARTITIONS_REVOKED and a rebalance is triggered.
This happens inside the poll call. Sometimes, the poll call
returns before a new assignment is received. That means, Streams
executes the poll loop in state PARTITIONS_REVOKED.

With the state updater enabled processing is also executed in states
other than RUNNING and so processing is also executed when the
stream thread is in state PARTITION_REVOKED. However, that triggers
an IllegalStateException with error message:
No current assignment for partition TEST-TOPIC-A-0
which is a fatal error.

This commit prevents processing when the stream thread is in state
PARTITIONS_REVOKED.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Feb 16, 2024
When a Streams application is subscribed with a pattern to
input topics and an input topic is deleted, the stream thread
transists to PARTITIONS_REVOKED and a rebalance is triggered.
This happens inside the poll call. Sometimes, the poll call
returns before a new assignment is received. That means, Streams
executes the poll loop in state PARTITIONS_REVOKED.

With the state updater enabled processing is also executed in states
other than RUNNING and so processing is also executed when the
stream thread is in state PARTITION_REVOKED. However, that triggers
an IllegalStateException with error message:
No current assignment for partition TEST-TOPIC-A-0
which is a fatal error.

This commit prevents processing when the stream thread is in state
PARTITIONS_REVOKED.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
2 participants