Permalink
Browse files

cleanup transport typing

no more pattern matching at runtime
  • Loading branch information...
benmur committed Jan 5, 2013
1 parent 5e3e608 commit 3f13152eb69c86165e24097bc42a89c21552494f
@@ -12,7 +12,7 @@ trait DestinationOps {
new RiemannDestination[T](EventPart(), connectionBuilder.buildConnection(where))
}
- class RiemannDestination[T <: TransportType](baseEvent: EventPart, val connection: Connection[T])(implicit system: ActorSystem, timeout: Timeout)
+ class RiemannDestination[T <: TransportType](baseEvent: EventPart, val connection: T#Connection)(implicit timeout: Timeout)
extends Destination[T] {
def send(event: EventPart)(implicit messenger: SendOff[EventPart, T]): Unit =
@@ -2,13 +2,12 @@ package net.benmur.riemann.client
import java.io.{ InputStream, OutputStream }
import java.net.SocketAddress
-
import scala.annotation.implicitNotFound
import scala.collection.mutable.WrappedArray
-
import akka.actor.ActorSystem
import akka.dispatch.Future
import akka.util.Timeout
+import akka.actor.ActorRef
sealed trait RiemannSendable
@@ -33,27 +32,36 @@ case class WriteBinary(data: Array[Byte])
case class RemoteError(message: String) extends Throwable
trait TransportType {
+ type Connection
type SocketWrapper
type SocketFactory = SocketAddress => SocketWrapper
}
-trait Reliable extends TransportType {
+object Reliable extends TransportType {
type SocketWrapper = ConnectedSocketWrapper
-}
+ type Connection = TcpActorConnectionHandle
-trait Unreliable extends TransportType {
- type SocketWrapper = UnconnectedSocketWrapper
+ trait ConnectedSocketWrapper {
+ def inputStream: InputStream
+ def outputStream: OutputStream
+ }
+
+ trait TcpActorConnectionHandle {
+ val ioActor: ActorRef
+ }
}
-trait Connection[T <: TransportType]
+object Unreliable extends TransportType {
+ type SocketWrapper = UnconnectedSocketWrapper
+ type Connection = UdpActorConnectionHandle
-trait ConnectedSocketWrapper {
- def inputStream: InputStream
- def outputStream: OutputStream
-}
+ trait UnconnectedSocketWrapper {
+ def send(data: WrappedArray[Byte]): Unit
+ }
-trait UnconnectedSocketWrapper {
- def send(data: WrappedArray[Byte]): Unit
+ trait UdpActorConnectionHandle {
+ val ioActor: ActorRef
+ }
}
trait Destination[T <: TransportType] {
@@ -67,15 +75,15 @@ trait Destination[T <: TransportType] {
@implicitNotFound(msg = "No way of building a connection to Riemann of type ${T}.")
trait ConnectionBuilder[T <: TransportType] {
- def buildConnection(where: SocketAddress, factory: Option[T#SocketFactory] = None, dispatcherId: Option[String] = None)(implicit system: ActorSystem, timeout: Timeout): Connection[T]
+ def buildConnection(where: SocketAddress, factory: Option[T#SocketFactory] = None, dispatcherId: Option[String] = None)(implicit system: ActorSystem, timeout: Timeout): T#Connection
}
@implicitNotFound(msg = "Connection type ${T} does not allow sending to ${S} Riemann because there is no implicit in scope returning a implementation of SendOff[${S}, ${T}].")
trait SendOff[S <: RiemannSendable, T <: TransportType] {
- def sendOff(connection: Connection[T], command: Write[S]): Unit
+ def sendOff(connection: T#Connection, command: Write[S]): Unit
}
@implicitNotFound(msg = "Connection type ${T} does not allow getting feedback from Riemann after sending ${S} because there is no implicit in scope returning a implementation of SendAndExpectFeedback[${S}, ${T}].")
trait SendAndExpectFeedback[S <: RiemannSendable, R, T <: TransportType] {
- def send(connection: Connection[T], command: Write[S])(implicit system: ActorSystem, timeout: Timeout): Future[R]
+ def send(connection: T#Connection, command: Write[S])(implicit timeout: Timeout): Future[R]
}
@@ -13,8 +13,11 @@ import akka.util.duration.intToDurationInt
import akka.actor.ActorInitializationException
trait ReliableIO {
- private[this] class ReliableConnectionActor(where: SocketAddress, factory: Reliable#SocketFactory, dispatcherId: Option[String])(implicit system: ActorSystem) extends Actor {
- override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second) { // This needs to be more reasonable
+ private type ImplementedTransport = Reliable.type
+ type Reliable = ImplementedTransport
+
+ private[this] class ReliableConnectionActor(where: SocketAddress, factory: ImplementedTransport#SocketFactory, dispatcherId: Option[String])(implicit system: ActorSystem) extends Actor {
+ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second) { // Maybe this needs to be configurable
case _ => Restart
}
@@ -30,32 +33,22 @@ trait ReliableIO {
}
}
- implicit object ReliableEventPartSendAndExpectFeedback extends SendAndExpectFeedback[EventPart, Boolean, Reliable] with Serializers {
- def send(connection: Connection[Reliable], command: Write[EventPart])(implicit system: ActorSystem, timeout: Timeout): Future[Boolean] =
- connection match {
- case rc: ReliableConnection =>
- val data = serializeEventPartToProtoMsg(command.m).toByteArray
- (rc.ioActor ask WriteBinary(data)).mapTo[Proto.Msg] map (_.getOk)
-
- case c =>
- Promise.failed(RemoteError("don't know how to send data to " + c.getClass.getName))
- }
+ implicit object ReliableEventPartSendAndExpectFeedback extends SendAndExpectFeedback[EventPart, Boolean, ImplementedTransport] with Serializers {
+ def send(connection: ImplementedTransport#Connection, command: Write[EventPart])(implicit timeout: Timeout): Future[Boolean] = {
+ val data = serializeEventPartToProtoMsg(command.m).toByteArray
+ (connection.ioActor ask WriteBinary(data)).mapTo[Proto.Msg] map (_.getOk)
+ }
}
- implicit object ReliableQuerySendAndExpectFeedback extends SendAndExpectFeedback[Query, Iterable[EventPart], Reliable] with Serializers {
- def send(connection: Connection[Reliable], command: Write[Query])(implicit system: ActorSystem, timeout: Timeout): Future[Iterable[EventPart]] =
- connection match {
- case rc: ReliableConnection =>
- val data = serializeQueryToProtoMsg(command.m).toByteArray
- (rc.ioActor ask WriteBinary(data)).mapTo[Proto.Msg] map (unserializeProtoMsg(_))
-
- case c =>
- Promise.failed(RemoteError("don't know how to send data to " + c.getClass.getName))
- }
+ implicit object ReliableQuerySendAndExpectFeedback extends SendAndExpectFeedback[Query, Iterable[EventPart], ImplementedTransport] with Serializers {
+ def send(connection: ImplementedTransport#Connection, command: Write[Query])(implicit timeout: Timeout): Future[Iterable[EventPart]] = {
+ val data = serializeQueryToProtoMsg(command.m).toByteArray
+ (connection.ioActor ask WriteBinary(data)).mapTo[Proto.Msg] map (unserializeProtoMsg(_))
+ }
}
- implicit object ReliableSendOff extends SendOff[EventPart, Reliable] with Serializers {
- def sendOff(connection: Connection[Reliable], command: Write[EventPart]): Unit = connection match {
+ implicit object ReliableSendOff extends SendOff[EventPart, ImplementedTransport] with Serializers {
+ def sendOff(connection: ImplementedTransport#Connection, command: Write[EventPart]): Unit = connection match {
case rc: ReliableConnection =>
rc.ioActor tell WriteBinary(serializeEventPartToProtoMsg(command.m).toByteArray)
case c =>
@@ -64,7 +57,7 @@ trait ReliableIO {
}
}
- class TcpConnectionActor(where: SocketAddress, factory: Reliable#SocketFactory) extends Actor with ActorLogging {
+ class TcpConnectionActor(where: SocketAddress, factory: ImplementedTransport#SocketFactory) extends Actor with ActorLogging {
lazy val connection = factory(where)
lazy val outputStream = new DataOutputStream(connection.outputStream)
lazy val inputStream = new DataInputStream(connection.inputStream)
@@ -88,19 +81,19 @@ trait ReliableIO {
}
}
- val makeTcpConnection: Reliable#SocketFactory = (addr) => {
+ val makeTcpConnection: ImplementedTransport#SocketFactory = (addr) => {
val socket = new Socket()
socket.connect(addr)
- new ConnectedSocketWrapper {
+ new ImplementedTransport#SocketWrapper {
override def outputStream = socket.getOutputStream()
override def inputStream = socket.getInputStream()
}
}
- class ReliableConnection(val ioActor: ActorRef) extends Connection[Reliable]
+ class ReliableConnection(val ioActor: ActorRef) extends ImplementedTransport#Connection
- implicit object TwoWayConnectionBuilder extends ConnectionBuilder[Reliable] {
- def buildConnection(where: SocketAddress, factory: Option[Reliable#SocketFactory] = None, dispatcherId: Option[String])(implicit system: ActorSystem, timeout: Timeout): Connection[Reliable] = {
+ implicit object TwoWayConnectionBuilder extends ConnectionBuilder[ImplementedTransport] {
+ def buildConnection(where: SocketAddress, factory: Option[ImplementedTransport#SocketFactory] = None, dispatcherId: Option[String])(implicit system: ActorSystem, timeout: Timeout): ImplementedTransport#Connection = {
val props = {
val p = Props(new ReliableConnectionActor(where, factory getOrElse makeTcpConnection, dispatcherId))
if (dispatcherId.isEmpty) p else p.withDispatcher(dispatcherId.get)
@@ -2,46 +2,47 @@ package net.benmur.riemann.client
import java.net.{ DatagramPacket, DatagramSocket, SocketAddress }
-import scala.annotation.implicitNotFound
import scala.collection.mutable.WrappedArray
import akka.actor.{ Actor, ActorSystem, Props }
import akka.util.Timeout
trait UnreliableIO {
- class UnreliableConnection(where: SocketAddress, factory: Unreliable#SocketFactory, dispatcherId: Option[String] = None)(implicit system: ActorSystem) extends Connection[Unreliable] {
+ private type ImplementedTransport = Unreliable.type
+ type Unreliable = ImplementedTransport
+
+ class UnreliableConnection(where: SocketAddress, factory: ImplementedTransport#SocketFactory, dispatcherId: Option[String] = None)(implicit system: ActorSystem)
+ extends ImplementedTransport#Connection {
val props = {
val p = Props(new UnconnectedConnectionActor(where, factory))
if (dispatcherId.isEmpty) p else p.withDispatcher(dispatcherId.get)
}
val ioActor = system.actorOf(props, io.IO.clientName("riemann-udp-client-"))
}
- implicit object UnreliableSendOff extends SendOff[EventPart, Unreliable] with Serializers {
- def sendOff(connection: Connection[Unreliable], command: Write[EventPart]): Unit = connection match {
- case uc: UnreliableConnection =>
- val data = serializeEventPartToProtoMsg(command.m).toByteArray
- uc.ioActor tell WriteBinary(data)
- case c => System.err.println("don't know how to send data to " + c.getClass.getName)
+ implicit object UnreliableSendOff extends SendOff[EventPart, ImplementedTransport] with Serializers {
+ def sendOff(connection: ImplementedTransport#Connection, command: Write[EventPart]): Unit = {
+ val data = serializeEventPartToProtoMsg(command.m).toByteArray
+ connection.ioActor tell WriteBinary(data)
}
}
- private[this] class UnconnectedConnectionActor(where: SocketAddress, factory: Unreliable#SocketFactory) extends Actor {
+ private[this] class UnconnectedConnectionActor(where: SocketAddress, factory: ImplementedTransport#SocketFactory) extends Actor {
val connection = factory(where)
def receive = {
case WriteBinary(data) => connection send data
}
}
- val makeUdpConnection: Unreliable#SocketFactory = (addr: SocketAddress) => {
- val dest = new DatagramSocket(addr)
- new UnconnectedSocketWrapper {
+ val makeUdpConnection: ImplementedTransport#SocketFactory = (addr: SocketAddress) => {
+ new ImplementedTransport#SocketWrapper {
+ val dest = new DatagramSocket(addr)
override def send(data: WrappedArray[Byte]) = dest send new DatagramPacket(data.array, data.length)
}
}
- implicit object OneWayConnectionBuilder extends ConnectionBuilder[Unreliable] {
- implicit def buildConnection(where: SocketAddress, factory: Option[Unreliable#SocketFactory], dispatcherId: Option[String])(implicit system: ActorSystem, timeout: Timeout): Connection[Unreliable] =
+ implicit object OneWayConnectionBuilder extends ConnectionBuilder[ImplementedTransport] {
+ implicit def buildConnection(where: SocketAddress, factory: Option[ImplementedTransport#SocketFactory], dispatcherId: Option[String])(implicit system: ActorSystem, timeout: Timeout): ImplementedTransport#Connection =
new UnreliableConnection(where, factory getOrElse makeUdpConnection, dispatcherId)
}
}
@@ -31,7 +31,7 @@ class EventSenderDSLTest extends FunSuite
test("DSL operator to send operator to send an event expecting a status") {
val (conn, dest) = makeDestination
implicit val sender = mock[SendAndExpectFeedback[EventPart, Boolean, TestingTransport]]
- sender expects 'send withArguments (conn, Write(event), system, timeout) once
+ sender expects 'send withArguments (conn, Write(event), timeout) once
event |>< dest
}
@@ -41,7 +41,7 @@ class EventSenderDSLTest extends FunSuite
implicit val sender = mock[SendAndExpectFeedback[Query, Iterable[EventPart], TestingTransport]]
val q = Query("true")
- sender expects 'send withArguments (conn, Write(q), system, timeout) once
+ sender expects 'send withArguments (conn, Write(q), timeout) once
q |>< dest
}
@@ -28,11 +28,11 @@ class ReliableIOTest extends FunSuite
val ios = new ByteArrayInputStream(in)
val oos = new ByteArrayOutputStream()
- val wrapper = mock[ConnectedSocketWrapper]
+ val wrapper = mock[Reliable.SocketWrapper]
wrapper expects 'outputStream returning oos once;
wrapper expects 'inputStream returning ios once
- val socketFactory = mockFunction[SocketAddress, ConnectedSocketWrapper]
+ val socketFactory = mockFunction[SocketAddress, Reliable.SocketWrapper]
socketFactory expects address returning wrapper once
val conn = implicitly[ConnectionBuilder[Reliable]].buildConnection(address, Some(socketFactory), Some(CallingThreadDispatcher.Id))
@@ -55,11 +55,11 @@ class ReliableIOTest extends FunSuite
val oos = new ByteArrayOutputStream()
- val wrapper = mock[ConnectedSocketWrapper]
+ val wrapper = mock[Reliable.SocketWrapper]
wrapper expects 'outputStream returning oos once;
wrapper expects 'inputStream returning new ByteArrayInputStream(outBuilder.toByteArray) once
- val socketFactory = mockFunction[SocketAddress, ConnectedSocketWrapper]
+ val socketFactory = mockFunction[SocketAddress, Reliable.SocketWrapper]
socketFactory expects address returning wrapper once
val conn = implicitly[ConnectionBuilder[Reliable]].buildConnection(address, Some(socketFactory), Some(CallingThreadDispatcher.Id))
@@ -85,11 +85,11 @@ class ReliableIOTest extends FunSuite
val oos = new ByteArrayOutputStream()
- val wrapper = mock[ConnectedSocketWrapper]
+ val wrapper = mock[Reliable.SocketWrapper]
wrapper expects 'outputStream returning oos once;
wrapper expects 'inputStream returning new ByteArrayInputStream(responseBuilder.toByteArray) once
- val socketFactory = mockFunction[SocketAddress, ConnectedSocketWrapper]
+ val socketFactory = mockFunction[SocketAddress, Reliable.SocketWrapper]
socketFactory expects address returning wrapper once
val conn = implicitly[ConnectionBuilder[Reliable]].buildConnection(address, Some(socketFactory), Some(CallingThreadDispatcher.Id))
@@ -111,11 +111,11 @@ class ReliableIOTest extends FunSuite
val os = new ByteArrayOutputStream
- val wrapper = mock[ConnectedSocketWrapper]
+ val wrapper = mock[Reliable.SocketWrapper]
wrapper expects 'inputStream returning inputStream twice;
wrapper expects 'outputStream returning os twice
- val socketFactory = mockFunction[SocketAddress, ConnectedSocketWrapper]
+ val socketFactory = mockFunction[SocketAddress, Reliable.SocketWrapper]
socketFactory expects address returning wrapper twice
val conn = implicitly[ConnectionBuilder[Reliable]].buildConnection(address, Some(socketFactory), Some(CallingThreadDispatcher.Id))
@@ -143,8 +143,8 @@ class ReliableIOTest extends FunSuite
}
test("reconnect in case of SocketException while connecting") {
- val wrapper = mock[ConnectedSocketWrapper]
- val socketFactory = mockFunction[SocketAddress, ConnectedSocketWrapper]
+ val wrapper = mock[Reliable.SocketWrapper]
+ val socketFactory = mockFunction[SocketAddress, Reliable.SocketWrapper]
val os = new ByteArrayOutputStream
@@ -40,23 +40,23 @@ class RiemannClientWithDestinationAPITest extends FunSuite
test("entry point to create a connection (sending an event expecting feedback)") {
val dest = riemannConnectAs[TestingTransport] to address
implicit val sender = mock[SendAndExpectFeedback[EventPart, Boolean, TestingTransport]]
- sender expects 'send withArguments (dest.connection, Write(event), system, timeout) once
+ sender expects 'send withArguments (dest.connection, Write(event), timeout) once
dest ask event
}
test("entry point to create a connection (sending multiple events expecting feedback)") {
val dest = riemannConnectAs[TestingTransport] to address
implicit val sender = mock[SendAndExpectFeedback[EventSeq, Boolean, TestingTransport]]
- sender expects 'send withArguments (dest.connection, Write(EventSeq(event, event2)), system, timeout) once
+ sender expects 'send withArguments (dest.connection, Write(EventSeq(event, event2)), timeout) once
dest ask EventSeq(event, event2)
}
test("entry point to create a connection (sending an query)") {
val dest = riemannConnectAs[TestingTransport] to address
implicit val sender = mock[SendAndExpectFeedback[Query, Iterable[EventPart], TestingTransport]]
- sender expects 'send withArguments (dest.connection, Write(Query("true")), system, timeout) once
+ sender expects 'send withArguments (dest.connection, Write(Query("true")), timeout) once
dest ask Query("true")
}
@@ -22,10 +22,10 @@ class UnreliableIOTest extends FunSuite
import testingsupport.TestingTransportSupport._
test("send a protobuf Msg") {
- val socket = mock[UnconnectedSocketWrapper]
+ val socket = mock[Unreliable.SocketWrapper]
socket expects 'send withArguments (WrappedArray.make(protoMsgEvent.toByteArray)) once
- val socketFactory = mockFunction[SocketAddress, UnconnectedSocketWrapper]
+ val socketFactory = mockFunction[SocketAddress, Unreliable.SocketWrapper]
socketFactory expects address returning socket once
val conn = implicitly[ConnectionBuilder[Unreliable]].buildConnection(address, Some(socketFactory), Some(CallingThreadDispatcher.Id))
@@ -14,15 +14,16 @@ trait TestingTransportSupport {
implicit val timeout = Timeout(10 seconds)
val address = new InetSocketAddress(0)
- class TestingTransportConnection(val where: SocketAddress = new InetSocketAddress(0)) extends Connection[TestingTransport]
+ class TestingTransportConnection(val where: SocketAddress = new InetSocketAddress(0))
implicit object TestingTransportConnectionBuilder extends ConnectionBuilder[TestingTransport] {
- implicit def buildConnection(where: SocketAddress, factory: Option[TestingTransport#SocketFactory] = None, dispatcherId: Option[String])(implicit system: ActorSystem, timeout: Timeout): Connection[TestingTransport] =
+ implicit def buildConnection(where: SocketAddress, factory: Option[TestingTransport#SocketFactory] = None, dispatcherId: Option[String])(implicit system: ActorSystem, timeout: Timeout): TestingTransport#Connection =
new TestingTransportConnection(where)
}
trait TestingTransport extends TransportType {
type SocketWrapper = Unit
+ type Connection = TestingTransportConnection
}
val event = EventPart(host = Some("h"), service = Some("s"), state = Some("ok"))

0 comments on commit 3f13152

Please sign in to comment.