-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-7657: Fixing thread state change to instance state change #6018
KAFKA-7657: Fixing thread state change to instance state change #6018
Conversation
…ndling-thread-dead-state-change
@@ -269,23 +276,6 @@ private boolean setState(final State newState) { | |||
return true; | |||
} | |||
|
|||
private boolean setRunningFromCreated() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no logic change: it is just a nit that I moved this function closer to its caller.
@@ -397,18 +386,21 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState | |||
final class StreamStateListener implements StreamThread.StateListener { | |||
private final Map<Long, StreamThread.State> threadState; | |||
private GlobalStreamThread.State globalThreadState; | |||
// this lock should always be held before the state lock | |||
private final Object threadStatesLock; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is for 1).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get why we need this? onChange()
is synchronized
, so how can a race condition happen?
final StreamThread.State newState = (StreamThread.State) abstractNewState; | ||
threadState.put(thread.getId(), newState); | ||
|
||
if (newState == StreamThread.State.PARTITIONS_REVOKED && state != State.REBALANCING) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is for 1), i.e. doe not check instance-level state
here since any access to this should be lock protected.
@@ -252,6 +252,13 @@ private boolean setState(final State newState) { | |||
// when the state is already in NOT_RUNNING, its transition to PENDING_SHUTDOWN or NOT_RUNNING (due to consecutive close calls) | |||
// will be refused but we do not throw exception here, to allow idempotent close calls | |||
return false; | |||
} else if (state == State.REBALANCING && newState == State.REBALANCING) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is for 1) as well: we allow REBALANCE -> REBALANCE and RUNNING -> RUNNING because of the deferred check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just looking at this, it seems we need to assign oldState = state
after we go the lock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also below, when calling stateListener.onChange(state, oldState);
-- would we need to call stateListener.onChange(newState, oldState);
instead? Otherwise, state
could change before we do the callback because the lock is released already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I cannot follow here. Why would this happen? Similar for RUNNING?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re first two comments: good call, lgtm.
Re last comment: REBALANCING -> REBALANCING is quite normal, note that before this PR we check this before calling setState(State.REBALANCING);
so this is prevented, but part of this fix is to move the logic that needs to access the state into a single place (here). Similarly RUNNING -> RUNNING is possible during starting up phase, where we first set the instance state to RUNNING directly to avoid it transit from CREATED -> REBALANCING, and then when threads are starting, it is possible that maybeSetRunning
went through and hence calls setState(RUNNING)
again.
for (final StreamThread.State state : threadState.values()) { | ||
if (state != StreamThread.State.RUNNING) { | ||
if (state != StreamThread.State.RUNNING && state != StreamThread.State.DEAD) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one-liner is for 2).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this fix!
Can you explain why we need the new threadStatesLock
instead of just grabbing the stateLock
as a precondition to updating the threadState
map?
It seems like an optimization to prevent serializing each thread's state update during, eg a rebalance. I suppose this would be nice if there were a large number of threads per instance. Is that what you had in mind?
|
||
// special case when global thread is dead | ||
if (newState == GlobalStreamThread.State.DEAD && state != State.ERROR && setState(State.ERROR)) { | ||
log.warn("Global thread has died. The instance will be in error state and should be closed."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Preexisting, but should this be an error log? (seems to be implied by the text of the log)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. Error log level, seems to be more appropriate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stateLock.notifyAll(); | ||
} | ||
|
||
// we need to call the user customized state listener outside the state lock to avoid potential deadlocks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this mean that the state listener might be invoked out of order?
Eg:
- RUNNING -> REBALANCING
- CREATED -> RUNNING
Maybe this is already accounted for, and it's why we pass both the old and new state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it is possible that user's callback maybe triggered out of ordering (in practice may be not very likely), this is not ideal but at that time we did not come up with a better idea how to enforce that we always have instance-level state to go CREATED -> RUNNING -> REBALANCING -> RUNNING at the starting up phase.
All ears if you have a better idea :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why could we not fix this? The underlying issue seems to be the state transitions of StreamThread
-- it allows to go from CREATED -> RUNNING
-- if we change this, and we can only go to CREATED -> PARTITION REVOKED
we should be able to tackle this issue? (Maybe a different PR to do this though.)
Yes, the main purpose is to have finer granularity in locks (i.e. one for the threads map and global thread summary, and one for the instance-level state), since there are call paths that actually does not access or modify the threads map at all but directly jump on the state and modify it (e.g. transitFromCreatedToRunning). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fix @guozhangwang, LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some questions. Not sure if I fully understand atm.
@@ -424,12 +416,13 @@ private void maybeSetError() { | |||
* If all threads are up, including the global thread, set to RUNNING | |||
*/ | |||
private void maybeSetRunning() { | |||
// one thread is running, check others, including global thread | |||
// state can be transferred to RUNNING if all threads are either RUNNING or DEAD |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to set the instance to RUNNING if all threads are DEAD ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note this is triggered only when a thread transited to RUNNING.
When all threads are DEAD, by the time the last thread transited to DEAD the maybeSetError
will proceed and the state will transit to ERROR.
|
||
// special case when global thread is dead | ||
if (newState == GlobalStreamThread.State.DEAD && state != State.ERROR && setState(State.ERROR)) { | ||
log.warn("Global thread has died. The instance will be in error state and should be closed."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. Error log level, seems to be more appropriate.
@@ -252,6 +252,13 @@ private boolean setState(final State newState) { | |||
// when the state is already in NOT_RUNNING, its transition to PENDING_SHUTDOWN or NOT_RUNNING (due to consecutive close calls) | |||
// will be refused but we do not throw exception here, to allow idempotent close calls | |||
return false; | |||
} else if (state == State.REBALANCING && newState == State.REBALANCING) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just looking at this, it seems we need to assign oldState = state
after we go the lock?
@@ -252,6 +252,13 @@ private boolean setState(final State newState) { | |||
// when the state is already in NOT_RUNNING, its transition to PENDING_SHUTDOWN or NOT_RUNNING (due to consecutive close calls) | |||
// will be refused but we do not throw exception here, to allow idempotent close calls | |||
return false; | |||
} else if (state == State.REBALANCING && newState == State.REBALANCING) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also below, when calling stateListener.onChange(state, oldState);
-- would we need to call stateListener.onChange(newState, oldState);
instead? Otherwise, state
could change before we do the callback because the lock is released already.
@@ -330,8 +320,7 @@ public void setStateListener(final KafkaStreams.StateListener listener) { | |||
if (state == State.CREATED) { | |||
stateListener = listener; | |||
} else { | |||
throw new IllegalStateException("Can only set StateListener in CREATED state. " + | |||
"Current state is: " + state); | |||
throw new IllegalStateException("Can only set StateListener in CREATED state. Current state is: " + state); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems it might be better to get a lock here, too (ie, for the whole method)? Similar in setUncaughtExceptionHandler()
and setGlobalStateRestoreListener()
?
stateLock.notifyAll(); | ||
} | ||
|
||
// we need to call the user customized state listener outside the state lock to avoid potential deadlocks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why could we not fix this? The underlying issue seems to be the state transitions of StreamThread
-- it allows to go from CREATED -> RUNNING
-- if we change this, and we can only go to CREATED -> PARTITION REVOKED
we should be able to tackle this issue? (Maybe a different PR to do this though.)
@@ -252,6 +252,13 @@ private boolean setState(final State newState) { | |||
// when the state is already in NOT_RUNNING, its transition to PENDING_SHUTDOWN or NOT_RUNNING (due to consecutive close calls) | |||
// will be refused but we do not throw exception here, to allow idempotent close calls | |||
return false; | |||
} else if (state == State.REBALANCING && newState == State.REBALANCING) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I cannot follow here. Why would this happen? Similar for RUNNING?
@@ -397,18 +386,21 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState | |||
final class StreamStateListener implements StreamThread.StateListener { | |||
private final Map<Long, StreamThread.State> threadState; | |||
private GlobalStreamThread.State globalThreadState; | |||
// this lock should always be held before the state lock | |||
private final Object threadStatesLock; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get why we need this? onChange()
is synchronized
, so how can a race condition happen?
…ndling-thread-dead-state-change
| Why could we not fix this? The underlying issue seems to be the state transitions of StreamThread -- it allows to go from CREATED -> RUNNING -- if we change this, and we can only go to CREATED -> PARTITION REVOKED we should be able to tackle this issue? (Maybe a different PR to do this though.) Not sure I fully understand, could you elaborate? |
| I don't get why we need this? onChange() is synchronized, so how can a race condition happen? There are possible concurrent access to the
|
About state transitions. My comment was a little bit ambiguous. It was not about callback order, but about avoiding: Currently, we have state transitions for
Those lead to Why not change it to:
This way, on startup we get |
I see. So what I can do is:
Is that right? I think it is doable, and CREATED -> REBALANCING -> RUNNING does look better. But since it changes the state transition a bit that would require the KIP. If people prefer this as well I can piggy-back this on a small proposal. |
…ndling-thread-dead-state-change
I would not set Of course, this implies that we are in state If we need a KIP, I think we should to a separate PR but not piggy-back to this PR. |
The reason for immediate state transition is to disallow
Thinking about this again, I'd agree with you. In fact I'd rather treat it as a minor bug and update the documentations on operations directly. |
What about adding one more state with transitions |
Are you referring to instance state or thread state? |
Thread state. Not sure if we need an additional state at the instance level. |
…ndling-thread-dead-state-change
I see your point now. I've added the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM.
Some minor follow up questions.
// when the state is already in REBALANCING, it should not transit to REBALANCING | ||
return false; | ||
} else if (state == State.RUNNING && newState == State.RUNNING) { | ||
// when the state is already in RUNNING, it should not transit to RUNNING |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the state transitions changes for StreamThread
is this still possible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, I can remove it here.
// special case when global thread is dead | ||
if (newState == GlobalStreamThread.State.DEAD && state != State.ERROR && setState(State.ERROR)) { | ||
log.warn("Global thread has died. The instance will be in error state and should be closed."); | ||
synchronized (threadStatesLock) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need threadStateLock
? Can't we use this
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can; but since we have a stateLock
already I want to distinguish it with another dedicated object.
|
||
// special case when global thread is dead | ||
if (newState == GlobalStreamThread.State.DEAD && state != State.ERROR && setState(State.ERROR)) { | ||
log.error("Global thread has died. The instance will be in error state and should be closed."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to encapsulate state handling at instance/thread level, maybe rewrite this to:
if (newState == GlobalStreamThread.State.DEAD) {
setState(State.ERROR);
log.error("Global thread has died. The instance will be in error state and should be closed.");
}
GlobalStreamThread would call this method only once anyway, and setState()
should handle idempotent setState(State.ERROR)
internally?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can do that, but I need to let State.ERROR
transit to State.ERROR
(currently it is not allowed, and hence will cause illegal state exception). Will do that.
@@ -788,17 +789,13 @@ public synchronized void start() throws IllegalStateException, StreamsException | |||
|
|||
final Long cleanupDelay = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); | |||
stateDirCleaner.scheduleAtFixedRate(() -> { | |||
// we do not use lock here since we only read on the value and act on it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if this holds. Don't we need to block state transitions while we cleanup is running?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stateDirectory.cleanRemovedTasks(cleanupDelay);
itself have synchronization barriers as well, so we do not need to have the whole function in synchronized(stateLock)
; and in that case, just locking stateLock
for reading its value does not bring any additional guarantees.
// since we do not allow calling start multiple times whether or not it is already shutdown. | ||
// TODO: In the future if we lift this restriction this code path could then be triggered and be updated | ||
log.error("Already stopped, cannot re-start"); | ||
throw new IllegalStateException("The client is either already started or already stopped, cannot re-start"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we want to disallow calling start()
twice? Could be idempotent no-op, too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question.. this was added at the very beginning when we try to fix a few state transition bugs, and one of them as calling start() twice which may re-create threads etc. Arguably we can still allow calling it twice while making second / future calls no-op.
I'd suggest we leave it as a separate improvement.
* In this case we will forbid the transition but will not treat as an error. | ||
* </li> | ||
* </ul> | ||
*/ | ||
public enum State implements ThreadStateTransitionValidator { | ||
CREATED(1, 4), RUNNING(2, 4), PARTITIONS_REVOKED(3, 4), PARTITIONS_ASSIGNED(1, 2, 4), PENDING_SHUTDOWN(5), DEAD; | ||
CREATED(1, 5), STARTING(2, 5), PARTITIONS_REVOKED(3, 5), PARTITIONS_ASSIGNED(2, 4, 5), RUNNING(2, 5), PENDING_SHUTDOWN(6), DEAD; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should is be PARTITIONS_REVOKED(2, 3, 5)
because it want to transit to itself?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I can remove the check in setState
and add it here.
EDIT: I've thought about it twice, and I felt it is better to not allow "transiting to itself" but just avoid it in the setState instead. The main reason is that otherwise, the stateListener.onChange
will be called which may trigger user's listener and cause confusion (think: why my listener gets called again on error? it was triggered already).
…ndling-thread-dead-state-change
Reverted this one as I found it lacks a final commit in my local branch, sorry. Re-created #6091. |
…he#6018) While looking into KAFKA-7657, I found there are a few loopholes in this logic: 1. We kept a map of thread-name to thread-state and a global-thread state at the KafkaStreams instance-level, in addition to the instance state itself. stateLock is used when accessing the instance state, however when we are in the thread state change callback, we are accessing both the thread-states as well as the instance state at the same time in the callers of setState without a lock, which is vulnerable to concurrent multi-stream threads. The fix is a) introduce a threadStatesLock in addition to the stateLock, which should always be grabbed to modify the thread-states map before the stateLock for modifying the instance level; and we also defer the checking of the instance-level state inside the setState call. 2. When transiting to state.RUNNING, we check if all threads are either in RUNNING or DEAD state, this is because some threads maybe dead at the rebalance period but we should still proceed to RUNNING if the rest of threads are still transiting to RUNNING. Added unit test for 2) above. Also simplified another test as a nit change. Reviewers: John Roesler <vvcephei@users.noreply.github.com>, Matthias J. Sax <mjsax@apache.org>
…ge (apache#6018)" (apache#6090) This reverts commit d669830.
While looking into KAFKA-7657, I found there are a few loopholes in this logic:
We kept a map of thread-name to thread-state and a global-thread state at the KafkaStreams instance-level, in addition to the instance state itself.
stateLock
is used when accessing the instance state, however when we are in the thread state change callback, we are accessing both the thread-states as well as the instance state at the same time in the callers ofsetState
without a lock, which is vulnerable to concurrent multi-stream threads. The fix is a) introduce athreadStatesLock
in addition to thestateLock
, which should always be grabbed to modify the thread-states map before thestateLock
for modifying the instance level; and we also defer the checking of the instance-level state inside thesetState
call.When transiting to state.RUNNING, we check if all threads are either in RUNNING or DEAD state, this is because some threads maybe dead at the rebalance period but we should still proceed to RUNNING if the rest of threads are still transiting to RUNNING.
Added unit test for 2) above. Also simplified another test as a nit change.
Committer Checklist (excluded from commit message)