Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15804: Close SocketServer channels when calling shutdown before enableRequestProcessing #14729

Merged

Conversation

gharris1727
Copy link
Contributor

@gharris1727 gharris1727 commented Nov 10, 2023

The KafkaServer startup() method instantiates a SocketServer, and then waits to call SocketServer enableRequestProcessing. In the intervening time, an exception may be thrown which prevents the enableRequestProcessing from ever being called, and the KafkaServer skips to calling SocketServer shutdown() instead.

In this situation, the current SocketServer Acceptor and Processor implementations do not close their sockets. This causes the sockets to be leaked, potentially interfering with other tests. This change makes calling close() without first calling enableRequestProcessing perform the cleanup that the acceptor and processor threads would have performed.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

… enableRequestProcessors

Signed-off-by: Greg Harris <greg.harris@aiven.io>
Copy link
Contributor

@ex172000 ex172000 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to add some tests?

@@ -679,6 +679,9 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
def close(): Unit = {
beginShutdown()
thread.join()
if (!started) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to put the started as a parameter to the method to make it look cleaner?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @ex172000 thanks for the review! Do you mean as a parameter to closeAll?

I don't think that's necessary as started is an instance variable. Also, I didn't add the condition to finally block because it should always be true at that point in the code, so it's only present on this code path where the thread has not been started yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a race possible if close is invoked concurrently with start?

If so, I think one possible solution (though slightly hacky) could be to set started = true before kicking off the thread that would be responsible for cleanup, instead of after. Of course synchronization is also a possibility, but I'm not sure on the implications for blocking or even deadlock.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Similar concern with Processor::start / Processor::close)

…losed

Signed-off-by: Greg Harris <greg.harris@aiven.io>
@hudeqi hudeqi requested review from hudeqi and ex172000 and removed request for ex172000 November 13, 2023 02:44
Copy link
Collaborator

@hudeqi hudeqi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @gharris1727 This pr change makes sense for me, left a comment. Thanks.

core/src/main/scala/kafka/network/SocketServer.scala Outdated Show resolved Hide resolved
@gharris1727
Copy link
Contributor Author

Hey @cmccabe @hachikuji Could you take a look at this?

@gharris1727
Copy link
Contributor Author

@chia7712 @showuon @mimaison Are any of you able to review this resource leak fix? Thanks!

Copy link
Contributor

@C0urante C0urante left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Greg. I'm not terribly familiar with this neck of the woods but this does seem like a case of improper resource cleanup on failed startup, and the fix does appear to be (modulo some small concurrency concerns) sound.

core/src/main/scala/kafka/network/SocketServer.scala Outdated Show resolved Hide resolved
@@ -679,6 +679,9 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
def close(): Unit = {
beginShutdown()
thread.join()
if (!started) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a race possible if close is invoked concurrently with start?

If so, I think one possible solution (though slightly hacky) could be to set started = true before kicking off the thread that would be responsible for cleanup, instead of after. Of course synchronization is also a possibility, but I'm not sure on the implications for blocking or even deadlock.

@@ -679,6 +679,9 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
def close(): Unit = {
beginShutdown()
thread.join()
if (!started) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Similar concern with Processor::start / Processor::close)

Signed-off-by: Greg Harris <greg.harris@aiven.io>
@gharris1727
Copy link
Contributor Author

Hi @C0urante could you take another pass on this?

Copy link
Contributor

@C0urante C0urante left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Greg, LGTM!

It'd be nice to get someone more familiar with core to take a look at this, but I don't consider it a requisite for merging.

@gharris1727
Copy link
Contributor Author

gharris1727 commented May 10, 2024

I noticed some instability in th SocketServerTest suite locally, but it doesn't appear to be introduced by this change. It appears on trunk (and 3.7, 3.6, 3.5) and coincides with JDK >= 17. I opened a ticket for it here: https://issues.apache.org/jira/browse/KAFKA-16701

I ran this test suite locally with JDK 11 and got consistent passes, and it passes in CI. The other failures in CI look unrelated, and pass locally.

I think i'm comfortable merging this PR at this time.

@gharris1727 gharris1727 merged commit 4e4f7d3 into apache:trunk May 10, 2024
1 check failed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
4 participants