Skip to content

KAFKA-18913: Start state updater in task manager #19889

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

janchilling
Copy link
Contributor

@janchilling janchilling commented Jun 3, 2025

Updated the code to start the State Updater Thread only after the Stream
Thread is started.

Changes done :

  1. Moved the starting of the StateUpdater thread to a new init method in
    the TaskManager.
  2. Called the init of TaskManager in the run method of the StreamThread.
  3. Updated the test cases in the StreamThreadTest to mimic the
    aforementioned behaviour.

Reviewers: Bruno Cadonna cadonna@apache.org

…n StreamThreadTest fix

Updated the code to start the State Updater Thread only after the Stream Thread is started.

Changes done :
1. Moved the starting of the StateUpdater thread to a new init method in the TaskManager.
2. Called the init of TaskManager in the run method of the StreamThread.
3. Updated the test cases in the StreamThreadTest to mimic the aforementioned behaviour.
@github-actions github-actions bot added triage PRs from the community streams small Small PRs labels Jun 3, 2025
@janchilling
Copy link
Contributor Author

Hi @cadonna ,

Here is the fix for the issue that we discussed(StateUpdater thread being left to run even though an IllegalStateException thrown before the StreamThread is created).
I hope this is the implementation that you requested to be done. If there are any changes or if anything needs to be added please let me know. I might be away for the next 2 weeks, I'll attend to any required changes when I find time.

Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@janchilling Thanks for the PR!

The PR corresponds to my proposal in the other PR.

I left some comments.

Comment on lines +152 to +156
void init() {
if (stateUpdater != null) {
this.stateUpdater.start();
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add a unit test that verifies that the state updater is indeed started when the task manager is initialized?

Comment on lines 4153 to 4157
private void maybeRunStateUpdater(final boolean stateUpdaterEnabled) {
if (stateUpdaterEnabled) {
thread.taskManager().init();
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be enough to call thread.taskManager().init() instead of this method. Whether the state updater is enabled is already checked in the init().

You also do not need to call it in each and every test method. Only in some the call is needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @cadonna ,

I actually didn't call it in every test method, only in the ones where the StreamThread is created by using the StreamThreadTest#createStreamThread. Therefore, to keep the behavior uniform with the code, I included it in those test cases.

That said, if the above approach isn’t ideal, another option could be to include the call in the tests where tasks are not Restoring properly. I tried this alternative as well, and all test cases passed with this approach as well.

Happy to adjust it either way, depending on what you think would be best.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough!
I checked a couple of test methods and found that in at least two cases maybeRunStateUpdater() was not needed. The two cases are: shouldRecordCommitLatency() and shouldChangeStateInRebalanceListener(). There might still be others.

@cadonna
Copy link
Member

cadonna commented Jun 4, 2025

I might be away for the next 2 weeks, I'll attend to any required changes when I find time.

No worries! Thanks for letting us know!

…n StreamThreadTest fix

Added a test case to verify whether the StateUpdater thread is running after calling the init method in the TaskManager and to check if it shuts down properly.

Furthermore, did some changes requested in the review in the StreamThreadTest class
@janchilling
Copy link
Contributor Author

Hi @cadonna ,

I’ve added a test case that verifies both the starting and shutdown behavior of the StateUpdater thread—since we shouldn't leave the thread running after the test.

To perform the verification, I used DefaultStateUpdater directly in order to access the isRunning() method, as the existing mock StateUpdater doesn't expose it. This meant writing a more customized test rather than reusing the existing setup methods.

I believe this addresses all the requested changes, but please let me know if there’s anything else you'd like me to update.

@janchilling janchilling requested a review from cadonna June 4, 2025 17:31
@github-actions github-actions bot removed the triage PRs from the community label Jun 5, 2025
Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@janchilling Thanks for the update!

Here my comment.

Almost there.

);

taskManager.init();
assertTrue(defaultStateUpdater.isRunning());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You do not need to check for isRunning() here. isRunning() is an implementation detail of the DefaultStateUpdater. It suffices to verify that stateUpdater.start() is called on the mock. Something simple like the following:

    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void shouldStartStateUpdaterOnInit(final boolean stateUpdaterEnabled) {
        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, stateUpdaterEnabled);
        taskManager.init();
        if (stateUpdaterEnabled) {
            verify(stateUpdater).start();
        } else {
            verify(stateUpdater, never()).start();
        }
    }

The shutdown is tested elsewhere. You do not need to test it again here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing that out. I've updated the test accordingly.

…n StreamThreadTest fix

Test case added to verify that the stateUpdater.start() is called on the mock StateUpdater.
@janchilling janchilling requested a review from cadonna June 5, 2025 12:36
Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @janchilling!

LGTM!

@cadonna cadonna changed the title KAFKA-18913: State Updater thread not shutting down during teardown i… KAFKA-18913: Start state updater in task manager Jun 5, 2025
@cadonna cadonna merged commit 4d6cf3e into apache:trunk Jun 6, 2025
35 of 37 checks passed
lucasbru added a commit to lucasbru/kafka that referenced this pull request Jul 14, 2025
lucasbru added a commit to lucasbru/kafka that referenced this pull request Jul 14, 2025
lucasbru added a commit to lucasbru/kafka that referenced this pull request Jul 17, 2025
…he#19889)"

This reverts commit 4d6cf3e.
It seemed to trigger a race condition in the state updater
initialization.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants