-
Notifications
You must be signed in to change notification settings - Fork 14.6k
KAFKA-18913: Start state updater in task manager #19889
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18913: Start state updater in task manager #19889
Conversation
…n StreamThreadTest fix Updated the code to start the State Updater Thread only after the Stream Thread is started. Changes done : 1. Moved the starting of the StateUpdater thread to a new init method in the TaskManager. 2. Called the init of TaskManager in the run method of the StreamThread. 3. Updated the test cases in the StreamThreadTest to mimic the aforementioned behaviour.
Hi @cadonna , Here is the fix for the issue that we discussed( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@janchilling Thanks for the PR!
The PR corresponds to my proposal in the other PR.
I left some comments.
void init() { | ||
if (stateUpdater != null) { | ||
this.stateUpdater.start(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add a unit test that verifies that the state updater is indeed started when the task manager is initialized?
private void maybeRunStateUpdater(final boolean stateUpdaterEnabled) { | ||
if (stateUpdaterEnabled) { | ||
thread.taskManager().init(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be enough to call thread.taskManager().init()
instead of this method. Whether the state updater is enabled is already checked in the init()
.
You also do not need to call it in each and every test method. Only in some the call is needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @cadonna ,
I actually didn't call it in every test method, only in the ones where the StreamThread is created by using the StreamThreadTest#createStreamThread
. Therefore, to keep the behavior uniform with the code, I included it in those test cases.
That said, if the above approach isn’t ideal, another option could be to include the call in the tests where tasks are not Restoring properly. I tried this alternative as well, and all test cases passed with this approach as well.
Happy to adjust it either way, depending on what you think would be best.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair enough!
I checked a couple of test methods and found that in at least two cases maybeRunStateUpdater()
was not needed. The two cases are: shouldRecordCommitLatency()
and shouldChangeStateInRebalanceListener()
. There might still be others.
No worries! Thanks for letting us know! |
…n StreamThreadTest fix Added a test case to verify whether the StateUpdater thread is running after calling the init method in the TaskManager and to check if it shuts down properly. Furthermore, did some changes requested in the review in the StreamThreadTest class
Hi @cadonna , I’ve added a test case that verifies both the starting and shutdown behavior of the To perform the verification, I used I believe this addresses all the requested changes, but please let me know if there’s anything else you'd like me to update. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
); | ||
|
||
taskManager.init(); | ||
assertTrue(defaultStateUpdater.isRunning()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You do not need to check for isRunning()
here. isRunning()
is an implementation detail of the DefaultStateUpdater
. It suffices to verify that stateUpdater.start()
is called on the mock. Something simple like the following:
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldStartStateUpdaterOnInit(final boolean stateUpdaterEnabled) {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, stateUpdaterEnabled);
taskManager.init();
if (stateUpdaterEnabled) {
verify(stateUpdater).start();
} else {
verify(stateUpdater, never()).start();
}
}
The shutdown is tested elsewhere. You do not need to test it again here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing that out. I've updated the test accordingly.
…n StreamThreadTest fix Test case added to verify that the stateUpdater.start() is called on the mock StateUpdater.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @janchilling!
LGTM!
This reverts commit 4d6cf3e.
This reverts commit 4d6cf3e.
Updated the code to start the State Updater Thread only after the Stream
Thread is started.
Changes done :
the TaskManager.
aforementioned behaviour.
Reviewers: Bruno Cadonna cadonna@apache.org