Permalink
Browse files

small transport types cleanups

  • Loading branch information...
1 parent 6b2919d commit 5e3e6086508f23b6e83c2a5662b62a8c59f5cf2c @benmur committed Jan 5, 2013
View
9 src/main/scala/net/benmur/riemann/client/DomainObjects.scala
@@ -33,13 +33,16 @@ case class WriteBinary(data: Array[Byte])
case class RemoteError(message: String) extends Throwable
trait TransportType {
- type SocketFactory
+ type SocketWrapper
+ type SocketFactory = SocketAddress => SocketWrapper
}
+
trait Reliable extends TransportType {
- type SocketFactory = SocketAddress => ConnectedSocketWrapper
+ type SocketWrapper = ConnectedSocketWrapper
}
+
trait Unreliable extends TransportType {
- type SocketFactory = SocketAddress => UnconnectedSocketWrapper
+ type SocketWrapper = UnconnectedSocketWrapper
}
trait Connection[T <: TransportType]
View
6 src/main/scala/net/benmur/riemann/client/ReliableIO.scala
@@ -2,8 +2,6 @@ package net.benmur.riemann.client
import java.io.{ DataInputStream, DataOutputStream }
import java.net.{ Socket, SocketAddress, SocketException }
-import java.util.concurrent.atomic.AtomicLong
-import scala.annotation.implicitNotFound
import com.aphyr.riemann.Proto
import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, OneForOneStrategy, Props }
import akka.actor.SupervisorStrategy._
@@ -15,8 +13,6 @@ import akka.util.duration.intToDurationInt
import akka.actor.ActorInitializationException
trait ReliableIO {
- private val nClients = new AtomicLong(0L) // FIXME this should be more global
-
private[this] class ReliableConnectionActor(where: SocketAddress, factory: Reliable#SocketFactory, dispatcherId: Option[String])(implicit system: ActorSystem) extends Actor {
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second) { // This needs to be more reasonable
case _ => Restart
@@ -109,7 +105,7 @@ trait ReliableIO {
val p = Props(new ReliableConnectionActor(where, factory getOrElse makeTcpConnection, dispatcherId))
if (dispatcherId.isEmpty) p else p.withDispatcher(dispatcherId.get)
}
- new ReliableConnection(system.actorOf(props, "riemann-tcp-client-" + nClients.incrementAndGet))
+ new ReliableConnection(system.actorOf(props, io.IO.clientName("riemann-tcp-client-")))
}
}
}
View
7 src/main/scala/net/benmur/riemann/client/UnreliableIO.scala
@@ -1,7 +1,6 @@
package net.benmur.riemann.client
import java.net.{ DatagramPacket, DatagramSocket, SocketAddress }
-import java.util.concurrent.atomic.AtomicLong
import scala.annotation.implicitNotFound
import scala.collection.mutable.WrappedArray
@@ -10,14 +9,12 @@ import akka.actor.{ Actor, ActorSystem, Props }
import akka.util.Timeout
trait UnreliableIO {
- private val nClients = new AtomicLong(0L) // FIXME this should be more global
-
class UnreliableConnection(where: SocketAddress, factory: Unreliable#SocketFactory, dispatcherId: Option[String] = None)(implicit system: ActorSystem) extends Connection[Unreliable] {
val props = {
val p = Props(new UnconnectedConnectionActor(where, factory))
if (dispatcherId.isEmpty) p else p.withDispatcher(dispatcherId.get)
}
- val ioActor = system.actorOf(props, "riemann-udp-client-" + nClients.incrementAndGet)
+ val ioActor = system.actorOf(props, io.IO.clientName("riemann-udp-client-"))
}
implicit object UnreliableSendOff extends SendOff[EventPart, Unreliable] with Serializers {
@@ -36,7 +33,7 @@ trait UnreliableIO {
}
}
- val makeUdpConnection: Unreliable#SocketFactory = (addr) => {
+ val makeUdpConnection: Unreliable#SocketFactory = (addr: SocketAddress) => {
val dest = new DatagramSocket(addr)
new UnconnectedSocketWrapper {
override def send(data: WrappedArray[Byte]) = dest send new DatagramPacket(data.array, data.length)
View
8 src/main/scala/net/benmur/riemann/client/io/IO.scala
@@ -0,0 +1,8 @@
+package net.benmur.riemann.client.io
+
+import java.util.concurrent.atomic.AtomicLong
+
+object IO {
+ private val nClients = new AtomicLong(0L)
+ def clientName(actorNamePrefix: String) = actorNamePrefix + nClients.incrementAndGet
+}
View
2 src/test/scala/net/benmur/riemann/client/testingsupport/TestingTransportSupport.scala
@@ -22,7 +22,7 @@ trait TestingTransportSupport {
}
trait TestingTransport extends TransportType {
- type SocketFactory = SocketAddress => Unit
+ type SocketWrapper = Unit
}
val event = EventPart(host = Some("h"), service = Some("s"), state = Some("ok"))

0 comments on commit 5e3e608

Please sign in to comment.