[SPARK-11098][Core]Add Outbox to cache the sending messages to resolve the message disorder issue #9197
Test build #44060 has finished for PR 9197 at commit
Test build #44062 has finished for PR 9197 at commit
retest this please
Test build #44072 has finished for PR 9197 at commit
/**
 * A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]],
 * we just put messages to its [[Outbox]] to implement a non-block `send` method.
nit: non-blocking
I was hoping we could somehow avoid needing an outbox, but I guess that's the easiest way to go. The code LGTM, although I'll probably take another look.
Test build #44114 has finished for PR 9197 at commit
override def onSuccess(response: Array[Byte]): Unit = {
  val ack = deserialize[Ack](response)
  logDebug(s"Receive ack from ${ack.sender}")
Is this log useful? Maybe we don't need to do much here ...
LGTM. @vanzin if you have time to take a closer look, that'd be great.
// update messages and it's safe to just drain the queue.
var message = messages.poll()
while (message != null) {
  message.callback.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
The SparkException can be constructed outside the while loop and reused.
The current NettyRpc has a message-ordering issue because it uses a thread pool to send messages. E.g., running the following two lines in the same thread,
The remote endpoint may see "B" before "A" because "A" and "B" are sent in parallel.
To resolve this issue, this PR adds an outbox for each connection. If we are still connecting to the remote node when a message is sent, the message is cached in the outbox, and the cached messages are sent one by one once the connection is established.
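The outbox idea described above can be sketched roughly as follows. This is a minimal, simplified illustration and not the PR's actual `Outbox` class: the names `SketchOutbox`, `deliver`, `connectionEstablished` and `OutboxSketchDemo` are hypothetical, messages are plain strings, and the connection is modelled as a boolean flag. It only shows how queuing while connecting, combined with a single drainer at a time, keeps messages in the order in which `send` was called.

```scala
import java.util.LinkedList

// Hypothetical, simplified stand-in for a per-connection outbox (not Spark's real class).
// `deliver` is an assumed callback that represents writing one message to the connection.
final class SketchOutbox(deliver: String => Unit) {
  private val messages = new LinkedList[String]
  private var connected = false
  private var draining = false

  // Non-blocking send: always enqueue first, then try to drain.
  def send(message: String): Unit = {
    synchronized { messages.add(message) }
    drainOutbox()
  }

  // Assumed hook, called once the connection to the remote node is ready.
  def connectionEstablished(): Unit = {
    synchronized { connected = true }
    drainOutbox()
  }

  // Only one thread drains at a time, so messages leave in queue order.
  private def drainOutbox(): Unit = {
    var message: String = synchronized {
      if (!connected || draining) {
        null // still connecting, or another thread is already draining
      } else {
        val first = messages.poll()
        if (first != null) draining = true
        first
      }
    }
    while (message != null) {
      deliver(message) // called outside the lock, one message at a time
      message = synchronized {
        val next = messages.poll()
        if (next == null) draining = false
        next
      }
    }
  }
}

object OutboxSketchDemo {
  // Sends "A" and "B" before the connection exists, then "C" afterwards,
  // and returns the messages in the order they were delivered.
  def run(): List[String] = {
    val seen = scala.collection.mutable.ArrayBuffer.empty[String]
    val outbox = new SketchOutbox(m => { seen += m; () })
    outbox.send("A")
    outbox.send("B")
    outbox.connectionEstablished()
    outbox.send("C")
    seen.toList
  }
}
```

In this sketch, "A" and "B" are held in the queue until `connectionEstablished()` is called and are then delivered in that order, followed by "C". The real implementation also has to handle connection failures and stopping the outbox, which the sketch leaves out.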