Permalink
Browse files

Merge pull request #454 from srp/master

assorted IO.SocketOption changes from last pull request comments
  • Loading branch information...
2 parents 2e248e4 + 39a36b7 commit 4794220e557d25bc45a3f756380b80becf334f57 @viktorklang viktorklang committed May 15, 2012
Showing with 48 additions and 60 deletions.
  1. +48 −60 akka-actor/src/main/scala/akka/actor/IO.scala
@@ -4,7 +4,7 @@
package akka.actor
import akka.dispatch.{ Future, ExecutionContext }
-import akka.util.{ ByteString, Duration }
+import akka.util.{ ByteString, Duration, NonFatal }
import java.net.{ SocketAddress, InetSocketAddress }
import java.io.IOException
import java.nio.ByteBuffer
@@ -115,27 +115,11 @@ object IO {
* @return a new SocketHandle that can be used to perform actions on the
* new connection's SocketChannel.
*/
- def accept(options: Seq[SocketOption])(implicit socketOwner: ActorRef): SocketHandle = {
+ def accept(options: Seq[SocketOption] = Seq.empty)(implicit socketOwner: ActorRef): SocketHandle = {
val socket = SocketHandle(socketOwner, ioManager)
ioManager ! Accept(socket, this, options)
socket
}
-
- /**
- * Sends a request to the [[akka.actor.IOManager]] to accept an incoming
- * connection to the ServerSocketChannel associated with this
- * [[akka.actor.IO.Handle]].
- *
- * This can also be performed by creating a new [[akka.actor.IO.SocketHandle]]
- * and sending it within an [[akka.actor.IO.Accept]] to the [[akka.actor.IOManager]].
- *
- * @param socketOwner the [[akka.actor.ActorRef]] that should receive events
- * associated with the SocketChannel. The ActorRef for the
- * current Actor will be used implicitly.
- * @return a new SocketHandle that can be used to perform actions on the
- * new connection's SocketChannel.
- */
- def accept()(implicit socketOwner: ActorRef): SocketHandle = accept(Seq.empty)
}
/**
@@ -150,38 +134,50 @@ object IO {
/**
* [[akka.actor.IO.SocketOption]] to enable or disable SO_KEEPALIVE
+ *
+ * For more information see [[java.net.Socket#setKeepAlive]]
*/
case class KeepAlive(on: Boolean) extends SocketOption
/**
* [[akka.actor.IO.SocketOption]] to enable or disable OOBINLINE (receipt
* of TCP urgent data) By default, this option is disabled and TCP urgent
* data received on a [[akka.actor.IO.SocketHandle]] is silently discarded.
+ *
+ * For more information see [[java.net.Socket#setOOBInline]]
*/
case class OOBInline(on: Boolean) extends SocketOption
/**
* [[akka.actor.IO.SocketOption]] to set performance preferences for this
* [[akka.actor.IO.SocketHandle]].
+ *
+ * For more information see [[java.net.Socket#setPerformancePreferences]]
*/
case class PerformancePreferences(connectionTime: Int, latency: Int, bandwidth: Int) extends SocketOption with ServerSocketOption
/**
* [[akka.actor.IO.SocketOption]] to set the SO_RCVBUF option for this
* [[akka.actor.IO.SocketHandle]].
+ *
+ * For more information see [[java.net.Socket#setReceiveBufferSize]]
*/
case class ReceiveBufferSize(size: Int) extends SocketOption with ServerSocketOption {
require(size > 0, "Receive buffer size must be greater than 0")
}
/**
* [[akka.actor.IO.SocketOption]] to enable or disable SO_REUSEADDR
+ *
+ * For more information see [[java.net.Socket#setReuseAddress]]
*/
case class ReuseAddress(on: Boolean) extends SocketOption with ServerSocketOption
/**
* [[akka.actor.IO.SocketOption]] to set the SO_SNDBUF option for this
* [[akka.actor.IO.SocketHandle]].
+ *
+ * For more information see [[java.net.Socket#setSendBufferSize]]
*/
case class SendBufferSize(size: Int) extends SocketOption {
require(size > 0, "Send buffer size must be greater than 0")
@@ -190,14 +186,19 @@ object IO {
/**
* [[akka.actor.IO.SocketOption]] to enable or disable SO_LINGER with the
* specified linger time in seconds.
+ *
+ * For more information see [[java.net.Socket#setSoLinger]]
*/
case class SoLinger(linger: Option[Int]) extends SocketOption {
if (linger.isDefined) require(linger.get >= 0, "linger must not be negative if on")
}
/**
- * [[akka.actor.IO.SocketOption]] to enable or disable SO_TIMEOUT with the
- * specified timeout rounded down to the nearest millisecond.
+ * [[akka.actor.IO.SocketOption]] to set SO_TIMEOUT to the specified
+ * timeout rounded down to the nearest millisecond. A timeout of
+ * zero is treated as infinant.
+ *
+ * For more information see [[java.net.Socket#setSoTimeout]]
*/
case class SoTimeout(timeout: Duration) extends SocketOption {
require(timeout.toMillis >= 0, "SoTimeout must be >= 0ms")
@@ -206,15 +207,22 @@ object IO {
/**
* [[akka.actor.IO.SocketOption]] to enable or disable TCP_NODELAY
* (disable or enable Nagle's algorithm)
+ *
+ * For more information see [[java.net.Socket#setTcpNoDelay]]
*/
case class TcpNoDelay(on: Boolean) extends SocketOption
/**
* [[akka.actor.IO.SocketOption]] to set the traffic class or
* type-of-service octet in the IP header for packets sent from this
* [[akka.actor.IO.SocketHandle]].
+ *
+ * For more information see [[java.net.Socket#setTrafficClass]]
*/
- case class TrafficClass(tc: Int) extends SocketOption
+ case class TrafficClass(tc: Int) extends SocketOption {
+ require(tc >= 0, "Traffic class must be >= 0")
+ require(tc <= 255, "Traffic class must be <= 255")
+ }
/**
* Messages used to communicate with an [[akka.actor.IOManager]].
@@ -819,23 +827,10 @@ final class IOManager private (system: ActorSystem) extends Extension {
* @param owner the ActorRef that will receive messages from the IOManagerActor
* @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket
*/
- def listen(host: String, port: Int, options: Seq[IO.ServerSocketOption])(implicit owner: ActorRef): IO.ServerHandle =
+ def listen(host: String, port: Int, options: Seq[IO.ServerSocketOption] = Seq.empty)(implicit owner: ActorRef): IO.ServerHandle =
listen(new InetSocketAddress(host, port), options)(owner)
/**
- * Create a ServerSocketChannel listening on a host and port. Messages will
- * be sent from the [[akka.actor.IOManagerActor]] to the owner
- * [[akka.actor.ActorRef]].
- *
- * @param host the hostname or IP to listen on
- * @param port the port to listen on
- * @param options Seq of [[akka.actor.IO.ServerSocketOption]] to setup on socket
- * @param owner the ActorRef that will receive messages from the IOManagerActor
- * @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket
- */
- def listen(host: String, port: Int)(implicit owner: ActorRef): IO.ServerHandle = listen(new InetSocketAddress(host, port), Seq.empty)(owner)
-
- /**
* Create a SocketChannel connecting to an address. Messages will be
* sent from the [[akka.actor.IOManagerActor]] to the owner
* [[akka.actor.ActorRef]].
@@ -845,24 +840,13 @@ final class IOManager private (system: ActorSystem) extends Extension {
* @param owner the ActorRef that will receive messages from the IOManagerActor
* @return a [[akka.actor.IO.SocketHandle]] to uniquely identify the created socket
*/
- def connect(address: SocketAddress, options: Seq[IO.SocketOption])(implicit owner: ActorRef): IO.SocketHandle = {
+ def connect(address: SocketAddress, options: Seq[IO.SocketOption] = Seq.empty)(implicit owner: ActorRef): IO.SocketHandle = {
val socket = IO.SocketHandle(owner, actor)
actor ! IO.Connect(socket, address, options)
socket
}
/**
- * Create a SocketChannel connecting to an address. Messages will be
- * sent from the [[akka.actor.IOManagerActor]] to the owner
- * [[akka.actor.ActorRef]].
- *
- * @param address the address to connect to
- * @param owner the ActorRef that will receive messages from the IOManagerActor
- * @return a [[akka.actor.IO.SocketHandle]] to uniquely identify the created socket
- */
- def connect(address: SocketAddress)(implicit owner: ActorRef): IO.SocketHandle = connect(address, Seq.empty)
-
- /**
* Create a SocketChannel connecting to a host and port. Messages will
* be sent from the [[akka.actor.IOManagerActor]] to the owner
* [[akka.actor.ActorRef]].
@@ -964,19 +948,23 @@ final class IOManagerActor extends Actor with ActorLogging {
lastSelect = 0
}
+ private def forwardFailure(f: Unit): Unit = {
+ try { f } catch { case NonFatal(e) sender ! Status.Failure(e) }
+ }
+
private def setSocketOptions(socket: java.net.Socket, options: Seq[IO.SocketOption]) {
options foreach {
- case IO.KeepAlive(on) socket.setKeepAlive(on)
- case IO.OOBInline(on) socket.setOOBInline(on)
- case IO.ReceiveBufferSize(size) socket.setReceiveBufferSize(size)
- case IO.ReuseAddress(on) socket.setReuseAddress(on)
- case IO.SendBufferSize(size) socket.setSendBufferSize(size)
- case IO.SoLinger(linger) socket.setSoLinger(linger.isDefined, math.max(0, linger.getOrElse(socket.getSoLinger)))
- case IO.SoTimeout(timeout) socket.setSoTimeout(timeout.toMillis.toInt)
- case IO.TcpNoDelay(on) socket.setTcpNoDelay(on)
- case IO.TrafficClass(tc) socket.setTrafficClass(tc)
+ case IO.KeepAlive(on) forwardFailure(socket.setKeepAlive(on))
+ case IO.OOBInline(on) forwardFailure(socket.setOOBInline(on))
+ case IO.ReceiveBufferSize(size) forwardFailure(socket.setReceiveBufferSize(size))
+ case IO.ReuseAddress(on) forwardFailure(socket.setReuseAddress(on))
+ case IO.SendBufferSize(size) forwardFailure(socket.setSendBufferSize(size))
+ case IO.SoLinger(linger) forwardFailure(socket.setSoLinger(linger.isDefined, math.max(0, linger.getOrElse(socket.getSoLinger))))
+ case IO.SoTimeout(timeout) forwardFailure(socket.setSoTimeout(timeout.toMillis.toInt))
+ case IO.TcpNoDelay(on) forwardFailure(socket.setTcpNoDelay(on))
+ case IO.TrafficClass(tc) forwardFailure(socket.setTrafficClass(tc))
case IO.PerformancePreferences(connTime, latency, bandwidth)
- socket.setPerformancePreferences(connTime, latency, bandwidth)
+ forwardFailure(socket.setPerformancePreferences(connTime, latency, bandwidth))
}
}
@@ -992,10 +980,10 @@ final class IOManagerActor extends Actor with ActorLogging {
val sock = channel.socket
options foreach {
- case IO.ReceiveBufferSize(size) sock.setReceiveBufferSize(size)
- case IO.ReuseAddress(on) sock.setReuseAddress(on)
+ case IO.ReceiveBufferSize(size) forwardFailure(sock.setReceiveBufferSize(size))
+ case IO.ReuseAddress(on) forwardFailure(sock.setReuseAddress(on))
case IO.PerformancePreferences(connTime, latency, bandwidth)
- sock.setPerformancePreferences(connTime, latency, bandwidth)
+ forwardFailure(sock.setPerformancePreferences(connTime, latency, bandwidth))
}
channel.socket bind (address, 1000) // TODO: make backlog configurable

0 comments on commit 4794220

Please sign in to comment.