Permalink
Browse files

initial import

  • Loading branch information...
benmur committed Dec 31, 2012
1 parent 69a649e commit e71a4a1f32b05f5ff012e97f98ecae5b73855328
View
20 LICENSE
@@ -0,0 +1,20 @@
+Copyright (c) 2012 Rached Ben Mustapha
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
View
@@ -0,0 +1,21 @@
+name := "riemann-scala-client"
+
+version := "0.1"
+
+scalaVersion := "2.9.2"
+
+scalacOptions += "-deprecation"
+
+resolvers += "Clojars" at "http://clojars.org/repo"
+
+resolvers += "Akka" at "http://repo.akka.io/releases"
+
+libraryDependencies += "com.aphyr" % "riemann-java-client" % "0.0.6"
+
+libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0.4"
+
+libraryDependencies += "org.scalatest" %% "scalatest" % "1.8" % "test"
+
+libraryDependencies += "org.scalamock" %% "scalamock-scalatest-support" % "latest.integration"
+
+libraryDependencies += "com.typesafe.akka" % "akka-testkit" % "2.0.4" % "test"
@@ -0,0 +1,44 @@
+package net.benmur.riemann.client
+
+import java.net.SocketAddress
+
+import scala.annotation.implicitNotFound
+import scala.collection.JavaConversions.iterableAsScalaIterable
+
+import akka.actor.ActorSystem
+import akka.dispatch.Future
+import akka.util.Timeout
+
+trait DestinationOps {
+ class DestinationBuilder[T <: TransportType](connectionBuilder: ConnectionBuilder[T])(implicit system: ActorSystem, timeout: Timeout) {
+ def to(where: SocketAddress): RiemannDestination[T] =
+ new RiemannDestination[T](EventPart(), connectionBuilder.buildConnection(where))
+ }
+
+ class RiemannDestination[T <: TransportType](baseEvent: EventPart, val connection: Connection[T])(implicit system: ActorSystem, timeout: Timeout)
+ extends Destination[T] {
+
+ def send(event: EventPart)(implicit messenger: SendOff[T]): Unit =
+ messenger.sendOff(connection, Write(
+ Serializers.serializeEventPartToProtoMsg(EventDSL.mergeEvents(baseEvent, event))))
+
+ def ask(event: EventPart)(implicit messenger: SendAndExpectFeedback[T]): Future[Either[RemoteError, List[EventPart]]] =
+ messenger.send(connection, Write(
+ Serializers.serializeEventPartToProtoMsg(EventDSL.mergeEvents(baseEvent, event))))
+
+ def send(events: Iterable[EventPart])(implicit messenger: SendOff[T]): Unit =
+ messenger.sendOff(connection, Write(
+ Serializers.serializeEventPartsToProtoMsg(events map (EventDSL.mergeEvents(baseEvent, _)))))
+
+ def ask(events: Iterable[EventPart])(implicit messenger: SendAndExpectFeedback[T]): Future[Either[RemoteError, List[EventPart]]] =
+ messenger.send(connection, Write(
+ Serializers.serializeEventPartsToProtoMsg(events map (EventDSL.mergeEvents(baseEvent, _)))))
+
+ def ask(query: Query)(implicit messenger: SendAndExpectFeedback[T]): Future[Either[RemoteError, List[EventPart]]] =
+ messenger.send(connection, Write(
+ Serializers.serializeQueryToProtoMsg(query)))
+
+ def withValues(event: EventPart): RiemannDestination[T] =
+ new RiemannDestination[T](EventDSL.mergeEvents(baseEvent, event), connection)
+ }
+}
@@ -0,0 +1,71 @@
+package net.benmur.riemann.client
+
+import java.io.{ InputStream, OutputStream }
+import java.net.SocketAddress
+import scala.annotation.implicitNotFound
+import com.aphyr.riemann.Proto
+import akka.actor.ActorSystem
+import akka.dispatch.Future
+import akka.util.Timeout
+import scala.collection.mutable.WrappedArray
+
+case class EventPart(
+ host: Option[String] = None,
+ service: Option[String] = None,
+ state: Option[String] = None,
+ time: Option[Long] = None,
+ description: Option[String] = None,
+ tags: Iterable[String] = Nil,
+ metric: Option[AnyVal] = None,
+ ttl: Option[Float] = None)
+
+case class Query(q: String)
+
+case class Write(m: Proto.Msg)
+
+case class RemoteError(message: String) extends Throwable
+
+trait TransportType {
+ type SocketFactory
+}
+trait Reliable extends TransportType {
+ type SocketFactory = SocketAddress => ConnectedSocketWrapper
+}
+trait Unreliable extends TransportType {
+ type SocketFactory = SocketAddress => UnconnectedSocketWrapper
+}
+
+trait Connection[T <: TransportType]
+
+trait ConnectedSocketWrapper {
+ def inputStream: InputStream
+ def outputStream: OutputStream
+}
+
+trait UnconnectedSocketWrapper {
+ def send(data: WrappedArray[Byte]): Unit
+}
+
+trait Destination[T <: TransportType] {
+ def send(event: EventPart)(implicit messenger: SendOff[T]): Unit
+ def ask(event: EventPart)(implicit messenger: SendAndExpectFeedback[T]): Future[Either[RemoteError, List[EventPart]]]
+ def send(events: Iterable[EventPart])(implicit messenger: SendOff[T]): Unit
+ def ask(events: Iterable[EventPart])(implicit messenger: SendAndExpectFeedback[T]): Future[Either[RemoteError, List[EventPart]]]
+ def ask(query: Query)(implicit messenger: SendAndExpectFeedback[T]): Future[Either[RemoteError, List[EventPart]]]
+ def withValues(event: EventPart): Destination[T]
+}
+
+@implicitNotFound(msg = "No way of building a connection to Riemann of type ${T}.")
+trait ConnectionBuilder[T <: TransportType] {
+ def buildConnection(where: SocketAddress, factory: Option[T#SocketFactory] = None, dispatcherId: Option[String] = None)(implicit system: ActorSystem, timeout: Timeout): Connection[T]
+}
+
+@implicitNotFound(msg = "Connection type ${T} does not allow sending to Riemann because there is no implicit in scope returning a implementation of SendOff[${T}].")
+trait SendOff[T <: TransportType] {
+ def sendOff(connection: Connection[T], command: Write): Unit
+}
+
+@implicitNotFound(msg = "Connection type ${T} does not allow getting feedback from Riemann.")
+trait SendAndExpectFeedback[T <: TransportType] {
+ def send(connection: Connection[T], command: Write)(implicit system: ActorSystem, timeout: Timeout): Future[Either[RemoteError, List[EventPart]]]
+}
@@ -0,0 +1,32 @@
+package net.benmur.riemann.client
+
+trait EventDSL {
+ def mergeEvents(e: EventPart, overlay: EventPart) = EventPart(
+ overlay.host orElse e.host,
+ overlay.service orElse e.service,
+ overlay.state orElse e.state,
+ overlay.time orElse e.time,
+ overlay.description orElse e.description,
+ (overlay.tags.toSet ++ e.tags).toSeq.sorted,
+ overlay.metric orElse e.metric,
+ overlay.ttl orElse e.ttl)
+
+ class EventPartCombinator(e: EventPart) {
+ def |(overlay: EventPart) = mergeEvents(e, overlay)
+ }
+
+ implicit def eventPartToEventPartCombinator(e: EventPart) = new EventPartCombinator(e)
+
+ def host(s: String) = EventPart(host = Some(s))
+ def service(s: String) = EventPart(service = Some(s))
+ def state(s: String) = EventPart(state = Some(s))
+ def time(l: Long) = EventPart(time = Some(l))
+ def description(s: String) = EventPart(description = Some(s))
+ def tags(s: String*) = EventPart(tags = s)
+ def metric(m: Long) = EventPart(metric = Some(m))
+ def metric(m: Float) = EventPart(metric = Some(m))
+ def metric(m: Double) = EventPart(metric = Some(m))
+ def ttl(f: Float) = EventPart(ttl = Some(f))
+}
+
+object EventDSL extends EventDSL
@@ -0,0 +1,21 @@
+package net.benmur.riemann.client
+
+trait EventSenderDSL {
+ class EventSenderOff[T <: TransportType](e: EventPart)(implicit messenger: SendOff[T]) {
+ def |>>(d: Destination[T]) = d send e
+ }
+
+ class EventSender[T <: TransportType](e: EventPart)(implicit messenger: SendAndExpectFeedback[T]) {
+ def |><(d: Destination[T]) = d ask e
+ }
+
+ class QuerySender[T <: TransportType](q: Query)(implicit messenger: SendAndExpectFeedback[T]) {
+ def |><(d: Destination[T]) = d ask q
+ }
+
+ implicit def event2EventSenderOff[T <: TransportType](e: EventPart)(implicit messenger: SendOff[T]) = new EventSenderOff[T](e)
+ implicit def event2EventSender[T <: TransportType](e: EventPart)(implicit messenger: SendAndExpectFeedback[T]) = new EventSender[T](e)
+ implicit def query2QuerySender[T <: TransportType](q: Query)(implicit messenger: SendAndExpectFeedback[T]) = new QuerySender[T](q)
+}
+
+object EventSenderDSL extends EventSenderDSL
@@ -0,0 +1,105 @@
+package net.benmur.riemann.client
+
+import java.io.{ DataInputStream, DataOutputStream }
+import java.net.{ Socket, SocketAddress }
+import java.util.concurrent.atomic.AtomicLong
+import com.aphyr.riemann.Proto
+import akka.actor.{ Actor, ActorLogging, ActorSystem, Props, actorRef2Scala }
+import akka.dispatch.Future
+import akka.pattern.ask
+import akka.util.Timeout
+import akka.dispatch.Promise
+import akka.actor.OneForOneStrategy
+import akka.actor.SupervisorStrategy._
+import akka.util.duration._
+import akka.actor.ActorRef
+import java.net.SocketException
+
+trait ReliableIO {
+ private val nClients = new AtomicLong(0L) // FIXME this should be more global
+
+ private[this] class ReliableConnectionActor(where: SocketAddress, factory: Reliable#SocketFactory, dispatcherId: Option[String])(implicit system: ActorSystem) extends Actor {
+ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 36000, withinTimeRange = 1 hour) { // This needs to be more reasonable
+ case _ => Restart
+ }
+
+ val props = {
+ val p = Props(new TcpConnectionActor(where, factory))
+ if (dispatcherId.isEmpty) p else p.withDispatcher(dispatcherId.get)
+ }
+
+ val ioActor = context.actorOf(props, "io")
+
+ def receive = {
+ case message => ioActor forward message
+ }
+ }
+
+ implicit object ReliableSendAndExpectFeedback extends SendAndExpectFeedback[Reliable] {
+ def send(connection: Connection[Reliable], command: Write)(implicit system: ActorSystem, timeout: Timeout): Future[Either[RemoteError, List[EventPart]]] =
+ connection match {
+ case rc: ReliableConnection =>
+ (rc.ioActor ask command).mapTo[Either[RemoteError, List[EventPart]]]
+ case c =>
+ Promise.successful(Left(RemoteError(
+ "don't know how to send data to " + c.getClass.getName)))
+ }
+ }
+
+ implicit object ReliableSendOff extends SendOff[Reliable] {
+ def sendOff(connection: Connection[Reliable], command: Write): Unit = connection match {
+ case rc: ReliableConnection =>
+ rc.ioActor tell command
+ case c =>
+ System.err.println(
+ "don't know how to send data to " + c.getClass.getName)
+ }
+ }
+
+ class TcpConnectionActor(where: SocketAddress, factory: Reliable#SocketFactory) extends Actor with ActorLogging {
+ val connection = factory(where)
+ val outputStream = new DataOutputStream(connection.outputStream)
+ val inputStream = new DataInputStream(connection.inputStream)
+ println("actor init")
+ def receive = {
+ case Write(msg) =>
+ try {
+ val ab = msg.toByteArray
+ outputStream writeInt ab.length
+ outputStream write ab
+ outputStream.flush
+ val buf = Array.ofDim[Byte](inputStream.readInt())
+ inputStream.readFully(buf)
+ sender ! Serializers.unserializeProtoMsg(Proto.Msg.parseFrom(buf))
+ } catch {
+ case e: SocketException => throw e
+ case exception =>
+ log.error(exception, "could not send or receive data")
+ sender ! Left(RemoteError(exception.getMessage()))
+ }
+ }
+ }
+
+ val makeTcpConnection: Reliable#SocketFactory = (addr) => {
+ val socket = new Socket()
+ socket.connect(addr)
+ new ConnectedSocketWrapper {
+ override def outputStream = socket.getOutputStream()
+ override def inputStream = socket.getInputStream()
+ }
+ }
+
+ class ReliableConnection(val ioActor: ActorRef) extends Connection[Reliable]
+
+ implicit object TwoWayConnectionBuilder extends ConnectionBuilder[Reliable] {
+ def buildConnection(where: SocketAddress, factory: Option[Reliable#SocketFactory] = None, dispatcherId: Option[String])(implicit system: ActorSystem, timeout: Timeout): Connection[Reliable] = {
+ val props = {
+ val p = Props(new ReliableConnectionActor(where, factory getOrElse makeTcpConnection, dispatcherId))
+ if (dispatcherId.isEmpty) p else p.withDispatcher(dispatcherId.get)
+ }
+ new ReliableConnection(system.actorOf(props, "riemann-tcp-client-" + nClients.incrementAndGet))
+ }
+ }
+}
+
+object ReliableIO extends ReliableIO
@@ -0,0 +1,16 @@
+package net.benmur.riemann.client
+
+import akka.actor.ActorSystem
+import akka.util.Timeout
+
+object RiemannClient
+ extends EventDSL
+ with EventSenderDSL
+ with Serializers
+ with ReliableIO
+ with UnreliableIO
+ with DestinationOps {
+
+ def riemannConnectAs[T <: TransportType](implicit connectionBuilder: ConnectionBuilder[T], system: ActorSystem, timeout: Timeout): DestinationBuilder[T] =
+ new DestinationBuilder[T](connectionBuilder)
+}
Oops, something went wrong.

0 comments on commit e71a4a1

Please sign in to comment.