-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6844: Call shutdown on GlobalStreamThread after all StreamThreads have stopped #4950
KAFKA-6844: Call shutdown on GlobalStreamThread after all StreamThreads have stopped #4950
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
I am wondering, how hard it would be to write an test for this case?
@mjsax I had similar thoughts, I just ran out of time yesterday. I'll take look at getting a test together. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
LGTM! |
added an integration test |
@Override | ||
public void process(final String key, final Long value) { | ||
// need to simulate slow processing | ||
Utils.sleep(1000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a better test scenario is to move the logic in close()
call, i.e. when the stream thread is being shutdown, and topology is closing, we call processorNode.close()
in which we wait for a while and then tries to access the global store. It mimics the case where in closing the store cache is flushed and hence tries to access the global store again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack, will move
@guozhangwang updated this |
@bbejeck Jenkins failure is due to a recent PR removing the deprecated API. |
a09a661
to
d6606fb
Compare
rebased this |
…ds have stopped (apache#4950) Moved the shutdown of GlobalStreamThread to after all StreamThread instances have stopped. There can be a race condition where shut down is called on a StreamThread then shut down is called on a GlobalStreamThread, but if StreamThread is delayed in shutting down, the GlobalStreamThread can shutdown first. If the StreamThread tries to access a GlobalStateStore before closing the user can get an exception stating "..Store xxx is currently closed " Tested by running all current streams tests. Reviewers: Ted Yu <yuzhihong@gmail.com>, John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Moved the shutdown of
GlobalStreamThread
to after allStreamThread
instances have stopped.There can be a race condition where shut down is called on a
StreamThread
then shut down is called on a GlobalStreamThread, but if StreamThread is delayed in shutting down, the GlobalStreamThread can shutdown first.If the StreamThread tries to access a GlobalStateStore before closing the user can get an exception stating "..Store xxx is currently closed "
Tested by running all current streams tests.
Committer Checklist (excluded from commit message)