Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -748,9 +748,7 @@ private[nio] class ConnectionManager(
} catch {
case e: Exception => {
logError(s"Exception was thrown while processing message", e)
val m = Message.createBufferMessage(bufferMessage.id)
m.hasError = true
ackMessage = Some(m)
ackMessage = Some(Message.createErrorMessage(e, bufferMessage.id))
}
} finally {
sendMessage(connectionManagerId, ackMessage.getOrElse {
Expand Down Expand Up @@ -913,8 +911,12 @@ private[nio] class ConnectionManager(
}
case scala.util.Success(ackMessage) =>
if (ackMessage.hasError) {
val errorMsgByteBuf = ackMessage.asInstanceOf[BufferMessage].buffers.head
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose that this could break if we added another message type and used it to report errors. Do you think that I should pre-emptively add error handling code around this?

val errorMsgBytes = new Array[Byte](errorMsgByteBuf.limit())
errorMsgByteBuf.get(errorMsgBytes)
val errorMsg = new String(errorMsgBytes, "utf-8")
val e = new IOException(
"sendMessageReliably failed with ACK that signalled a remote error")
s"sendMessageReliably failed with ACK that signalled a remote error: $errorMsg")
if (!promise.tryFailure(e)) {
logWarning("Ignore error because promise is completed", e)
}
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/network/nio/Message.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.util.Utils

private[nio] abstract class Message(val typ: Long, val id: Int) {
var senderAddress: InetSocketAddress = null
Expand Down Expand Up @@ -84,6 +85,19 @@ private[nio] object Message {
createBufferMessage(new Array[ByteBuffer](0), ackId)
}

/**
 * Build a "negative acknowledgment" (NACK) telling the original sender that
 * processing of its message failed on this side.
 *
 * The exception is rendered to text via `Utils.exceptionString`, encoded as
 * UTF-8 bytes, and carried as the sole payload buffer of the returned message,
 * whose `hasError` flag is set.
 *
 * @param exception the failure that occurred while handling the message
 * @param ackId     id of the message being (negatively) acknowledged
 * @return a `BufferMessage` flagged as an error and carrying the encoded exception text
 */
def createErrorMessage(exception: Exception, ackId: Int): BufferMessage = {
  // Render and encode in one step; the receiver decodes this payload as UTF-8.
  val payload = ByteBuffer.wrap(Utils.exceptionString(exception).getBytes("utf-8"))
  val nack = createBufferMessage(payload, ackId)
  // Mark the ACK as an error so the sender fails its pending promise.
  nack.hasError = true
  nack
}

def create(header: MessageChunkHeader): Message = {
val newMessage: Message = header.typ match {
case BUFFER_MESSAGE => new BufferMessage(header.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,14 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
} catch {
case e: Exception => {
logError("Exception handling buffer message", e)
val errorMessage = Message.createBufferMessage(msg.id)
errorMessage.hasError = true
Some(errorMessage)
Some(Message.createErrorMessage(e, msg.id))
}
}

case otherMessage: Any =>
logError("Unknown type message received: " + otherMessage)
val errorMessage = Message.createBufferMessage(msg.id)
errorMessage.hasError = true
Some(errorMessage)
val errorMsg = s"Received unknown message type: ${otherMessage.getClass.getName}"
logError(errorMsg)
Some(Message.createErrorMessage(new UnsupportedOperationException(errorMsg), msg.id))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import scala.language.postfixOps
import org.scalatest.FunSuite

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.util.Utils

/**
* Test the ConnectionManager with various security settings.
Expand Down Expand Up @@ -236,7 +237,7 @@ class ConnectionManagerSuite extends FunSuite {
val manager = new ConnectionManager(0, conf, securityManager)
val managerServer = new ConnectionManager(0, conf, securityManager)
managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
throw new Exception
throw new Exception("Custom exception text")
})

val size = 10 * 1024 * 1024
Expand All @@ -246,9 +247,10 @@ class ConnectionManagerSuite extends FunSuite {

val future = manager.sendMessageReliably(managerServer.id, bufferMessage)

intercept[IOException] {
val exception = intercept[IOException] {
Await.result(future, 1 second)
}
assert(Utils.exceptionString(exception).contains("Custom exception text"))

manager.stop()
managerServer.stop()
Expand Down