Closed
22 changes: 13 additions & 9 deletions core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -20,6 +20,7 @@ package org.apache.spark.network
import java.net._
import java.nio._
import java.nio.channels._
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable.{ArrayBuffer, HashMap, Queue}

@@ -45,7 +46,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
channel.socket.setKeepAlive(true)
/* channel.socket.setReceiveBufferSize(32768) */

@volatile private var closed = false
private val closed = new AtomicBoolean(false)
var onCloseCallback: Connection => Unit = null
var onExceptionCallback: (Connection, Exception) => Unit = null
var onKeyInterestChangeCallback: (Connection, Int) => Unit = null
@@ -118,17 +119,20 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
}

def close() {
closed = true
val k = key()
if (k != null) {
k.cancel()
if (closed.compareAndSet(false, true)) {
disposeSasl()
callOnCloseCallback()
val k = key()
if (k != null) {
k.synchronized {
k.cancel()
}
}
channel.close()
}
channel.close()
disposeSasl()
callOnCloseCallback()
}

protected def isClosed: Boolean = closed
def isClosed: Boolean = closed.get

def onClose(callback: Connection => Unit) {
onCloseCallback = callback
213 changes: 141 additions & 72 deletions core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -280,42 +280,46 @@ private[spark] class ConnectionManager(
}

while(!keyInterestChangeRequests.isEmpty) {
// Expect key interested in OP_ACCEPT is not change its interest
Contributor

Not sure I understand what the comment is trying to say.

Member Author

If a key registered for OP_ACCEPT enters this loop, connectionsByKey.getOrElse(key, null) returns null, so this logic ignores OP_ACCEPT keys. I'll refine the comment.

val (key, ops) = keyInterestChangeRequests.dequeue()

try {
if (key.isValid) {
key.synchronized {
Contributor

I'm a little concerned that this isn't really adding much benefit.

Connection.close() doesn't synchronize on anything, so it can still be called while this lock is held, as far as I understand the code. So you could still execute this code all the way to L291 and then get a CancelledKeyException the same way. So you're maybe narrowing the conditions in which you'd get the exception, but it's still possible to get it.

I guess this will be similar to Mridul's comment, but: if the issue is that we're logging these exceptions, how about just demoting them? If we're comfortable that the code is doing the right thing and handling the errors appropriately, then the log messages are not that interesting.

Contributor

Another thing to keep in mind is that this is a hot loop (as described in the comment below). Even without contention, I think synchronized adds a memory fence, and this may impact performance. @rxin, do you have any shuffle benchmarks that could check the performance?

Member Author

Yes, Connection#close synchronizes only partially, but all we need to protect is key cancellation. If a thread (Thread-A) enters L286, no other thread can cancel the key in Connection#close() until Thread-A finishes its key-related operations.
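
The claim above can be checked in isolation. The following is a minimal sketch, not code from the PR: it assumes a `Pipe` source channel as a stand-in for the socket channel, and `KeyLockDemo` and `demo` are hypothetical names. The closer thread mimics the `close()` path in this diff (a `compareAndSet` guard followed by `key.cancel()` under `key.synchronized`).

```scala
import java.nio.channels.{Pipe, SelectionKey, Selector}
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

object KeyLockDemo {
  // Returns (key validity while the lock is held, key validity after release).
  def demo(): (Boolean, Boolean) = {
    val selector = Selector.open()
    val pipe = Pipe.open() // assumption: stands in for a SocketChannel
    pipe.source().configureBlocking(false)
    val key = pipe.source().register(selector, SelectionKey.OP_READ)
    val closed = new AtomicBoolean(false)
    val started = new CountDownLatch(1)

    // Mimics close(): one-shot guard, then cancel under the key's monitor.
    val closer = new Thread(new Runnable {
      def run(): Unit = {
        started.countDown()
        if (closed.compareAndSet(false, true)) {
          key.synchronized { key.cancel() }
        }
      }
    })

    val validWhileLocked = key.synchronized {
      closer.start()
      started.await()
      Thread.sleep(50) // give the closer time to block on the key's monitor
      key.isValid      // the closer cannot reach cancel() while we hold the lock
    }
    closer.join()
    val validAfterRelease = key.isValid

    pipe.source().close()
    pipe.sink().close()
    selector.close()
    (validWhileLocked, validAfterRelease)
  }
}
```

This only shows that a cancellation going through the same lock is excluded while the lock is held. It says nothing about keys invalidated by code that does not take this lock, which is the concern raised in the review.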

Contributor

Sure, but again, all that is doing is avoiding an exception. An exception that is handled properly by the code.

So, what is better:

  • synchronize on every key to avoid throwing an exception
  • don't synchronize and let exceptions be handled

Given that this is a hot loop, and during normal operation keys are not being cancelled left and right, I think the second approach is better. So I'd just turn the log messages into logDebug, since they're expected.
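
The second approach might look roughly like the sketch below. It is an illustration only: `tryChangeInterest` is a hypothetical helper, a `Pipe` source channel stands in for the socket channel, and the debug logging is left as a comment because the logging trait is not reproduced here.

```scala
import java.nio.channels.{CancelledKeyException, Pipe, SelectionKey, Selector}

object HandleCancelledKeyDemo {
  // Hypothetical helper: changes the interest ops without taking any lock and
  // reports whether the change was applied. A cancelled key is treated as expected.
  def tryChangeInterest(key: SelectionKey, ops: Int): Boolean = {
    try {
      key.interestOps(ops)
      true
    } catch {
      case _: CancelledKeyException =>
        // Expected when the connection was closed concurrently; a real caller
        // would log at debug level here and run its usual cleanup.
        false
    }
  }

  // Returns (result before the key is cancelled, result after it is cancelled).
  def demo(): (Boolean, Boolean) = {
    val selector = Selector.open()
    val pipe = Pipe.open() // assumption: stands in for a SocketChannel
    pipe.source().configureBlocking(false)
    val key = pipe.source().register(selector, SelectionKey.OP_READ)

    val beforeCancel = tryChangeInterest(key, SelectionKey.OP_READ)
    key.cancel() // simulates another thread closing the connection
    val afterCancel = tryChangeInterest(key, SelectionKey.OP_READ)

    pipe.source().close()
    pipe.sink().close()
    selector.close()
    (beforeCancel, afterCancel)
  }
}
```

The hot path stays lock-free, and the cost of the exception is only paid when a key has actually been cancelled.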

Member Author

A compromise might be to remove the logging, but the situations in which CancelledKeyException, ClosedChannelException, or AsynchronousCloseException is thrown even after the race condition is resolved are fatal. I think we should keep the logging so that we can detect such fatal situations.
I want the error messages to appear only when we actually need them.

Contributor

Why do you say they are fatal? They're just normal things. Nothing is broken when those exceptions happen, because the code is handling them. The only thing that happens is that ugly messages are printed to the logs.

As Mridul mentioned, I think it's better to handle these situations than to avoid them, because it's pretty tricky to avoid them. The code already seems to handle these exceptions properly, and your solution to avoiding them altogether may have performance side-effects.

So unless something is actually broken (which I don't think it is), why try to avoid the exceptions in the first place?

Member Author

If a remote host shuts down suddenly by accident, the connection should be closed. I don't think that is normal.

Hmm, I think ignoring or hiding those ugly messages, or lowering their severity, is one possible compromise.

Contributor

It is normal. In a distributed system, hosts will go down and you have to handle that.

Now, about logging that it happened. It's useful to log that a host went down. But is this the place to do it? These exceptions are too generic for that - CancelledKeyExceptions can happen for other reasons too. So the code that is notified that a host went down and didn't expect that to happen should log it instead.

val connection = connectionsByKey.getOrElse(key, null)
if (connection != null) {
val lastOps = key.interestOps()
key.interestOps(ops)

// hot loop - prevent materialization of string if trace not enabled.
if (isTraceEnabled()) {
def intToOpStr(op: Int): String = {
val opStrs = ArrayBuffer[String]()
if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT"
if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
if (connection != null && !connection.isClosed) {
if (key.isValid) {
if (connection != null) {
Contributor

This check is redundant.

Member Author

Ah, the double "connection != null" check is redundant, as you mentioned.

val lastOps = key.interestOps()
key.interestOps(ops)

// hot loop - prevent materialization of string if trace not enabled.
if (isTraceEnabled()) {
def intToOpStr(op: Int): String = {
val opStrs = ArrayBuffer[String]()
if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT"
if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
}

logTrace("Changed key for connection to [" +
connection.getRemoteConnectionManagerId() + "] changed from [" +
intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
}
}

logTrace("Changed key for connection to [" +
connection.getRemoteConnectionManagerId() + "] changed from [" +
intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
} else {
logInfo("Key not valid ?" + key)
throw new CancelledKeyException()
}
}
} else {
logInfo("Key not valid ? " + key)
throw new CancelledKeyException()
}
} catch {
case e: CancelledKeyException => {
logInfo("key already cancelled ? " + key, e)
logInfo("key already cancelled ? " + mkAddressInfoStringByKey(key), e)
triggerForceCloseByException(key, e)
}
case e: Exception => {
logError("Exception processing key " + key, e)
logError("Exception processing key. " + mkAddressInfoStringByKey(key), e)
triggerForceCloseByException(key, e)
}
}
@@ -334,17 +338,23 @@ private[spark] class ConnectionManager(
while (allKeys.hasNext) {
val key = allKeys.next()
try {
if (! key.isValid) {
logInfo("Key not valid ? " + key)
throw new CancelledKeyException()
key.synchronized {
val connection = connectionsByKey.getOrElse(key, null)
if (key.channel.isInstanceOf[ServerSocketChannel] ||
connection != null && !connection.isClosed) {
if (!key.isValid) {
Contributor

Merge with previous condition? Also, check looks weird (why check for ServerSocketChannel?).

Member Author

I don't think we can merge the two conditions, because L343-344 checks whether the connection was closed by another thread (that is not always an abnormal case; it sometimes happens), while L345 checks whether the channel was closed accidentally. Usually, if the channel is closed, the connection should be null. I'd like to detect the irregular case.

Also, I check for ServerSocketChannel because if the key belongs to a ServerSocketChannel, connection is always null and the code enters an infinite loop.

Contributor

You can still merge the conditions. Your code is racy with the connection being closed regardless of whether you have one or two ifs here.

You can get rid of the weird check by doing:

if ((connection == null || !connection.isClosed) && !key.isValid) { throw ... }

But this kinda feeds back into the "why avoid exceptions when they're handled properly" discussion.
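
To compare the two forms, the conditions can be modelled as pure functions over booleans. This is only a sketch: `KeyCheckConditions` and its members are hypothetical names, and the flags abstract away the real `SelectionKey` and `Connection` objects.

```scala
object KeyCheckConditions {
  // Condition as written in the PR: an outer guard with the key check nested inside.
  def nestedThrows(isServerChannel: Boolean, hasConnection: Boolean,
      connectionClosed: Boolean, keyValid: Boolean): Boolean =
    (isServerChannel || (hasConnection && !connectionClosed)) && !keyValid

  // Single condition suggested in the review comment.
  def mergedThrows(hasConnection: Boolean, connectionClosed: Boolean,
      keyValid: Boolean): Boolean =
    (!hasConnection || !connectionClosed) && !keyValid

  // Every input combination on which the two versions disagree, as
  // (isServerChannel, hasConnection, connectionClosed, keyValid).
  def disagreements: Seq[(Boolean, Boolean, Boolean, Boolean)] = {
    val bools = Seq(false, true)
    for {
      server <- bools; hasConn <- bools; closed <- bools; valid <- bools
      if nestedThrows(server, hasConn, closed, valid) != mergedThrows(hasConn, closed, valid)
    } yield (server, hasConn, closed, valid)
  }
}
```

Under this model the two forms differ only for invalid keys. The most relevant case is a non-server key with no registered connection: the merged form throws there, while the nested form skips the key silently. Either way, the check remains racy with a concurrent close.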

Member Author

I separated !key.isValid from the rest intentionally.
When connection is null or the connection is closed, !key.isValid is true because the connection was closed by another thread calling Connection#close, and that is not a case we should log.

If key.isValid is false even though the connection is not closed, that is an abnormal case.

Anyway, we (including other committers and contributors) should discuss how to treat this issue.

logInfo("Key not valid ? " + key)
throw new CancelledKeyException()
}
}
}
} catch {
case e: CancelledKeyException => {
logInfo("key already cancelled ? " + key, e)
logInfo("key already cancelled ? " + mkAddressInfoStringByKey(key), e)
triggerForceCloseByException(key, e)
}
case e: Exception => {
logError("Exception processing key " + key, e)
logError("Exception processing key. " + mkAddressInfoStringByKey(key), e)
triggerForceCloseByException(key, e)
}
}
@@ -368,32 +378,38 @@ private[spark] class ConnectionManager(
val key = selectedKeys.next
selectedKeys.remove()
try {
if (key.isValid) {
if (key.isAcceptable) {
acceptConnection(key)
} else
if (key.isConnectable) {
triggerConnect(key)
} else
if (key.isReadable) {
triggerRead(key)
} else
if (key.isWritable) {
triggerWrite(key)
key.synchronized {
val connection = connectionsByKey.getOrElse(key, null)
if (key.channel.isInstanceOf[ServerSocketChannel] ||
Contributor

Same weird check for ServerSocketChannel.

Also same issue with key.synchronized not really protecting against the underlying connection object being closed.

Member Author

The reason for checking ServerSocketChannel is the same as above.
The same goes for key.synchronized.

connection != null && !connection.isClosed) {
if (key.isValid) {
if (key.isAcceptable) {
acceptConnection(key)
} else
if (key.isConnectable) {
triggerConnect(key)
} else
if (key.isReadable) {
triggerRead(key)
} else
if (key.isWritable) {
triggerWrite(key)
}
} else {
logInfo("Key not valid ? " + key)
throw new CancelledKeyException()
}
}
} else {
logInfo("Key not valid ? " + key)
throw new CancelledKeyException()
}
} catch {
// weird, but we saw this happening - even though key.isValid was true,
// key.isAcceptable would throw CancelledKeyException.
case e: CancelledKeyException => {
logInfo("key already cancelled ? " + key, e)
logInfo(s"key already cancelled ? " + mkAddressInfoStringByKey(key), e)
triggerForceCloseByException(key, e)
}
case e: Exception => {
logError("Exception processing key " + key, e)
logError("Exception processing key. " + mkAddressInfoStringByKey(key), e)
triggerForceCloseByException(key, e)
}
}
@@ -438,6 +454,83 @@ private[spark] class ConnectionManager(
connectionsByKey += ((connection.key, connection))
}

private def getRemoteSocketAddressByKey(key: SelectionKey) : InetSocketAddress = {
val channel = key.channel
assert(channel.isInstanceOf[SocketChannel])

try {
channel match {
case sc: SocketChannel => {
channel.asInstanceOf[SocketChannel].
socket.getRemoteSocketAddress.asInstanceOf[InetSocketAddress]
}
case other => {
logWarning(s"Failed to get remote socket address by key ${key} because " +
s"key is not for InetSocketAddress but for ${other.getClass.getName}")
null
}
}
} catch {
case e: NullPointerException => {
logWarning(s"Failed to get remote socket address by key ${key} because of " +
"unexpected NPE", e)
null
}
}
}

private def getLocalSocketAddressByKey(key: SelectionKey) : InetSocketAddress = {
val channel = key.channel
assert(channel.isInstanceOf[ServerSocketChannel] || channel.isInstanceOf[SocketChannel])

try {
channel match {
case ssc: ServerSocketChannel => {
ssc.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]
}
case sc: SocketChannel => {
sc.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]
}
case other => {
logWarning(s"Failed to get local socket address by key ${key} because " +
s"key is not for InetSocketAddress but for ${other.getClass.getName}")
null
}
}
} catch {
case e: NullPointerException => {
logWarning(s"Failed to get local socket address by key ${key} because " +
"unexpected NPE", e)
null
}
}
}

private def mkAddressInfoStringByKey(key: SelectionKey) : String = {
val (channelType, info) =
try {
val channel = key.channel
channel match {
case s: SocketChannel => {
(s.getClass.getName, s"remote=${getRemoteSocketAddressByKey(key)}")
}
case s: ServerSocketChannel => {
(s.getClass.getName, s"bind=${getLocalSocketAddressByKey(key)}")
}
case s => {
logWarning(s"Failed to get channel info for ${key} because of unexpected channel type")
(s.getClass.getName, null)
}
}
} catch {
case e: NullPointerException => {
logWarning(s"Failed to get channel info because of unexpected NPE", e)
(null, null)
}
}
s"[key(${key}), type(${channelType}), info(${info})]"
}

def removeConnection(connection: Connection) {
connectionsByKey -= connection.key

@@ -464,32 +557,7 @@ private[spark] class ConnectionManager(
case receivingConnection: ReceivingConnection =>
val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId()
logInfo("Removing ReceivingConnection to " + remoteConnectionManagerId)

val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId)
if (!sendingConnectionOpt.isDefined) {
logError(s"Corresponding SendingConnection to ${remoteConnectionManagerId} not found")
return
}

val sendingConnection = sendingConnectionOpt.get
connectionsById -= remoteConnectionManagerId
sendingConnection.close()

val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()

assert(sendingConnectionManagerId == remoteConnectionManagerId)

messageStatuses.synchronized {
for (s <- messageStatuses.values
if s.connectionManagerId == sendingConnectionManagerId) {
logInfo("Notifying " + s)
s.markDone(None)
}

messageStatuses.retain((i, status) => {
status.connectionManagerId != sendingConnectionManagerId
})
}
case _ => logError("Unsupported type of connection.")
}
} finally {
@@ -889,12 +957,13 @@ private[spark] class ConnectionManager(
ackTimeoutMonitor.cancel()
selectorThread.interrupt()
selectorThread.join()
selector.close()
val connections = connectionsByKey.values
connections.foreach(_.close())
if (connectionsByKey.size != 0) {
logWarning("All connections not cleaned up")
}
serverChannel.close()
selector.close()
handleMessageExecutor.shutdown()
handleReadWriteExecutor.shutdown()
handleConnectExecutor.shutdown()