Permalink
Browse files

Merge branch 'master' into wip-2006-binary-compat-√

  • Loading branch information...
2 parents d092d17 + 3757fbd commit 778b945e26be520e6e90d48e93869f454f8aa5b0 @viktorklang viktorklang committed May 16, 2012
@@ -4,7 +4,7 @@
package akka.actor
import akka.dispatch.{ Future, ExecutionContext }
-import akka.util.{ ByteString, Duration }
+import akka.util.{ ByteString, Duration, NonFatal }
import java.net.{ SocketAddress, InetSocketAddress }
import java.io.IOException
import java.nio.ByteBuffer
@@ -114,27 +114,11 @@ object IO {
* @return a new SocketHandle that can be used to perform actions on the
* new connection's SocketChannel.
*/
- def accept(options: Seq[SocketOption])(implicit socketOwner: ActorRef): SocketHandle = {
+ def accept(options: Seq[SocketOption] = Seq.empty)(implicit socketOwner: ActorRef): SocketHandle = {
val socket = SocketHandle(socketOwner, ioManager)
ioManager ! Accept(socket, this, options)
socket
}
-
- /**
- * Sends a request to the [[akka.actor.IOManager]] to accept an incoming
- * connection to the ServerSocketChannel associated with this
- * [[akka.actor.IO.Handle]].
- *
- * This can also be performed by creating a new [[akka.actor.IO.SocketHandle]]
- * and sending it within an [[akka.actor.IO.Accept]] to the [[akka.actor.IOManager]].
- *
- * @param socketOwner the [[akka.actor.ActorRef]] that should receive events
- * associated with the SocketChannel. The ActorRef for the
- * current Actor will be used implicitly.
- * @return a new SocketHandle that can be used to perform actions on the
- * new connection's SocketChannel.
- */
- def accept()(implicit socketOwner: ActorRef): SocketHandle = accept(Seq.empty)
}
/**
@@ -149,38 +133,50 @@ object IO {
/**
* [[akka.actor.IO.SocketOption]] to enable or disable SO_KEEPALIVE
+ *
+ * For more information see [[java.net.Socket#setKeepAlive]]
*/
case class KeepAlive(on: Boolean) extends SocketOption
/**
* [[akka.actor.IO.SocketOption]] to enable or disable OOBINLINE (receipt
* of TCP urgent data) By default, this option is disabled and TCP urgent
* data received on a [[akka.actor.IO.SocketHandle]] is silently discarded.
+ *
+ * For more information see [[java.net.Socket#setOOBInline]]
*/
case class OOBInline(on: Boolean) extends SocketOption
/**
* [[akka.actor.IO.SocketOption]] to set performance preferences for this
* [[akka.actor.IO.SocketHandle]].
+ *
+ * For more information see [[java.net.Socket#setPerformancePreferences]]
*/
case class PerformancePreferences(connectionTime: Int, latency: Int, bandwidth: Int) extends SocketOption with ServerSocketOption
/**
* [[akka.actor.IO.SocketOption]] to set the SO_RCVBUF option for this
* [[akka.actor.IO.SocketHandle]].
+ *
+ * For more information see [[java.net.Socket#setReceiveBufferSize]]
*/
case class ReceiveBufferSize(size: Int) extends SocketOption with ServerSocketOption {
require(size > 0, "Receive buffer size must be greater than 0")
}
/**
* [[akka.actor.IO.SocketOption]] to enable or disable SO_REUSEADDR
+ *
+ * For more information see [[java.net.Socket#setReuseAddress]]
*/
case class ReuseAddress(on: Boolean) extends SocketOption with ServerSocketOption
/**
* [[akka.actor.IO.SocketOption]] to set the SO_SNDBUF option for this
* [[akka.actor.IO.SocketHandle]].
+ *
+ * For more information see [[java.net.Socket#setSendBufferSize]]
*/
case class SendBufferSize(size: Int) extends SocketOption {
require(size > 0, "Send buffer size must be greater than 0")
@@ -189,14 +185,19 @@ object IO {
/**
* [[akka.actor.IO.SocketOption]] to enable or disable SO_LINGER with the
* specified linger time in seconds.
+ *
+ * For more information see [[java.net.Socket#setSoLinger]]
*/
case class SoLinger(linger: Option[Int]) extends SocketOption {
if (linger.isDefined) require(linger.get >= 0, "linger must not be negative if on")
}
/**
- * [[akka.actor.IO.SocketOption]] to enable or disable SO_TIMEOUT with the
- * specified timeout rounded down to the nearest millisecond.
+ * [[akka.actor.IO.SocketOption]] to set SO_TIMEOUT to the specified
+ * timeout rounded down to the nearest millisecond. A timeout of
+ * zero is treated as infinant.
+ *
+ * For more information see [[java.net.Socket#setSoTimeout]]
*/
case class SoTimeout(timeout: Duration) extends SocketOption {
require(timeout.toMillis >= 0, "SoTimeout must be >= 0ms")
@@ -205,15 +206,22 @@ object IO {
/**
* [[akka.actor.IO.SocketOption]] to enable or disable TCP_NODELAY
* (disable or enable Nagle's algorithm)
+ *
+ * For more information see [[java.net.Socket#setTcpNoDelay]]
*/
case class TcpNoDelay(on: Boolean) extends SocketOption
/**
* [[akka.actor.IO.SocketOption]] to set the traffic class or
* type-of-service octet in the IP header for packets sent from this
* [[akka.actor.IO.SocketHandle]].
+ *
+ * For more information see [[java.net.Socket#setTrafficClass]]
*/
- case class TrafficClass(tc: Int) extends SocketOption
+ case class TrafficClass(tc: Int) extends SocketOption {
+ require(tc >= 0, "Traffic class must be >= 0")
+ require(tc <= 255, "Traffic class must be <= 255")
+ }
/**
* Messages used to communicate with an [[akka.actor.IOManager]].
@@ -818,23 +826,10 @@ final class IOManager private (system: ActorSystem) extends Extension {
* @param owner the ActorRef that will receive messages from the IOManagerActor
* @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket
*/
- def listen(host: String, port: Int, options: Seq[IO.ServerSocketOption])(implicit owner: ActorRef): IO.ServerHandle =
+ def listen(host: String, port: Int, options: Seq[IO.ServerSocketOption] = Seq.empty)(implicit owner: ActorRef): IO.ServerHandle =
listen(new InetSocketAddress(host, port), options)(owner)
/**
- * Create a ServerSocketChannel listening on a host and port. Messages will
- * be sent from the [[akka.actor.IOManagerActor]] to the owner
- * [[akka.actor.ActorRef]].
- *
- * @param host the hostname or IP to listen on
- * @param port the port to listen on
- * @param options Seq of [[akka.actor.IO.ServerSocketOption]] to setup on socket
- * @param owner the ActorRef that will receive messages from the IOManagerActor
- * @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket
- */
- def listen(host: String, port: Int)(implicit owner: ActorRef): IO.ServerHandle = listen(new InetSocketAddress(host, port), Seq.empty)(owner)
-
- /**
* Create a SocketChannel connecting to an address. Messages will be
* sent from the [[akka.actor.IOManagerActor]] to the owner
* [[akka.actor.ActorRef]].
@@ -844,24 +839,13 @@ final class IOManager private (system: ActorSystem) extends Extension {
* @param owner the ActorRef that will receive messages from the IOManagerActor
* @return a [[akka.actor.IO.SocketHandle]] to uniquely identify the created socket
*/
- def connect(address: SocketAddress, options: Seq[IO.SocketOption])(implicit owner: ActorRef): IO.SocketHandle = {
+ def connect(address: SocketAddress, options: Seq[IO.SocketOption] = Seq.empty)(implicit owner: ActorRef): IO.SocketHandle = {
val socket = IO.SocketHandle(owner, actor)
actor ! IO.Connect(socket, address, options)
socket
}
/**
- * Create a SocketChannel connecting to an address. Messages will be
- * sent from the [[akka.actor.IOManagerActor]] to the owner
- * [[akka.actor.ActorRef]].
- *
- * @param address the address to connect to
- * @param owner the ActorRef that will receive messages from the IOManagerActor
- * @return a [[akka.actor.IO.SocketHandle]] to uniquely identify the created socket
- */
- def connect(address: SocketAddress)(implicit owner: ActorRef): IO.SocketHandle = connect(address, Seq.empty)
-
- /**
* Create a SocketChannel connecting to a host and port. Messages will
* be sent from the [[akka.actor.IOManagerActor]] to the owner
* [[akka.actor.ActorRef]].
@@ -963,19 +947,23 @@ final class IOManagerActor extends Actor with ActorLogging {
lastSelect = 0
}
+ private def forwardFailure(f: Unit): Unit = {
+ try { f } catch { case NonFatal(e) sender ! Status.Failure(e) }
+ }
+
private def setSocketOptions(socket: java.net.Socket, options: Seq[IO.SocketOption]) {
options foreach {
- case IO.KeepAlive(on) socket.setKeepAlive(on)
- case IO.OOBInline(on) socket.setOOBInline(on)
- case IO.ReceiveBufferSize(size) socket.setReceiveBufferSize(size)
- case IO.ReuseAddress(on) socket.setReuseAddress(on)
- case IO.SendBufferSize(size) socket.setSendBufferSize(size)
- case IO.SoLinger(linger) socket.setSoLinger(linger.isDefined, math.max(0, linger.getOrElse(socket.getSoLinger)))
- case IO.SoTimeout(timeout) socket.setSoTimeout(timeout.toMillis.toInt)
- case IO.TcpNoDelay(on) socket.setTcpNoDelay(on)
- case IO.TrafficClass(tc) socket.setTrafficClass(tc)
+ case IO.KeepAlive(on) forwardFailure(socket.setKeepAlive(on))
+ case IO.OOBInline(on) forwardFailure(socket.setOOBInline(on))
+ case IO.ReceiveBufferSize(size) forwardFailure(socket.setReceiveBufferSize(size))
+ case IO.ReuseAddress(on) forwardFailure(socket.setReuseAddress(on))
+ case IO.SendBufferSize(size) forwardFailure(socket.setSendBufferSize(size))
+ case IO.SoLinger(linger) forwardFailure(socket.setSoLinger(linger.isDefined, math.max(0, linger.getOrElse(socket.getSoLinger))))
+ case IO.SoTimeout(timeout) forwardFailure(socket.setSoTimeout(timeout.toMillis.toInt))
+ case IO.TcpNoDelay(on) forwardFailure(socket.setTcpNoDelay(on))
+ case IO.TrafficClass(tc) forwardFailure(socket.setTrafficClass(tc))
case IO.PerformancePreferences(connTime, latency, bandwidth)
- socket.setPerformancePreferences(connTime, latency, bandwidth)
+ forwardFailure(socket.setPerformancePreferences(connTime, latency, bandwidth))
}
}
@@ -991,10 +979,10 @@ final class IOManagerActor extends Actor with ActorLogging {
val sock = channel.socket
options foreach {
- case IO.ReceiveBufferSize(size) sock.setReceiveBufferSize(size)
- case IO.ReuseAddress(on) sock.setReuseAddress(on)
+ case IO.ReceiveBufferSize(size) forwardFailure(sock.setReceiveBufferSize(size))
+ case IO.ReuseAddress(on) forwardFailure(sock.setReuseAddress(on))
case IO.PerformancePreferences(connTime, latency, bandwidth)
- sock.setPerformancePreferences(connTime, latency, bandwidth)
+ forwardFailure(sock.setPerformancePreferences(connTime, latency, bandwidth))
}
channel.socket bind (address, 1000) // TODO: make backlog configurable
@@ -12,6 +12,7 @@ import atomic.AtomicInteger
import scala.annotation.tailrec
import akka.actor.ActorSystem
import akka.util.Timeout
+import akka.util.BoxedType
object TestActor {
type Ignore = Option[PartialFunction[AnyRef, Boolean]]
@@ -347,7 +348,7 @@ class TestKit(_system: ActorSystem) {
private def expectMsgClass_internal[C](max: Duration, c: Class[C]): C = {
val o = receiveOne(max)
assert(o ne null, "timeout (" + max + ") during expectMsgClass waiting for " + c)
- assert(c isInstance o, "expected " + c + ", found " + o.getClass)
+ assert(BoxedType(c) isInstance o, "expected " + c + ", found " + o.getClass)
o.asInstanceOf[C]
}
@@ -389,7 +390,7 @@ class TestKit(_system: ActorSystem) {
private def expectMsgAnyClassOf_internal[C](max: Duration, obj: Class[_ <: C]*): C = {
val o = receiveOne(max)
assert(o ne null, "timeout (" + max + ") during expectMsgAnyClassOf waiting for " + obj.mkString("(", ", ", ")"))
- assert(obj exists (_ isInstance o), "found unexpected " + o)
+ assert(obj exists (c BoxedType(c) isInstance o), "found unexpected " + o)
o.asInstanceOf[C]
}
@@ -437,8 +438,8 @@ class TestKit(_system: ActorSystem) {
private def expectMsgAllClassOf_internal[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = {
val recv = receiveN_internal(obj.size, max)
- obj foreach (x assert(recv exists (_.getClass eq x), "not found " + x))
- recv foreach (x assert(obj exists (_ eq x.getClass), "found non-matching object " + x))
+ obj foreach (x assert(recv exists (_.getClass eq BoxedType(x)), "not found " + x))
+ recv foreach (x assert(obj exists (c BoxedType(c) eq x.getClass), "found non-matching object " + x))
recv.asInstanceOf[Seq[T]]
}
@@ -462,8 +463,8 @@ class TestKit(_system: ActorSystem) {
private def expectMsgAllConformingOf_internal[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = {
val recv = receiveN_internal(obj.size, max)
- obj foreach (x assert(recv exists (x isInstance _), "not found " + x))
- recv foreach (x assert(obj exists (_ isInstance x), "found non-matching object " + x))
+ obj foreach (x assert(recv exists (BoxedType(x) isInstance _), "not found " + x))
+ recv foreach (x assert(obj exists (c BoxedType(c) isInstance x), "found non-matching object " + x))
recv.asInstanceOf[Seq[T]]
}
@@ -65,6 +65,14 @@ class TestProbeSpec extends AkkaSpec with DefaultTimeout {
expectMsg("end") // verify that "hallo" did not get through
}
+ "be able to expect primitive types" in {
+ for (_ 1 to 4) testActor ! 42
+ expectMsgType[Int] must be(42)
+ expectMsgAnyClassOf(classOf[Int]) must be(42)
+ expectMsgAllClassOf(classOf[Int]) must be(Seq(42))
+ expectMsgAllConformingOf(classOf[Int]) must be(Seq(42))
+ }
+
}
}

0 comments on commit 778b945

Please sign in to comment.