-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6300 SelectorTest may fail with ConcurrentModificationException #4288
Conversation
@tedyu Thanks for the PR.
|
@@ -73,7 +74,9 @@ public void run() { | |||
try { | |||
while (true) { | |||
final Socket socket = serverSocket.accept(); | |||
sockets.add(socket); | |||
synchronized (sockets) { | |||
if (!closing) sockets.add(socket); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi Ted, I quickly had a look at this change if you don't mind. My concern is the following:
Let's suppose that if right after serverSocket.accept()
a closeConnection()
gets scheduled and completes, then this block would indeed run without adding the current socket to a list but it would still start the thread with a leaked socket.
I think we shouldn't even accept any sockets or start threads if the server is being shut down.
Let me know if you disagree or I missed something :).
@viktorsomogyi |
retest this please |
} catch (IOException e) { | ||
// ignore | ||
} finally { | ||
synchronized (sockets) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
closing
flag should be checked inside the synchronized block. Otherwise, this block could be executed after closeConnections
on a socket that was already accepted, but not added to the list (this is the cause of the transient test failure). The !closing
check in while
is not really needed since accept
would just throw IOException and return, but you want to keep it, it should be synchronized or made volatile.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once closing flag becomes true, the while loop in EchoServer#run() should exit.
What about the last assertion in testServerDisconnect() ?
assertEquals("hello", blockingRequest(node, "hello"));
It seems the above should be dropped - otherwise it would hang.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ tedyu left comments on fixing this.
for (Socket socket : sockets) | ||
socket.close(); | ||
synchronized (sockets) { | ||
closing = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
closing
should be set in close()
rather than closeConnections
- i.e. set in when closing server socket, not when closing client connections.
|
||
// reconnect and do another request | ||
blockingConnect(node); | ||
assertEquals("hello", blockingRequest(node, "hello")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you revert this change please, after setting closing
flag in EchoServer#close()
?
@tedyu Can you also rebase the PR since it has conflicts? Thanks. |
New PR coming which addresses Rajini's comments. |
Synchronization is added w.r.t. sockets ArrayList to avoid ConcurrentModificationException
Committer Checklist (excluded from commit message)