[SPARK-3106] Fix race conditions in Connection and ConnectionManager #2019
Conversation
QA tests have started for PR 2019 at commit
QA tests have finished for PR 2019 at commit
QA tests have started for PR 2019 at commit
QA tests have finished for PR 2019 at commit
This change resolves the ClosedChannelException and CancelledKeyException being thrown, as well as the warning messages "Corresponding SendingConnectionManagerId not found" and "All connections not cleaned up" that we have been seeing recently.
QA tests have started for PR 2019 at commit
QA tests have finished for PR 2019 at commit
QA tests have started for PR 2019 at commit
QA tests have finished for PR 2019 at commit
I tested this patch on branch-1.0 and still see those exceptions in the logs. Curious to know whether you expected this to work there as well, or only on YARN? Exceptions:
14/08/19 12:39:42 WARN SendingConnection: Error writing in connection to ConnectionManagerId(demeter-csmaz11-4.demeter.hpc.mssm.edu,35328)
14/08/19 12:37:25 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(demeter-csmau08-20.demeter.hpc.mssm.edu,53302)
Hi @arahuja, I tested on Hadoop 2.x with YARN. Before this change, I saw those exceptions on the driver. Where did you see those exceptions?
@arahuja I found a code path that leads to the situation you mention. I'll fix it soon.
@arahuja I've updated the patch. Can you test with the new PR?
QA tests have started for PR 2019 at commit
QA tests have started for PR 2019 at commit
QA tests have finished for PR 2019 at commit
Jenkins, retest this please.
QA tests have started for PR 2019 at commit
QA tests have finished for PR 2019 at commit
QA tests have finished for PR 2019 at commit
QA tests have started for PR 2019 at commit
QA tests have finished for PR 2019 at commit
QA tests have started for PR 2019 at commit
QA tests have finished for PR 2019 at commit
  }
  channel.close()
  closed = true
}
This is an incorrect change.
Any of those methods can throw an exception, leaving Connection.closed as false.
What is the point of the synchronized, btw? None of the other methods are protected by this lock.
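To make the first objection concrete, here is a hypothetical reduction of the method under review (illustrative names, not the actual Spark code): if channel.close() throws, the flag assignment is never reached, and every later caller repeats the whole cleanup.

import java.nio.channels.{SelectionKey, SocketChannel}

class FragileConnection(key: SelectionKey, channel: SocketChannel) {
  var closed = false

  def close(): Unit = synchronized {
    key.cancel()       // cancelling is safe to repeat
    channel.close()    // may throw IOException
    closed = true      // never reached if channel.close() throws
  }
}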
SendingConnection#close is called from three threads on the same instance.
For example, one handle-read-write-executor thread calls ReceivingConnection#close -> SendingConnection#close, a second handle-read-write-executor thread calls SendingConnection#close, and the connection-manager-thread calls ConnectionManager#run -> SendingConnection#close.
I think that if any of the methods in close() throws an exception, the connection should not be marked as closed, because one of those threads is expected to close the resources even if another thread fails to close them.
And the synchronized block is there to protect SendingConnection#close from being called by those three threads concurrently.
Without it, the following interleaving can occur (see the sketch after this list):
(1) One handle-read-write-executor thread evaluates key.cancel in SendingConnection#close.
(2) Then, the connection-manager-thread calls removeConnection via callOnCloseCallback and evaluates "connectionsByKey -= connection.key". This fails because connection.key is null at this time.
After (2), the connection-manager-thread expects connectionsByKey.size to reach 0 in ConnectionManager#stop, but it never can, so we get the log message "All connections not cleaned up".
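A minimal sketch of an interleaving-tolerant removal (hypothetical names and a simplified registry, not the actual ConnectionManager code) would capture the key before a racing close() can invalidate it, and treat a missing entry as a no-op:

import java.nio.channels.SelectionKey
import scala.collection.concurrent.TrieMap

object Registry {
  val connectionsByKey = new TrieMap[SelectionKey, AnyRef]()

  def removeConnection(key: SelectionKey): Unit = {
    // A null key means a racing close() already tore the connection down.
    Option(key).foreach { k =>
      connectionsByKey -= k   // removing an absent key is harmless
    }
  }
}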
The way to handle this is to make closed an AtomicBoolean and do a getAndSet.
If the result of getAndSet is false, which means closed was false on invocation, only then do the actual logic of close from earlier: it is a bug that all invocations of close were trying to do the same thing.
Essentially:
a) Change
  var closed = false
to
  val closed = new AtomicBoolean(false)
(a val suffices now, since the flag itself is mutated atomically)
b) Change close() to
  def close() {
    val prev = closed.getAndSet(true)
    if (!prev) {
      closeImpl()
    }
  }
where closeImpl is a private method containing the logic from the earlier close (except for the closed variable update).
This will ensure that failures in closeImpl still leave the connection marked as closed, and that repeated invocations will not execute the same code again and surface further failures (like a missing id in the map, etc.).
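Put together, a self-contained sketch of this pattern might look as follows (IdempotentConnection and the body of closeImpl are illustrative, not the actual Spark code):

import java.util.concurrent.atomic.AtomicBoolean

class IdempotentConnection {
  private val closed = new AtomicBoolean(false)

  def close(): Unit = {
    // getAndSet is atomic: exactly one caller observes `false` and
    // performs the cleanup; every other caller returns immediately.
    if (!closed.getAndSet(true)) {
      closeImpl()
    }
  }

  // The original cleanup logic, minus the flag update, would live here.
  private def closeImpl(): Unit = {
    // key.cancel(); channel.close(); callOnCloseCallback(); ...
  }
}

Even if closeImpl() throws, the flag is already set, so the connection stays marked as closed and no other thread retries the cleanup.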
If we set closed to true and one of the methods in close() throws an exception, we end up in an inconsistent state, and another thread calling SendingConnection#close on the same instance cannot recover it, because closed is already set to true.
I think you are misunderstanding the intent of what close is supposed to do for the Connection classes. It is supposed to mirror the normal expectation of close on streams, barring the bug I mentioned above.
In a nutshell, it is supposed to mark the connection as closed (so that repeated invocations of the method are idempotent) and clean up if required. Take a look at how close is implemented in various JDK IO classes.
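For reference, java.io.Closeable documents exactly this contract: calling close() on an already-closed stream has no effect. A tiny runnable illustration:

import java.io.ByteArrayInputStream

object CloseTwice extends App {
  val in = new ByteArrayInputStream(Array[Byte](1, 2, 3))
  in.close()
  in.close() // no-op: Closeable specifies that repeated close() has no effect
}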
OK, so Connection#close is just meant to mark the connection as closed, and a failure during closing does not need to be recovered, right?
If so, using an AtomicBoolean is reasonable.
Handling TCP/IP events is by definition async, particularly when state changes can happen orthogonally to state held in Java variables. Given that, the changes here look fragile: we can revisit this PR when those issues are addressed, since I think there is value in some of these.
One of the issues I'd like to resolve in this PR is the mis-detection that happens when a SendingConnection is closed by its corresponding ReceivingConnection in removeConnection.
Actually, this situation is not an error. Can we remove the logic for closing the SendingConnection there? It's expected to be closed by itself or by ConnectionManager#stop, right?
…calSocketAddressByKey Modified logging message when CancelledKeyException is thrown
…nt unwilling CancelledKeyException
…void unwilling CancelledKeyException and key cancellation before "connectionsByKey -= connection.key" logic in ConnectionManager#removeConnection
…ceivingConnection is closed
… multiple threads
QA tests have started for PR 2019 at commit
/CC @JoshRosen In this PR, I want to resolve the following issues.
(1) Race condition between a thread invoking ConnectionManager#stop and threads invoking Connection#close.
In this case, if the thread invoking ConnectionManager#stop evaluates "connectionsByKey -= connection.key" in ConnectionManager#removeConnection after a thread invoking Connection#close has evaluated k.cancel or channel.close in Connection#close, the warning message "All connections not cleaned up" appears, because when "connectionsByKey -= connection.key" is evaluated, the key is already null.
(2) Race condition between a thread invoking SendingConnection#close and a thread invoking SendingConnection#close after invoking ReceivingConnection#close.
In this case, if the thread invoking ReceivingConnection#close evaluates "!sendingConnectionOpt.isDefined" in ConnectionManager#removeConnection after the other thread has evaluated "connectionsById -= sendingConnectionManagerId" in ConnectionManager#removeConnection, "!sendingConnectionOpt.isDefined" is true and the error message "Corresponding SendingConnection to ${remoteConnectionManagerId} not found" appears.
(3) Race condition between a thread invoking ConnectionManager#run and threads invoking Connection#close.
In this case, if the thread invoking ConnectionManager#run checks the key's validity just before threads invoking Connection#close evaluate key.cancel, the check passes but the subsequent use of the key throws, and the error message related to CancelledKeyException appears.
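A minimal sketch of the check-then-act window behind (3), using raw NIO types (a hypothetical helper, not code from this PR): another thread can cancel the key between the validity check and the use, so the exception still has to be handled.

import java.nio.channels.{CancelledKeyException, SelectionKey}

object SelectorUtil {
  def changeInterestSafely(key: SelectionKey, ops: Int): Unit = {
    if (key.isValid) {
      try {
        key.interestOps(ops)   // a racing key.cancel() lands here
      } catch {
        case _: CancelledKeyException => // lost the race with close()
      }
    }
  }
}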
QA tests have finished for PR 2019 at commit
…cala to avoid race condition
QA tests have started for PR 2019 at commit
QA tests have finished for PR 2019 at commit
@@ -280,42 +280,46 @@ private[spark] class ConnectionManager(
}

while(!keyInterestChangeRequests.isEmpty) {
  // Expect key interested in OP_ACCEPT is not change its interest
Not sure I understand what the comment is trying to say.
If a key for OP_ACCEPT enters this loop, connectionsByKey.getOrElse(key, null) will return null, so this logic ignores OP_ACCEPT. I'll refine the comment.
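In other words, a simplified model of the guard being described (hypothetical signature, not the exact loop body): the server socket's OP_ACCEPT key is never registered in the map, so the lookup yields null and the key is skipped.

import java.nio.channels.SelectionKey
import scala.collection.mutable

object InterestChanges {
  def process(connectionsByKey: mutable.Map[SelectionKey, AnyRef],
              key: SelectionKey): Unit = {
    val connection = connectionsByKey.getOrElse(key, null)
    if (connection != null) {
      // Only per-connection keys (OP_READ / OP_WRITE / OP_CONNECT) get here.
    }
  }
}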