Permalink
Browse files

tcp connection actors, see #2886

  • Loading branch information...
1 parent 284e64f commit be9abae1e32fda97fa15713ebf83f7b2d3c5f25e @jrudolph jrudolph committed Jan 15, 2013
@@ -54,8 +54,17 @@ akka {
# Fully qualified config path which holds the dispatcher configuration
# for the selector management actors
management-dispatcher = "akka.actor.default-dispatcher"
+
+ # The size of the thread-local direct buffers used to read or write
+ # network data from the kernel. Those buffer directly add to the footprint
+ # of the threads from the dispatcher tcp connection actors are using.
+ direct-buffer-size = 524288
+
+ # The duration a connection actor waits for a `Register` message from
+ # its commander before aborting the connection.
+ register-timeout = 5s
}
}
-}
+}
@@ -31,31 +31,30 @@ object Tcp extends ExtensionKey[TcpExt] {
case class Bind(handler: ActorRef,
address: InetSocketAddress,
backlog: Int = 100,
- options: immutable.Seq[SO.SocketOption] = Nil) extends Command
+ options: immutable.Seq[SocketOption] = Nil) extends Command
case object Unbind extends Command
case class Register(handler: ActorRef) extends Command
- object SO {
+ /**
+ * SocketOption is a package of data (from the user) and associated
+ * behavior (how to apply that to a socket).
+ */
+ sealed trait SocketOption {
/**
- * SocketOption is a package of data (from the user) and associated
- * behavior (how to apply that to a socket).
+ * Action to be taken for this option before calling bind()
*/
- sealed trait SocketOption {
- /**
- * Action to be taken for this option before calling bind()
- */
- def beforeBind(s: ServerSocket): Unit = ()
- /**
- * Action to be taken for this option before calling connect()
- */
- def beforeConnect(s: Socket): Unit = ()
- /**
- * Action to be taken for this option after connect returned (i.e. on
- * the slave socket for servers).
- */
- def afterConnect(s: Socket): Unit = ()
- }
-
+ def beforeBind(s: ServerSocket): Unit = ()
+ /**
+ * Action to be taken for this option before calling connect()
+ */
+ def beforeConnect(s: Socket): Unit = ()
+ /**
+ * Action to be taken for this option after connect returned (i.e. on
+ * the slave socket for servers).
+ */
+ def afterConnect(s: Socket): Unit = ()
+ }
+ object SO {
// shared socket options
/**
@@ -109,7 +108,7 @@ object Tcp extends ExtensionKey[TcpExt] {
* For more information see [[java.net.Socket.setSendBufferSize]]
*/
case class SendBufferSize(size: Int) extends SocketOption {
- require(size > 0, "ReceiveBufferSize must be > 0")
+ require(size > 0, "SendBufferSize must be > 0")
override def afterConnect(s: Socket): Unit = s.setSendBufferSize(size)
}
@@ -139,39 +138,37 @@ object Tcp extends ExtensionKey[TcpExt] {
}
// TODO: what about close reasons?
- case object Close extends Command
- case object ConfirmedClose extends Command
- case object Abort extends Command
-
- trait Write extends Command {
- def data: ByteString
- def ack: AnyRef
- def nack: AnyRef
- }
+ sealed trait CloseCommand extends Command
+
+ case object Close extends CloseCommand
+ case object ConfirmedClose extends CloseCommand
+ case object Abort extends CloseCommand
+
+ case class Write(data: ByteString, ack: AnyRef) extends Command
object Write {
- def apply(_data: ByteString): Write = new Write {
- def data: ByteString = _data
- def ack: AnyRef = null
- def nack: AnyRef = null
- }
+ val Empty: Write = Write(ByteString.empty, null)
+ def apply(data: ByteString): Write =
+ if (data.isEmpty) Empty else Write(data, null)
}
+
case object StopReading extends Command
case object ResumeReading extends Command
/// EVENTS
sealed trait Event
- case object Bound extends Event
-
case class Received(data: ByteString) extends Event
- case class Connected(localAddress: InetSocketAddress, remoteAddress: InetSocketAddress) extends Event
+ case class Connected(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) extends Event
case class CommandFailed(cmd: Command) extends Event
+ case object Bound extends Event
+ case object Unbound extends Event
- sealed trait Closed extends Event
- case object PeerClosed extends Closed
- case object ActivelyClosed extends Closed
- case object ConfirmedClosed extends Closed
- case class Error(cause: Throwable) extends Closed
+ sealed trait ConnectionClosed extends Event
+ case object Closed extends ConnectionClosed
+ case object Aborted extends ConnectionClosed
+ case object ConfirmedClosed extends ConnectionClosed
+ case object PeerClosed extends ConnectionClosed
+ case class ErrorClose(cause: Throwable) extends ConnectionClosed
/// INTERNAL
case class RegisterClientChannel(channel: SocketChannel)
@@ -208,9 +205,13 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
val SelectorDispatcher = getString("selector-dispatcher")
val WorkerDispatcher = getString("worker-dispatcher")
val ManagementDispatcher = getString("management-dispatcher")
+ val DirectBufferSize = getInt("direct-buffer-size")
+ val RegisterTimeout =
+ if (getString("register-timeout") == "infinite") Duration.Undefined
+ else Duration(getMilliseconds("register-timeout"), MILLISECONDS)
}
val manager = system.asInstanceOf[ActorSystemImpl].systemActorOf(
Props[TcpManager].withDispatcher(Settings.ManagementDispatcher), "IO-TCP")
-}
+}
@@ -0,0 +1,225 @@
+/**
+ * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package akka.io
+
+import java.net.InetSocketAddress
+import java.io.IOException
+import java.nio.channels.SocketChannel
+import scala.util.control.NonFatal
+import scala.collection.immutable
+import scala.concurrent.duration._
+import akka.actor._
+import akka.util.ByteString
+import Tcp._
+
+/**
+ * Base class for TcpIncomingConnection and TcpOutgoingConnection.
+ */
+abstract class TcpConnection(val selector: ActorRef,
+ val channel: SocketChannel) extends Actor with ThreadLocalDirectBuffer with ActorLogging {
+
+ channel.configureBlocking(false)
+
+ var pendingWrite: Write = Write.Empty // a write "queue" of size 1 for holding one unfinished write command
+ def writePending = pendingWrite ne Write.Empty
+
+ def registerTimeout = Tcp(context.system).Settings.RegisterTimeout
+
+ // STATES
+
+ /** connection established, waiting for registration from user handler */
+ def waitingForRegistration(commander: ActorRef): Receive = {
+ case Register(handler)
+ log.debug("{} registered as connection handler", handler)
+ selector ! ReadInterest
+
+ context.setReceiveTimeout(Duration.Undefined)
+ context.watch(handler) // sign death pact
+
+ context.become(connected(handler))
+
+ case cmd: CloseCommand
+ handleClose(commander, closeResponse(cmd))
+
+ case ReceiveTimeout
+ // TODO: just shutting down, as we do here, presents a race condition to the user
+ // Should we introduce a dedicated `Registered` event message to notify the user of successful registration?
+ log.warning("Configured registration timeout of {} expired, stopping", registerTimeout)
+ context.stop(self)
+ }
+
+ /** normal connected state */
+ def connected(handler: ActorRef): Receive = {
+ case StopReading selector ! StopReading
+ case ResumeReading selector ! ReadInterest
+ case ChannelReadable doRead(handler)
+
+ case write: Write if writePending
+ log.debug("Dropping write because queue is full")
+ handler ! CommandFailed(write)
+
+ case write: Write doWrite(handler, write)
+ case ChannelWritable doWrite(handler, pendingWrite)
+
+ case cmd: CloseCommand handleClose(handler, closeResponse(cmd))
+ }
+
+ /** connection is closing but a write has to be finished first */
+ def closingWithPendingWrite(handler: ActorRef, closedEvent: ConnectionClosed): Receive = {
+ case StopReading selector ! StopReading
+ case ResumeReading selector ! ReadInterest
+ case ChannelReadable doRead(handler)
+
+ case ChannelWritable
+ doWrite(handler, pendingWrite)
+ if (!writePending) // writing is now finished
+ handleClose(handler, closedEvent)
+
+ case Abort handleClose(handler, Aborted)
+ }
+
+ /** connection is closed on our side and we're waiting from confirmation from the other side */
+ def closing(handler: ActorRef): Receive = {
+ case StopReading selector ! StopReading
+ case ResumeReading selector ! ReadInterest
+ case ChannelReadable doRead(handler)
+ case Abort handleClose(handler, Aborted)
+ }
+
+ // AUXILIARIES and IMPLEMENTATION
+
+ /** use in subclasses to start the common machinery above once a channel is connected */
+ def completeConnect(commander: ActorRef, options: immutable.Seq[SocketOption]): Unit = {
+ options.foreach(_.afterConnect(channel.socket))
+
+ commander ! Connected(
+ channel.socket.getRemoteSocketAddress.asInstanceOf[InetSocketAddress],
+ channel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])
+
+ context.setReceiveTimeout(registerTimeout)
+ context.become(waitingForRegistration(commander))
+ }
+
+ def doRead(handler: ActorRef): Unit = {
+ val buffer = directBuffer()
+
+ try {
+ log.debug("Trying to read from channel")
+ val readBytes = channel.read(buffer)
+ buffer.flip()
+
+ if (readBytes > 0) {
+ log.debug("Read {} bytes", readBytes)
+ handler ! Received(ByteString(buffer).take(readBytes))
+ if (readBytes == buffer.capacity())
+ // directly try reading more because we exhausted our buffer
+ self ! ChannelReadable
+ else selector ! ReadInterest
+ } else if (readBytes == 0) {
+ log.debug("Read nothing. Registering read interest with selector")
+ selector ! ReadInterest
+ } else if (readBytes == -1) {
+ log.debug("Read returned end-of-stream")
+ doCloseConnection(handler, closeReason)
+ } else throw new IllegalStateException("Unexpected value returned from read: " + readBytes)
+
+ } catch {
+ case e: IOException handleError(handler, e)
+ }
+ }
+
+ def doWrite(handler: ActorRef, write: Write): Unit = {
+ val data = write.data
+
+ val buffer = directBuffer()
+ data.copyToBuffer(buffer)
+ buffer.flip()
+
+ try {
+ log.debug("Trying to write to channel")
+ val writtenBytes = channel.write(buffer)
+ log.debug("Wrote {} bytes", writtenBytes)
+ pendingWrite = consume(write, writtenBytes)
+
+ if (writePending) selector ! WriteInterest // still data to write
+ else if (write.ack != null) handler ! write.ack // everything written
+ } catch {
+ case e: IOException handleError(handler, e)
+ }
+ }
+
+ def closeReason =
+ if (channel.socket.isOutputShutdown) ConfirmedClosed
+ else PeerClosed
+
+ def handleClose(handler: ActorRef, closedEvent: ConnectionClosed): Unit =
+ if (closedEvent == Aborted) { // close instantly
+ log.debug("Got Abort command. RESETing connection.")
+ doCloseConnection(handler, closedEvent)
+
+ } else if (writePending) { // finish writing first
+ log.debug("Got Close command but write is still pending.")
+ context.become(closingWithPendingWrite(handler, closedEvent))
+
+ } else if (closedEvent == ConfirmedClosed) { // shutdown output and wait for confirmation
+ log.debug("Got ConfirmedClose command, sending FIN.")
+ channel.socket.shutdownOutput()
+ context.become(closing(handler))
+
+ } else { // close now
+ log.debug("Got Close command, closing connection.")
+ doCloseConnection(handler, closedEvent)
+ }
+
+ def doCloseConnection(handler: ActorRef, closedEvent: ConnectionClosed): Unit = {
+ if (closedEvent == Aborted) abort()
+ else channel.close()
+
+ handler ! closedEvent
+ context.stop(self)
+ }
+
+ def closeResponse(closeCommand: CloseCommand): ConnectionClosed =
+ closeCommand match {
+ case Close Closed
+ case Abort Aborted
+ case ConfirmedClose ConfirmedClosed
+ }
+
+ def handleError(handler: ActorRef, exception: IOException): Unit = {
+ exception.setStackTrace(Array.empty)
+ handler ! ErrorClose(exception)
+ throw exception
+ }
+
+ def abort(): Unit = {
+ try channel.socket.setSoLinger(true, 0) // causes the following close() to send TCP RST
+ catch {
+ case NonFatal(e)
+ // setSoLinger can fail due to http://bugs.sun.com/view_bug.do?bug_id=6799574
+ // (also affected: OS/X Java 1.6.0_37)
+ log.debug("setSoLinger(true, 0) failed with {}", e)
+ }
+ channel.close()
+ }
+
+ override def postStop(): Unit =
+ if (channel.isOpen)
+ abort()
+
+ /** Returns a new write with `numBytes` removed from the front */
+ def consume(write: Write, numBytes: Int): Write =
+ write match {
+ case Write.Empty if numBytes == 0 write
+ case _
+ numBytes match {
+ case 0 write
+ case x if x == write.data.length Write.Empty
+ case _
+ require(numBytes > 0 && numBytes < write.data.length)
+ write.copy(data = write.data.drop(numBytes))
+ }
+ }
+}
Oops, something went wrong.

0 comments on commit be9abae

Please sign in to comment.