Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base: 457c6e9341
...
compare: 0e5f0872ac
Checking mergeability… Don't worry, you can still create the pull request.
  • 5 commits
  • 16 files changed
  • 0 commit comments
  • 1 contributor
View
17 README.md
@@ -33,7 +33,7 @@ val tcpDestination = riemannConnectAs[Reliable] to new InetSocketAddress("localh
val udpDestination = riemannConnectAs[Unreliable] to new InetSocketAddress("localhost", 5555)
```
-Please note that operations returning a Future won't compile if the connection is created with an `Unreliable` type parameter, this is intentional. (Well, it will compile if you have an implicit in scope implementing `SendAndExpectFeedback[Unreliable]`).
+Please note that operations returning a Future won't compile if the connection is created with an `Unreliable` type parameter, this is intentional. (Well, it will compile if you have an implicit in scope implementing `SendAndExpectFeedback[_, _, Unreliable]`).
### Building events
@@ -82,9 +82,10 @@ state("warning") | metric(0.5) |>> metricsDestination
val metricsDestination = riemannConnectAs[Reliable] to new
InetSocketAddress("localhost", 5555) withValues(host("host") | service("service response time"))
-state("warning") | metric(0.5) |>< metricsDestination onSuccess {
- case Left(RemoteError(message)) => println("error: " + message)
- case Right(_) => println("sent ok")
+state("warning") | metric(0.5) |>< metricsDestination onComplete {
+ case Left(exception) => // ...
+ case Right(false) => println("not sent ok!!")
+ case Right(true) => println("sent ok")
}
```
@@ -92,13 +93,13 @@ state("warning") | metric(0.5) |>< metricsDestination onSuccess {
```scala
val metricsDestination = riemannConnectAs[Reliable] to new InetSocketAddress("localhost", 5555)
-Query("tagged \"slow\"") |>< metricsDestination onSuccess {
- case Left(RemoteError(message)) => println("error: " + message)
+Query("tagged \"slow\"") |>< metricsDestination onComplete {
+ case Left(exception) => // ...
case Right(events) => events foreach println
}
```
-Please note that operations returning a Future won't compile if the connection is created with an `Unreliable` type parameter, this is intentional. (Well, it will compile if you have an implicit in scope implementing `SendAndExpectFeedback[Unreliable]`).
+Please note that operations returning a Future won't compile if the connection is created with an `Unreliable` type parameter, this is intentional. (Well, it will compile if you have an implicit in scope implementing `SendAndExpectFeedback[_, _, Unreliable]`).
## Dependencies
@@ -111,7 +112,7 @@ Pull requests are very welcome.
This version is intended to work with Scala 2.9 and Akka 2.0. Support will be added for Scala 2.10 and Akka 2.1 when Scala 2.10 final is released.
-Care has been taken to be as reliable as possible, because sending metrics should not impact your applications stability. In particular:
+Care has been taken to be as reliable as possible, because sending metrics should not impact your application's stability. In particular:
- Unit test coverage is fairly good. No metrics are available yet, but the only code not tested is the actual socket code (which amounts to a total of 5 lines), for which the different conditions are mocked.
- All API-visible data structures are immutable and concurrency-friendly
- Network writes are serialized through Akka actors
View
6 build.sbt
@@ -1,6 +1,8 @@
name := "riemann-scala-client"
-version := "0.1"
+organization := "net.benmur"
+
+version := "0.2-SNAPSHOT"
scalaVersion := "2.9.2"
@@ -16,6 +18,6 @@ libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0.4"
libraryDependencies += "org.scalatest" %% "scalatest" % "1.8" % "test"
-libraryDependencies += "org.scalamock" %% "scalamock-scalatest-support" % "latest.integration"
+libraryDependencies += "org.scalamock" %% "scalamock-scalatest-support" % "latest.integration" % "test"
libraryDependencies += "com.typesafe.akka" % "akka-testkit" % "2.0.4" % "test"
View
38 src/main/scala/net/benmur/riemann/client/Destination.scala
@@ -2,9 +2,6 @@ package net.benmur.riemann.client
import java.net.SocketAddress
-import scala.annotation.implicitNotFound
-import scala.collection.JavaConversions.iterableAsScalaIterable
-
import akka.actor.ActorSystem
import akka.dispatch.Future
import akka.util.Timeout
@@ -17,26 +14,21 @@ trait DestinationOps {
class RiemannDestination[T <: TransportType](baseEvent: EventPart, val connection: Connection[T])(implicit system: ActorSystem, timeout: Timeout)
extends Destination[T] {
-
- def send(event: EventPart)(implicit messenger: SendOff[T]): Unit =
- messenger.sendOff(connection, Write(
- Serializers.serializeEventPartToProtoMsg(EventDSL.mergeEvents(baseEvent, event))))
-
- def ask(event: EventPart)(implicit messenger: SendAndExpectFeedback[T]): Future[Either[RemoteError, List[EventPart]]] =
- messenger.send(connection, Write(
- Serializers.serializeEventPartToProtoMsg(EventDSL.mergeEvents(baseEvent, event))))
-
- def send(events: Iterable[EventPart])(implicit messenger: SendOff[T]): Unit =
- messenger.sendOff(connection, Write(
- Serializers.serializeEventPartsToProtoMsg(events map (EventDSL.mergeEvents(baseEvent, _)))))
-
- def ask(events: Iterable[EventPart])(implicit messenger: SendAndExpectFeedback[T]): Future[Either[RemoteError, List[EventPart]]] =
- messenger.send(connection, Write(
- Serializers.serializeEventPartsToProtoMsg(events map (EventDSL.mergeEvents(baseEvent, _)))))
-
- def ask(query: Query)(implicit messenger: SendAndExpectFeedback[T]): Future[Either[RemoteError, List[EventPart]]] =
- messenger.send(connection, Write(
- Serializers.serializeQueryToProtoMsg(query)))
+
+ def send(event: EventPart)(implicit messenger: SendOff[EventPart, T]): Unit =
+ messenger.sendOff(connection, Write(EventDSL.mergeEvents(baseEvent, event)))
+
+ def ask(event: EventPart)(implicit messenger: SendAndExpectFeedback[EventPart, Boolean, T]): Future[Boolean] =
+ messenger.send(connection, Write(EventDSL.mergeEvents(baseEvent, event)))
+
+ def send(events: EventSeq)(implicit messenger: SendOff[EventSeq, T]): Unit =
+ messenger.sendOff(connection, Write(EventSeq(events.events map (EventDSL.mergeEvents(baseEvent, _)): _*)))
+
+ def ask(events: EventSeq)(implicit messenger: SendAndExpectFeedback[EventSeq, Boolean, T]): Future[Boolean] =
+ messenger.send(connection, Write(EventSeq(events.events map (EventDSL.mergeEvents(baseEvent, _)): _*)))
+
+ def ask(query: Query)(implicit messenger: SendAndExpectFeedback[Query, Iterable[EventPart], T]): Future[Iterable[EventPart]] =
+ messenger.send(connection, Write(query))
def withValues(event: EventPart): RiemannDestination[T] =
new RiemannDestination[T](EventDSL.mergeEvents(baseEvent, event), connection)
View
39 src/main/scala/net/benmur/riemann/client/DomainObjects.scala
@@ -2,12 +2,15 @@ package net.benmur.riemann.client
import java.io.{ InputStream, OutputStream }
import java.net.SocketAddress
+
import scala.annotation.implicitNotFound
-import com.aphyr.riemann.Proto
+import scala.collection.mutable.WrappedArray
+
import akka.actor.ActorSystem
import akka.dispatch.Future
import akka.util.Timeout
-import scala.collection.mutable.WrappedArray
+
+sealed trait RiemannSendable
case class EventPart(
host: Option[String] = None,
@@ -17,11 +20,15 @@ case class EventPart(
description: Option[String] = None,
tags: Iterable[String] = Nil,
metric: Option[AnyVal] = None,
- ttl: Option[Float] = None)
+ ttl: Option[Float] = None) extends RiemannSendable
+
+case class EventSeq(events: EventPart*) extends RiemannSendable
+
+case class Query(q: String) extends RiemannSendable
-case class Query(q: String)
+case class Write[T <: RiemannSendable](m: T)
-case class Write(m: Proto.Msg)
+case class WriteBinary(data: Array[Byte])
case class RemoteError(message: String) extends Throwable
@@ -47,11 +54,11 @@ trait UnconnectedSocketWrapper {
}
trait Destination[T <: TransportType] {
- def send(event: EventPart)(implicit messenger: SendOff[T]): Unit
- def ask(event: EventPart)(implicit messenger: SendAndExpectFeedback[T]): Future[Either[RemoteError, List[EventPart]]]
- def send(events: Iterable[EventPart])(implicit messenger: SendOff[T]): Unit
- def ask(events: Iterable[EventPart])(implicit messenger: SendAndExpectFeedback[T]): Future[Either[RemoteError, List[EventPart]]]
- def ask(query: Query)(implicit messenger: SendAndExpectFeedback[T]): Future[Either[RemoteError, List[EventPart]]]
+ def send(event: EventPart)(implicit messenger: SendOff[EventPart, T]): Unit
+ def ask(event: EventPart)(implicit messenger: SendAndExpectFeedback[EventPart, Boolean, T]): Future[Boolean]
+ def send(events: EventSeq)(implicit messenger: SendOff[EventSeq, T]): Unit
+ def ask(events: EventSeq)(implicit messenger: SendAndExpectFeedback[EventSeq, Boolean, T]): Future[Boolean]
+ def ask(query: Query)(implicit messenger: SendAndExpectFeedback[Query, Iterable[EventPart], T]): Future[Iterable[EventPart]]
def withValues(event: EventPart): Destination[T]
}
@@ -60,12 +67,12 @@ trait ConnectionBuilder[T <: TransportType] {
def buildConnection(where: SocketAddress, factory: Option[T#SocketFactory] = None, dispatcherId: Option[String] = None)(implicit system: ActorSystem, timeout: Timeout): Connection[T]
}
-@implicitNotFound(msg = "Connection type ${T} does not allow sending to Riemann because there is no implicit in scope returning a implementation of SendOff[${T}].")
-trait SendOff[T <: TransportType] {
- def sendOff(connection: Connection[T], command: Write): Unit
+@implicitNotFound(msg = "Connection type ${T} does not allow sending to ${S} Riemann because there is no implicit in scope returning a implementation of SendOff[${S}, ${T}].")
+trait SendOff[S <: RiemannSendable, T <: TransportType] {
+ def sendOff(connection: Connection[T], command: Write[S]): Unit
}
-@implicitNotFound(msg = "Connection type ${T} does not allow getting feedback from Riemann.")
-trait SendAndExpectFeedback[T <: TransportType] {
- def send(connection: Connection[T], command: Write)(implicit system: ActorSystem, timeout: Timeout): Future[Either[RemoteError, List[EventPart]]]
+@implicitNotFound(msg = "Connection type ${T} does not allow getting feedback from Riemann after sending ${S} because there is no implicit in scope returning a implementation of SendAndExpectFeedback[${S}, ${T}].")
+trait SendAndExpectFeedback[S <: RiemannSendable, R, T <: TransportType] {
+ def send(connection: Connection[T], command: Write[S])(implicit system: ActorSystem, timeout: Timeout): Future[R]
}
View
12 src/main/scala/net/benmur/riemann/client/EventSenderDSL.scala
@@ -1,21 +1,21 @@
package net.benmur.riemann.client
trait EventSenderDSL {
- class EventSenderOff[T <: TransportType](e: EventPart)(implicit messenger: SendOff[T]) {
+ class EventSenderOff[T <: TransportType](e: EventPart)(implicit messenger: SendOff[EventPart, T]) {
def |>>(d: Destination[T]) = d send e
}
- class EventSender[T <: TransportType](e: EventPart)(implicit messenger: SendAndExpectFeedback[T]) {
+ class EventSender[T <: TransportType](e: EventPart)(implicit messenger: SendAndExpectFeedback[EventPart, Boolean, T]) {
def |><(d: Destination[T]) = d ask e
}
- class QuerySender[T <: TransportType](q: Query)(implicit messenger: SendAndExpectFeedback[T]) {
+ class QuerySender[T <: TransportType](q: Query)(implicit messenger: SendAndExpectFeedback[Query, Iterable[EventPart], T]) {
def |><(d: Destination[T]) = d ask q
}
- implicit def event2EventSenderOff[T <: TransportType](e: EventPart)(implicit messenger: SendOff[T]) = new EventSenderOff[T](e)
- implicit def event2EventSender[T <: TransportType](e: EventPart)(implicit messenger: SendAndExpectFeedback[T]) = new EventSender[T](e)
- implicit def query2QuerySender[T <: TransportType](q: Query)(implicit messenger: SendAndExpectFeedback[T]) = new QuerySender[T](q)
+ implicit def event2EventSenderOff[T <: TransportType](e: EventPart)(implicit messenger: SendOff[EventPart, T]) = new EventSenderOff[T](e)
+ implicit def event2EventSender[T <: TransportType](e: EventPart)(implicit messenger: SendAndExpectFeedback[EventPart, Boolean, T]) = new EventSender[T](e)
+ implicit def query2QuerySender[T <: TransportType](q: Query)(implicit messenger: SendAndExpectFeedback[Query, Iterable[EventPart], T]) = new QuerySender[T](q)
}
object EventSenderDSL extends EventSenderDSL
View
55 src/main/scala/net/benmur/riemann/client/ReliableIO.scala
@@ -1,19 +1,20 @@
package net.benmur.riemann.client
import java.io.{ DataInputStream, DataOutputStream }
-import java.net.{ Socket, SocketAddress }
+import java.net.{ Socket, SocketAddress, SocketException }
import java.util.concurrent.atomic.AtomicLong
+
+import scala.annotation.implicitNotFound
+
import com.aphyr.riemann.Proto
-import akka.actor.{ Actor, ActorLogging, ActorSystem, Props, actorRef2Scala }
-import akka.dispatch.Future
+
+import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, OneForOneStrategy, Props }
+import akka.actor.SupervisorStrategy.Restart
+import akka.actor.actorRef2Scala
+import akka.dispatch.{ Future, Promise }
import akka.pattern.ask
import akka.util.Timeout
-import akka.dispatch.Promise
-import akka.actor.OneForOneStrategy
-import akka.actor.SupervisorStrategy._
-import akka.util.duration._
-import akka.actor.ActorRef
-import java.net.SocketException
+import akka.util.duration.intToDurationInt
trait ReliableIO {
private val nClients = new AtomicLong(0L) // FIXME this should be more global
@@ -35,21 +36,34 @@ trait ReliableIO {
}
}
- implicit object ReliableSendAndExpectFeedback extends SendAndExpectFeedback[Reliable] {
- def send(connection: Connection[Reliable], command: Write)(implicit system: ActorSystem, timeout: Timeout): Future[Either[RemoteError, List[EventPart]]] =
+ implicit object ReliableEventPartSendAndExpectFeedback extends SendAndExpectFeedback[EventPart, Boolean, Reliable] with Serializers {
+ def send(connection: Connection[Reliable], command: Write[EventPart])(implicit system: ActorSystem, timeout: Timeout): Future[Boolean] =
connection match {
case rc: ReliableConnection =>
- (rc.ioActor ask command).mapTo[Either[RemoteError, List[EventPart]]]
+ val data = serializeEventPartToProtoMsg(command.m).toByteArray
+ (rc.ioActor ask WriteBinary(data)).mapTo[Proto.Msg] map (_.getOk)
+
+ case c =>
+ Promise.failed(RemoteError("don't know how to send data to " + c.getClass.getName))
+ }
+ }
+
+ implicit object ReliableQuerySendAndExpectFeedback extends SendAndExpectFeedback[Query, Iterable[EventPart], Reliable] with Serializers {
+ def send(connection: Connection[Reliable], command: Write[Query])(implicit system: ActorSystem, timeout: Timeout): Future[Iterable[EventPart]] =
+ connection match {
+ case rc: ReliableConnection =>
+ val data = serializeQueryToProtoMsg(command.m).toByteArray
+ (rc.ioActor ask WriteBinary(data)).mapTo[Proto.Msg] map (unserializeProtoMsg(_))
+
case c =>
- Promise.successful(Left(RemoteError(
- "don't know how to send data to " + c.getClass.getName)))
+ Promise.failed(RemoteError("don't know how to send data to " + c.getClass.getName))
}
}
- implicit object ReliableSendOff extends SendOff[Reliable] {
- def sendOff(connection: Connection[Reliable], command: Write): Unit = connection match {
+ implicit object ReliableSendOff extends SendOff[EventPart, Reliable] with Serializers {
+ def sendOff(connection: Connection[Reliable], command: Write[EventPart]): Unit = connection match {
case rc: ReliableConnection =>
- rc.ioActor tell command
+ rc.ioActor tell WriteBinary(serializeEventPartToProtoMsg(command.m).toByteArray)
case c =>
System.err.println(
"don't know how to send data to " + c.getClass.getName)
@@ -62,20 +76,19 @@ trait ReliableIO {
val inputStream = new DataInputStream(connection.inputStream)
println("actor init")
def receive = {
- case Write(msg) =>
+ case WriteBinary(ab) =>
try {
- val ab = msg.toByteArray
outputStream writeInt ab.length
outputStream write ab
outputStream.flush
val buf = Array.ofDim[Byte](inputStream.readInt())
inputStream.readFully(buf)
- sender ! Serializers.unserializeProtoMsg(Proto.Msg.parseFrom(buf))
+ sender ! Proto.Msg.parseFrom(buf)
} catch {
case e: SocketException => throw e
case exception =>
log.error(exception, "could not send or receive data")
- sender ! Left(RemoteError(exception.getMessage()))
+ sender ! Proto.Msg.newBuilder.setError(exception.getMessage).setOk(false).build
}
}
}
View
1  src/main/scala/net/benmur/riemann/client/RiemannClient.scala
@@ -6,7 +6,6 @@ import akka.util.Timeout
object RiemannClient
extends EventDSL
with EventSenderDSL
- with Serializers
with ReliableIO
with UnreliableIO
with DestinationOps {
View
17 src/main/scala/net/benmur/riemann/client/Serializers.scala
@@ -1,6 +1,5 @@
package net.benmur.riemann.client
-import scala.Option.option2Iterable
import scala.collection.JavaConversions.{ asJavaIterable, iterableAsScalaIterable }
import com.aphyr.riemann.Proto
@@ -10,16 +9,16 @@ trait Serializers {
.setQuery(Proto.Query.newBuilder().setString(q.q))
.build
- def serializeEventPartToProtoMsg(e: EventPart) = serializeEventPartsToProtoMsg(Some(e))
+ def serializeEventPartToProtoMsg(e: EventPart) = serializeEventPartsToProtoMsg(EventSeq(e))
- def serializeEventPartsToProtoMsg(ei: Iterable[EventPart]) = Proto.Msg.newBuilder
- .addAllEvents(ei map convertOneEventPart)
+ def serializeEventPartsToProtoMsg(ei: EventSeq) = Proto.Msg.newBuilder
+ .addAllEvents(ei.events map convertOneEventPart)
.build
- def unserializeProtoMsg(m: Proto.Msg): Either[RemoteError, List[EventPart]] = m.hasOk match {
- case true if m.getOk => Right(m.getEventsList map convertProtoEventToEventPart toList)
- case true => Left(RemoteError(m.getError))
- case false => Left(RemoteError("Response has no status"))
+ def unserializeProtoMsg(m: Proto.Msg): Iterable[EventPart] = m.hasOk match {
+ case true if m.getOk => m.getEventsList map convertProtoEventToEventPart toList
+ case true => throw RemoteError(m.getError)
+ case false => throw RemoteError("Response has no status")
}
private def convertOneEventPart(e: EventPart) = {
@@ -59,5 +58,3 @@ trait Serializers {
Some(e.getMetricSint64)
else None
}
-
-object Serializers extends Serializers
View
17 src/main/scala/net/benmur/riemann/client/UnreliableIO.scala
@@ -2,9 +2,12 @@ package net.benmur.riemann.client
import java.net.{ DatagramPacket, DatagramSocket, SocketAddress }
import java.util.concurrent.atomic.AtomicLong
+
+import scala.annotation.implicitNotFound
+import scala.collection.mutable.WrappedArray
+
import akka.actor.{ Actor, ActorSystem, Props }
import akka.util.Timeout
-import scala.collection.mutable.WrappedArray
trait UnreliableIO {
private val nClients = new AtomicLong(0L) // FIXME this should be more global
@@ -17,17 +20,19 @@ trait UnreliableIO {
val ioActor = system.actorOf(props, "riemann-udp-client-" + nClients.incrementAndGet)
}
- implicit object UnreliableSendOff extends SendOff[Unreliable] {
- def sendOff(connection: Connection[Unreliable], command: Write): Unit = connection match {
- case uc: UnreliableConnection => uc.ioActor tell command
- case c => System.err.println("don't know how to send data to " + c.getClass.getName)
+ implicit object UnreliableSendOff extends SendOff[EventPart, Unreliable] with Serializers {
+ def sendOff(connection: Connection[Unreliable], command: Write[EventPart]): Unit = connection match {
+ case uc: UnreliableConnection =>
+ val data = serializeEventPartToProtoMsg(command.m).toByteArray
+ uc.ioActor tell WriteBinary(data)
+ case c => System.err.println("don't know how to send data to " + c.getClass.getName)
}
}
private[this] class UnconnectedConnectionActor(where: SocketAddress, factory: Unreliable#SocketFactory) extends Actor {
val connection = factory(where)
def receive = {
- case Write(msg) => connection send msg.toByteArray
+ case WriteBinary(data) => connection send data
}
}
View
6 src/test/scala/net/benmur/riemann/client/EventSenderDSLTest.scala
@@ -24,7 +24,7 @@ class EventSenderDSLTest extends FunSuite with BeforeAndAfterAll {
test("DSL operator to send an event without expecting a result") {
val (conn, dest) = makeDestination
- expect(Write(protoMsgEvent)) {
+ expect(Write(event)) {
event |>> dest
conn.sentOff
}
@@ -33,7 +33,7 @@ class EventSenderDSLTest extends FunSuite with BeforeAndAfterAll {
test("DSL operator to send operator to send an event expecting a status") {
val (conn, dest) = makeDestination
- expect(Write(protoMsgEvent)) {
+ expect(Write(event)) {
event |>< dest
conn.sentExpect
}
@@ -42,7 +42,7 @@ class EventSenderDSLTest extends FunSuite with BeforeAndAfterAll {
test("DSL operator to send operator to send a query expecting a status") {
val (conn, dest) = makeDestination
- expect(Write(protoMsgQuery)) {
+ expect(Write(Query("true"))) {
Query("true") |>< dest
conn.sentExpect
}
View
67 src/test/scala/net/benmur/riemann/client/ReliableIOTest.scala
@@ -1,24 +1,19 @@
package net.benmur.riemann.client
-import org.scalatest.FunSuite
-import net.benmur.riemann.client.testingsupport.TestingTransportSupport
-import akka.actor.ActorSystem
-import java.net.InetSocketAddress
-import java.net.Socket
-import org.scalatest.BeforeAndAfterAll
-import org.scalamock.scalatest.MockFactory
+import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream }
+import java.net.{ InetSocketAddress, SocketAddress }
+
import org.scalamock.ProxyMockFactory
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.{ BeforeAndAfterAll, FunSuite }
import org.scalatest.matchers.ShouldMatchers
-import java.net.SocketAddress
-import akka.testkit.CallingThreadDispatcher
-import org.scalamock.annotation.mock
-import java.io.ByteArrayInputStream
-import java.io.ByteArrayOutputStream
-import java.io.DataInputStream
-import java.io.DataOutputStream
+
import com.aphyr.riemann.Proto
+
+import akka.actor.ActorSystem
import akka.dispatch.Await
-import akka.util.duration._
+import akka.testkit.CallingThreadDispatcher
+import akka.util.duration.intToDurationInt
class ReliableIOTest extends FunSuite
with BeforeAndAfterAll
@@ -27,7 +22,7 @@ class ReliableIOTest extends FunSuite
with ShouldMatchers {
import ReliableIO._
- import TestingTransportSupport._
+ import testingsupport.TestingTransportSupport._
implicit val system = ActorSystem()
val address = new InetSocketAddress(0)
@@ -36,7 +31,7 @@ class ReliableIOTest extends FunSuite
system.shutdown
}
- test("sending a protobuf Msg") {
+ test("sending a protobuf Msg with Event") {
val in = Array.ofDim[Byte](256)
val ios = new ByteArrayInputStream(in)
val oos = new ByteArrayOutputStream()
@@ -49,7 +44,7 @@ class ReliableIOTest extends FunSuite
socketFactory expects address returning wrapper once
val conn = implicitly[ConnectionBuilder[Reliable]].buildConnection(address, Some(socketFactory), Some(CallingThreadDispatcher.Id))
- implicitly[SendOff[Reliable]].sendOff(conn, Write(protoMsgEvent))
+ implicitly[SendOff[EventPart, Reliable]].sendOff(conn, Write(event))
val out = oos.toByteArray
val outRef = protoMsgEvent.toByteArray
@@ -57,7 +52,7 @@ class ReliableIOTest extends FunSuite
out.slice(4, out.length) should be === outRef
}
- test("sending a protobuf Msg, with feedback") {
+ test("sending a protobuf Msg with Event, with feedback") {
val response = Proto.Msg.newBuilder.setOk(true).build
val responseBytes = response.toByteArray
@@ -76,7 +71,7 @@ class ReliableIOTest extends FunSuite
socketFactory expects address returning wrapper once
val conn = implicitly[ConnectionBuilder[Reliable]].buildConnection(address, Some(socketFactory), Some(CallingThreadDispatcher.Id))
- val respFuture = implicitly[SendAndExpectFeedback[Reliable]].send(conn, Write(protoMsgEvent))
+ val respFuture = implicitly[SendAndExpectFeedback[EventPart, Boolean, Reliable]].send(conn, Write(event))
val out = oos.toByteArray
val outRef = protoMsgEvent.toByteArray
@@ -84,6 +79,36 @@ class ReliableIOTest extends FunSuite
out.slice(4, out.length) should be === outRef
val resp = Await.result(respFuture, 1 second)
- resp should be === Right(Nil)
+ resp should be === true
+ }
+
+ test("sending a protobuf Msg with Query, with feedback") {
+ val response = Proto.Msg.newBuilder.addEvents(protoEvent).addEvents(protoEvent2).setOk(true).build
+ val responseBytes = response.toByteArray
+
+ val responseBuilder = new ByteArrayOutputStream()
+ val responseBuilderData = new DataOutputStream(responseBuilder)
+ responseBuilderData.writeInt(responseBytes.length)
+ responseBuilderData.write(responseBytes)
+
+ val oos = new ByteArrayOutputStream()
+
+ val wrapper = mock[ConnectedSocketWrapper]
+ wrapper expects 'outputStream returning oos once;
+ wrapper expects 'inputStream returning new ByteArrayInputStream(responseBuilder.toByteArray) once
+
+ val socketFactory = mockFunction[SocketAddress, ConnectedSocketWrapper]
+ socketFactory expects address returning wrapper once
+
+ val conn = implicitly[ConnectionBuilder[Reliable]].buildConnection(address, Some(socketFactory), Some(CallingThreadDispatcher.Id))
+ val respFuture = implicitly[SendAndExpectFeedback[Query, Iterable[EventPart], Reliable]].send(conn, Write(Query("true")))
+
+ val out = oos.toByteArray
+ val queryData = protoMsgQuery.toByteArray
+ new DataInputStream(new ByteArrayInputStream(out)).readInt should be === queryData.length
+ out.slice(4, out.length) should be === queryData
+
+ val resp = Await.result(respFuture, 1 second)
+ resp should be === Seq(event, event2)
}
}
View
28 src/test/scala/net/benmur/riemann/client/RiemannClientWithDestinationAPITest.scala
@@ -1,15 +1,16 @@
package net.benmur.riemann.client
-import org.scalatest.FunSuite
-import net.benmur.riemann.client.testingsupport.TestingTransportSupport
-import akka.actor.ActorSystem
-import org.scalatest.BeforeAndAfterAll
import java.net.InetSocketAddress
+
+import org.scalatest.{ BeforeAndAfterAll, FunSuite }
import org.scalatest.matchers.ShouldMatchers
+import RiemannClient.{ host, riemannConnectAs, service, state }
+import akka.actor.ActorSystem
+
class RiemannClientWithDestinationAPITest extends FunSuite with BeforeAndAfterAll with ShouldMatchers {
import RiemannClient._
- import TestingTransportSupport._
+ import testingsupport.TestingTransportSupport._
implicit val system = ActorSystem()
val address = new InetSocketAddress(0)
@@ -24,6 +25,7 @@ class RiemannClientWithDestinationAPITest extends FunSuite with BeforeAndAfterAl
conn.where should be theSameInstanceAs address
conn.sentOff should be === null
+ conn.sentExpect should be === null
}
test("entry point to create a connection (sending an event)") {
@@ -31,15 +33,15 @@ class RiemannClientWithDestinationAPITest extends FunSuite with BeforeAndAfterAl
val conn = dest.connection.asInstanceOf[TestingTransportConnection]
dest send event
- conn.sentOff should be === Write(protoMsgEvent)
+ conn.sentOff should be === Write(event)
}
test("entry point to create a connection (sending multiple events)") {
val dest = riemannConnectAs[TestingTransport] to address
val conn = dest.connection.asInstanceOf[TestingTransportConnection]
- dest send List(event, event2)
- conn.sentOff should be === Write(protoMsgEvents)
+ dest send EventSeq(event, event2)
+ conn.sentOff should be === Write(EventSeq(event, event2))
}
test("entry point to create a connection (sending an event expecting feedback)") {
@@ -47,15 +49,15 @@ class RiemannClientWithDestinationAPITest extends FunSuite with BeforeAndAfterAl
val conn = dest.connection.asInstanceOf[TestingTransportConnection]
dest ask event
- conn.sentExpect should be === Write(protoMsgEvent)
+ conn.sentExpect should be === Write(event)
}
test("entry point to create a connection (sending multiple events expecting feedback)") {
val dest = riemannConnectAs[TestingTransport] to address
val conn = dest.connection.asInstanceOf[TestingTransportConnection]
- dest ask List(event, event2)
- conn.sentExpect should be === Write(protoMsgEvents)
+ dest ask EventSeq(event, event2)
+ conn.sentExpect should be === Write(EventSeq(event, event2))
}
test("entry point to create a connection (sending an query)") {
@@ -63,7 +65,7 @@ class RiemannClientWithDestinationAPITest extends FunSuite with BeforeAndAfterAl
val conn = dest.connection.asInstanceOf[TestingTransportConnection]
dest ask Query("true")
- conn.sentExpect should be === Write(protoMsgQuery)
+ conn.sentExpect should be === Write(Query("true"))
}
test("entry point to create a connection (sending an event, combining default EventPart values)") {
@@ -71,6 +73,6 @@ class RiemannClientWithDestinationAPITest extends FunSuite with BeforeAndAfterAl
val conn = dest.connection.asInstanceOf[TestingTransportConnection]
dest send state("ok")
- conn.sentOff should be === Write(protoMsgEvent)
+ conn.sentOff should be === Write(host("h") | service("s") | state("ok"))
}
}
View
71 src/test/scala/net/benmur/riemann/client/SerializersTest.scala
@@ -1,12 +1,14 @@
package net.benmur.riemann.client
+
+import scala.collection.JavaConversions.asJavaIterable
import org.scalatest.FunSuite
import com.aphyr.riemann.Proto
-import net.benmur.riemann.client.testingsupport.SerializersFixture
-import scala.collection.JavaConversions.asJavaIterable
+import org.scalatest.matchers.ShouldMatchers
-class SerializersTest extends FunSuite {
+class SerializersTest extends FunSuite with ShouldMatchers {
+ object Serializers extends Serializers
import Serializers._
- import SerializersFixture._
+ import testingsupport.SerializersFixture._
test("out: convert a full EventPart to a protobuf Msg") {
val expected = Proto.Msg.newBuilder.addEvents(protobufEvent1).build
@@ -119,7 +121,7 @@ class SerializersTest extends FunSuite {
val expected = Proto.Msg.newBuilder.addEvents(protobufEvent1).addEvents(protobufEvent2).build
expect(expected) {
- serializeEventPartsToProtoMsg(List(event1, event2))
+ serializeEventPartsToProtoMsg(EventSeq(event1, event2))
}
}
@@ -130,107 +132,110 @@ class SerializersTest extends FunSuite {
}
test("in: convert a protobuf Msg response with an ok status") {
- expect(Right(Nil)) {
+ expect(Nil) {
unserializeProtoMsg(Proto.Msg.newBuilder.setOk(true).build)
}
}
test("in: convert a protobuf Msg response with a non-ok status and an error message") {
- expect(Left(RemoteError("meh"))) {
+ val ex = intercept[RemoteError] {
unserializeProtoMsg(Proto.Msg.newBuilder.setOk(false).setError("meh").build)
}
+ ex.message should be === "meh"
}
test("in: convert a failed Query result from a protobuf Msg with events") {
- expect(Left(RemoteError("meh"))) {
+ val ex = intercept[RemoteError] {
unserializeProtoMsg(Proto.Msg.newBuilder.setOk(false).setError("meh").addEvents(protobufEvent1).build)
}
+ ex.message should be === "meh"
}
test("in: convert a successful Query result from a protobuf Msg to multiple EventParts") {
- expect(Right(List(event1))) {
+ expect(List(event1)) {
unserializeProtoMsg(Proto.Msg.newBuilder.setOk(true).addEvents(protobufEvent1).build)
}
}
- test("in: convert Query result with missing ok from a protobuf Msg to RemoteError") {
- expect(Left(RemoteError("Response has no status"))) {
+ test("in: convert a Query result with missing ok from a protobuf Msg to RemoteError") {
+ val ex = intercept[RemoteError] {
unserializeProtoMsg(Proto.Msg.newBuilder.addEvents(protobufEvent1).build)
}
+ ex.message should be === "Response has no status"
}
- test("in: convert a protobuf Msg with empty Event to a Right(List(EventPart))") {
- expect(Right(List(EventPart()))) {
+ test("in: convert a protobuf Msg with empty Event to a List(EventPart)") {
+ expect(List(EventPart())) {
unserializeProtoMsg(Proto.Msg.newBuilder.setOk(true).addEvents(
Proto.Event.newBuilder).build)
}
}
- test("in: convert a protobuf Msg with Event with only host to a Right(List(EventPart))") {
- expect(Right(List(EventPart(host = Some("host"))))) {
+ test("in: convert a protobuf Msg with Event with only host to a List(EventPart)") {
+ expect(List(EventPart(host = Some("host")))) {
unserializeProtoMsg(Proto.Msg.newBuilder.setOk(true).addEvents(
Proto.Event.newBuilder.setHost("host")).build)
}
}
- test("in: convert a protobuf Msg with Event with only service to a Right(List(EventPart))") {
- expect(Right(List(EventPart(service = Some("service"))))) {
+ test("in: convert a protobuf Msg with Event with only service to a List(EventPart)") {
+ expect(List(EventPart(service = Some("service")))) {
unserializeProtoMsg(Proto.Msg.newBuilder.setOk(true).addEvents(
Proto.Event.newBuilder.setService("service")).build)
}
}
- test("in: convert a protobuf Msg with Event with only state to a Right(List(EventPart))") {
- expect(Right(List(EventPart(state = Some("state"))))) {
+ test("in: convert a protobuf Msg with Event with only state to a List(EventPart)") {
+ expect(List(EventPart(state = Some("state")))) {
unserializeProtoMsg(Proto.Msg.newBuilder.setOk(true).addEvents(
Proto.Event.newBuilder.setState("state")).build)
}
}
- test("in: convert a protobuf Msg with Event with only time to a Right(List(EventPart))") {
- expect(Right(List(EventPart(time = Some(1234L))))) {
+ test("in: convert a protobuf Msg with Event with only time to a List(EventPart)") {
+ expect(List(EventPart(time = Some(1234L)))) {
unserializeProtoMsg(Proto.Msg.newBuilder.setOk(true).addEvents(
Proto.Event.newBuilder.setTime(1234L)).build)
}
}
- test("in: convert a protobuf Msg with Event with only description to a Right(List(EventPart))") {
- expect(Right(List(EventPart(description = Some("description"))))) {
+ test("in: convert a protobuf Msg with Event with only description to a List(EventPart)") {
+ expect(List(EventPart(description = Some("description")))) {
unserializeProtoMsg(Proto.Msg.newBuilder.setOk(true).addEvents(
Proto.Event.newBuilder.setDescription("description")).build)
}
}
- test("in: convert a protobuf Msg with Event with only tags to a Right(List(EventPart))") {
- expect(Right(List(EventPart(tags = List("tag1", "tag2"))))) {
+ test("in: convert a protobuf Msg with Event with only tags to a List(EventPart)") {
+ expect(List(EventPart(tags = List("tag1", "tag2")))) {
unserializeProtoMsg(Proto.Msg.newBuilder.setOk(true).addEvents(
Proto.Event.newBuilder.addAllTags(List("tag1", "tag2"))).build)
}
}
- test("in: convert a protobuf Msg with Event with only metric (long) to a Right(List(EventPart))") {
- expect(Right(List(EventPart(metric = Some(1234L))))) {
+ test("in: convert a protobuf Msg with Event with only metric (long) to a List(EventPart)") {
+ expect(List(EventPart(metric = Some(1234L)))) {
unserializeProtoMsg(Proto.Msg.newBuilder.setOk(true).addEvents(
Proto.Event.newBuilder.setMetricSint64(1234L)).build)
}
}
- test("in: convert a protobuf Msg with Event with only metric (float) to a Right(List(EventPart))") {
- expect(Right(List(EventPart(metric = Some(1234.0f))))) {
+ test("in: convert a protobuf Msg with Event with only metric (float) to a List(EventPart)") {
+ expect(List(EventPart(metric = Some(1234.0f)))) {
unserializeProtoMsg(Proto.Msg.newBuilder.setOk(true).addEvents(
Proto.Event.newBuilder.setMetricF(1234.0f)).build)
}
}
- test("in: convert a protobuf Msg with Event with only metric (double) to a Right(List(EventPart))") {
- expect(Right(List(EventPart(metric = Some(1234.1: Double))))) {
+ test("in: convert a protobuf Msg with Event with only metric (double) to a List(EventPart)") {
+ expect(List(EventPart(metric = Some(1234.1: Double)))) {
unserializeProtoMsg(Proto.Msg.newBuilder.setOk(true).addEvents(
Proto.Event.newBuilder.setMetricD(1234.1: Double)).build)
}
}
- test("in: convert a protobuf Msg with Event with only ttl to a Right(List(EventPart))") {
- expect(Right(List(EventPart(ttl = Some(1234L))))) {
+ test("in: convert a protobuf Msg with Event with only ttl to a List(EventPart)") {
+ expect(List(EventPart(ttl = Some(1234L)))) {
unserializeProtoMsg(Proto.Msg.newBuilder.setOk(true).addEvents(
Proto.Event.newBuilder.setTtl(1234L)).build)
}
View
5 src/test/scala/net/benmur/riemann/client/UnreliableIOTest.scala
@@ -12,7 +12,6 @@ import org.scalatest.matchers.ShouldMatchers
import akka.actor.ActorSystem
import akka.testkit.CallingThreadDispatcher
-import net.benmur.riemann.client.testingsupport.TestingTransportSupport
class UnreliableIOTest extends FunSuite
with BeforeAndAfterAll
@@ -21,7 +20,7 @@ class UnreliableIOTest extends FunSuite
with ShouldMatchers {
import UnreliableIO._
- import TestingTransportSupport._
+ import testingsupport.TestingTransportSupport._
implicit val system = ActorSystem()
val address = new InetSocketAddress(0)
@@ -38,6 +37,6 @@ class UnreliableIOTest extends FunSuite
socketFactory expects address returning socket once
val conn = implicitly[ConnectionBuilder[Unreliable]].buildConnection(address, Some(socketFactory), Some(CallingThreadDispatcher.Id))
- implicitly[SendOff[Unreliable]].sendOff(conn, Write(protoMsgEvent))
+ implicitly[SendOff[EventPart, Unreliable]].sendOff(conn, Write(event))
}
}
View
1  src/test/scala/net/benmur/riemann/client/testingsupport/SerializersFixture.scala
@@ -1,4 +1,5 @@
package net.benmur.riemann.client.testingsupport
+
import scala.collection.JavaConversions.asJavaIterable
import com.aphyr.riemann.Proto
View
64 src/test/scala/net/benmur/riemann/client/testingsupport/TestingTransportSupport.scala
@@ -1,44 +1,66 @@
package net.benmur.riemann.client.testingsupport
-import akka.util.Timeout
-import akka.util.duration.intToDurationInt
-import net.benmur.riemann.client.{ Connection, EventSenderDSL, SendOff, TransportType, Write }
+import java.net.{ InetSocketAddress, SocketAddress }
import com.aphyr.riemann.Proto
-import net.benmur.riemann.client.DestinationOps
-import net.benmur.riemann.client.EventPart
import akka.actor.ActorSystem
-import akka.dispatch.Future
-import net.benmur.riemann.client.SendAndExpectFeedback
-import akka.dispatch.Promise
-import net.benmur.riemann.client.RemoteError
-import java.net.SocketAddress
-import net.benmur.riemann.client.ConnectionBuilder
-import java.net.InetSocketAddress
+import akka.dispatch.{ Future, Promise }
+import akka.util.Timeout
+import akka.util.duration.intToDurationInt
+import net.benmur.riemann.client._
trait TestingTransportSupport {
- import EventSenderDSL._
+ import RiemannClient._
implicit val timeout = Timeout(1 millisecond)
- implicit object TestingTransportSendOff extends SendOff[TestingTransport] {
- def sendOff(connection: Connection[TestingTransport], command: Write): Unit = connection match {
+ implicit object TestingTransportEventPartSendOff extends SendOff[EventPart, TestingTransport] {
+ def sendOff(connection: Connection[TestingTransport], command: Write[EventPart]): Unit = connection match {
+ case uc: TestingTransportConnection => uc.sentOff = command
+ }
+ }
+
+ implicit object TestingTransportEventSeqSendOff extends SendOff[EventSeq, TestingTransport] {
+ def sendOff(connection: Connection[TestingTransport], command: Write[EventSeq]): Unit = connection match {
case uc: TestingTransportConnection => uc.sentOff = command
}
}
- implicit object TestingTransportSendAndExpectFeedback extends SendAndExpectFeedback[TestingTransport] {
- def send(connection: Connection[TestingTransport], command: Write)(implicit system: ActorSystem, timeout: Timeout): Future[Either[RemoteError, List[EventPart]]] =
+ implicit object TestingTransportEventPartSendAndExpectFeedback extends SendAndExpectFeedback[EventPart, Boolean, TestingTransport] {
+ def send(connection: Connection[TestingTransport], command: Write[EventPart])(implicit system: ActorSystem, timeout: Timeout): Future[Boolean] =
+ connection match {
+ case tc: TestingTransportConnection =>
+ tc.sentExpect = command
+ Promise.successful(true)
+ case c =>
+ Promise.failed(RemoteError("bad connection type"))
+ }
+ }
+
+ implicit object TestingTransportEventSeqSendAndExpectFeedback extends SendAndExpectFeedback[EventSeq, Boolean, TestingTransport] {
+ def send(connection: Connection[TestingTransport], command: Write[EventSeq])(implicit system: ActorSystem, timeout: Timeout): Future[Boolean] =
+ connection match {
+ case tc: TestingTransportConnection =>
+ tc.sentExpect = command
+ Promise.successful(true)
+ case c =>
+ Promise.failed(RemoteError("bad connection type"))
+ }
+ }
+
+ implicit object TestingTransportQuerySendAndExpectFeedback extends SendAndExpectFeedback[Query, Iterable[EventPart], TestingTransport] {
+ def send(connection: Connection[TestingTransport], command: Write[Query])(implicit system: ActorSystem, timeout: Timeout): Future[Iterable[EventPart]] =
connection match {
case tc: TestingTransportConnection =>
tc.sentExpect = command
- Promise.successful(Right(Nil))
- case c => Promise.successful(Left(RemoteError("bad connection type")))
+ Promise.successful(Seq(event, event2))
+ case c =>
+ Promise.failed(RemoteError("bad connection type"))
}
}
class TestingTransportConnection(val where: SocketAddress = new InetSocketAddress(0)) extends Connection[TestingTransport] {
- var sentOff: Write = _
- var sentExpect: Write = _
+ var sentOff: Write[_ <: RiemannSendable] = _
+ var sentExpect: Write[_ <: RiemannSendable] = _
}
implicit object TestingTransportConnectionBuilder extends ConnectionBuilder[TestingTransport] {

No commit comments for this range

Something went wrong with that request. Please try again.