-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-2454: Deadlock between log segment deletion and server shutdown. #153
Conversation
kafka-trunk-git-pr #179 SUCCESS |
if(isStarted) { | ||
executor.shutdown() | ||
executor.awaitTermination(1, TimeUnit.DAYS) | ||
// We use the local variable to avoid NullPointerException if another thread shuts down scheduler at same time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should work, but do you think it would be clearer to just introduce a shutdownLock
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jjkoshy Good idea, a shutdownLock
will make it more clear. I'll do that. Thanks.
kafka-trunk-git-pr #197 SUCCESS |
shutdownLock synchronized { | ||
if (isStarted) { | ||
this synchronized { | ||
this.executor.shutdown() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this needs to be synchronized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually you probably do need to (but unnecessary if we are handling rejected executions in schedule - which we are not)
@jjkoshy Thanks for the comments. I submitted the updated patch. It seems a little tricky to make things right. Do you think the original patch might be simpler? |
kafka-trunk-git-pr #323 FAILURE |
Yes your original patch is simpler. We may also want to survey our usage of |
kafka-trunk-git-pr #338 SUCCESS |
@jjkoshy Good point about IllegalStateException, I think we need to shutdown KafkaScheduler at last of the shutdown server sequence. We actually start the scheduler at first when startup. I checked the usage of KafkaScheduler.schedule() and it seems OK to shutdown the scheduler at last. |
@jjkoshy I just rebased on trunk. |
Can you confirm that shutting down the scheduler at the end is safe? Say, if the log cleaner, or offset cache compaction process runs after those components have been closed - would that be an issue? |
I actually think that might be weird right? E.g., for the offset manager it will actually run (since we don't explicitly check if the offset manager has shut down or not). Basically I think it is better to shutdown the scheduler first - to avoid the possibility of some code in a specific component running after that component has been shut down. |
@@ -121,7 +123,7 @@ class KafkaScheduler(val threads: Int, | |||
|
|||
def isStarted: Boolean = { | |||
this synchronized { | |||
executor != null | |||
executor != null && !executor.isShutdown |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The isShutdown
check is redundant right? i.e., since the shutdown and setting to null are in a synchronized block? Also, I think we should change the message in ensureStarted
to say "Kafka scheduler is not running." - since it could have been either not started or may have been shut down.
…ning topics (apache#153) Kafka Streams uses a default segment.ms of 600 seconds for its repartitioning topics. This is less than the current topic policy minimum, so we're lowering the minimum to unblock this use case. We will follow up and consider re-raising it later (https://confluentinc.atlassian.net/browse/CPKAFKA-2417). Reviewers: Ismael Juma <ismael@juma.me.uk>
TICKET = N/A LI_DESCRIPTION = Fixing style to pass gradle checkstyleMain EXIT_CRITERIA = N/A
No description provided.