-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-10010: Should make state store registration idempotent #8681
Conversation
I left a comment on the JIRA ticket. |
All failed tests are due to flaky: |
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta | |||
} | |||
|
|||
if (stores.containsKey(storeName)) { | |||
throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName)); | |||
log.warn("State Store {} has already been registered, which could be due to a half-way registration" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we might want to skip the re-registration higher up the call stack. In StateManagerUtil#registerStateStores
we call store.init
on each store which ultimately results in this registerStore
being called
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, we can rely on storeManager#getStore
inside StateManagerUtil
to check if the store is already registered.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So are we still required to remove the illegal argument exception here? What I'm concerned is that the latest version of state store initialization might be different from previous iteration, so it's safer to just go through the entire procedure once more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nah, I think we should actually keep this (although IllegalStateException
seems to make more sense, can we change it?) -- we should just make sure we don't reach it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re: your concern, I don't think we can assume that a user's state store's init
method is idempotent. AFAIK nothing should change that's relevant to the state store registration, but if something does (eg TaskCorrupted) we'd have to wipe out everything and start it all again anyways
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM, modulo the comment from @ableegoldman
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta | |||
} | |||
|
|||
if (stores.containsKey(storeName)) { | |||
throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName)); | |||
log.warn("State Store {} has already been registered, which could be due to a half-way registration" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, we can rely on storeManager#getStore
inside StateManagerUtil
to check if the store is already registered.
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta | |||
} | |||
|
|||
if (stores.containsKey(storeName)) { | |||
throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName)); | |||
log.warn("State Store {} has already been registered, which could be due to a half-way registration" + | |||
"in the previous round", storeName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we could make the warn log entry more clear that we did not override the registered the store, e.g. "Skipped registering state store {} since it has already existed in the state manager, ..."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
* 'trunk' of github.com:apache/kafka: MINOR: Increase gradle daemon’s heap size to 2g (apache#8700) KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks (apache#8661) KAFKA-9859 / kafka-streams-application-reset tool doesn't take into account topics generated by KTable foreign key join operation (apache#8671) MINOR: Fix redundant typos in comments and javadocs (apache#8693) KAFKA-10010: Should make state store registration idempotent (apache#8681) KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll (apache#8682) KAFKA-9992: Eliminate JavaConverters in EmbeddedKafkaCluster (apache#8673) KAFKA-6145: Add unit tests to verify fix of bug KAFKA-9173 (apache#8689) MINOR: Update stream documentation (apache#8622) MINOR: Small fixes in the documentation (apache#8623)
Standby task could also at risk of getting into illegal state when not being closed during
HandleLostAll
The standby task was initializing as
CREATED
state, and task corrupted exception was thrown from registerStateStoresThe task corrupted exception was caught, and do a non-affected task commit
The task commit failed due to task migrated exception
The handleLostAll didn't close the standby task, leaving it as CREATED state
Next rebalance complete, the same task was assigned back as standby task.
Illegal Argument exception caught as state store already registered
Committer Checklist (excluded from commit message)