Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
Thibault Jeandet committed Sep 3, 2017
1 parent 070b877 commit 43d6f36
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ object StatsDConfig {
val hostname: ErrorOr[String] = validate[String] { statsDConfig.as[String]("hostname") }
val port: ErrorOr[Int] = validate[Int] { statsDConfig.as[Int]("port") }
val prefix: ErrorOr[Option[String]] = statsDConfig.as[Option[String]]("prefix").validNel
val flushRate: ErrorOr[FiniteDuration] = statsDConfig.as[FiniteDuration]("flush-rate").validNel
val flushRate: ErrorOr[FiniteDuration] = validate[FiniteDuration] { statsDConfig.as[FiniteDuration]("flush-rate") }

(hostname, port, prefix, flushRate) mapN { (h, p, n, f) =>
new StatsDConfig(h, p, n, f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ object StatsDInstrumentationServiceActor {
class StatsDInstrumentationServiceActor(serviceConfig: Config, globalConfig: Config) extends Actor with DefaultInstrumented {
val statsDConfig = StatsDConfig(serviceConfig)

override lazy val metricBaseName = MetricName(statsDConfig.prefix.getOrElse(""))
override lazy val metricBaseName = MetricName("cromwell")

StatsDReporter
.forRegistry(metricRegistry)
.prefixedWith(metricBaseName.name)
.prefixedWith(statsDConfig.prefix.orNull)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build(CromwellStatsD(statsDConfig.hostname, statsDConfig.port))
Expand Down Expand Up @@ -113,11 +113,6 @@ class StatsDInstrumentationServiceActor(serviceConfig: Config, globalConfig: Con
* Adds a new timing value for this bucket
*/
private def updateTiming(bucket: CromwellBucket, value: FiniteDuration) = {
/*
* We wouldn't need to separately increment the bucket if we were using statsD timer but because
* metric-scala sends everything as a gauge we need to if we want to count them
*/
metrics.timer(bucket.toStatsDString(TimingInsert)).update(value)
increment(bucket)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package cromwell.services.instrumentation.impl.statsd

import com.typesafe.config.ConfigFactory
import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.duration._

class StatsDConfigSpec extends FlatSpec with Matchers {
behavior of "StatsDConfig"

it should "parse correct service configuration" in {
val config = ConfigFactory.parseString(
"""statsd {
|hostname = "localhost"
|port = 8125
|prefix = "prefix_value" # can be used to prefix all metrics with an api key for example
|flush-rate = 1 second # rate at which aggregated metrics will be sent to statsd
|}
""".stripMargin
)

val statsDConfig = StatsDConfig(config)

statsDConfig.hostname shouldBe "localhost"
statsDConfig.port shouldBe 8125
statsDConfig.prefix shouldBe Some("prefix_value")
statsDConfig.flushRate shouldBe 1.second
}

it should "not parse incorrect service configuration" in {
val config = ConfigFactory.parseString("statsd {}")

val exception = the[IllegalArgumentException] thrownBy StatsDConfig(config)

exception.getMessage shouldBe """StatsD config is invalid:
|No configuration setting found for key 'hostname'
|No configuration setting found for key 'port'
|No configuration setting found for key 'flush-rate'""".stripMargin
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package cromwell.services.instrumentation.impl.statsd

import java.net.InetSocketAddress

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.io.{IO, Udp}
import akka.testkit.{TestActorRef, TestProbe}
import cats.data.NonEmptyList
import com.typesafe.config.ConfigFactory
import cromwell.core.TestKitSuite
import cromwell.services.instrumentation.InstrumentationService.InstrumentationServiceMessage
import cromwell.services.instrumentation._
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}

import scala.concurrent.duration._
class StatsDInstrumentationServiceActorSpec extends TestKitSuite with FlatSpecLike with BeforeAndAfterAll with Matchers {
behavior of "StatsDInstrumentationServiceActor"

val config = ConfigFactory.parseString(
"""statsd {
|hostname = "localhost"
|port = 8125
|prefix = "prefix_value" # can be used to prefix all metrics with an api key for example
|flush-rate = 100 ms # rate at which aggregated metrics will be sent to statsd
|}
""".stripMargin
)

val udpProbe = TestProbe()
val patience = 1.second
val testBucket = CromwellBucket(List("test_prefix"), NonEmptyList.of("test", "metric", "bucket"))

var udpListenerActor: ActorRef = _

override def beforeAll(): Unit = {
// Start an actor listening to the UDP port and forwarding messages to the udpProbe
udpListenerActor = system.actorOf(Props(new UDPListenerActor(new InetSocketAddress("localhost", 8125), udpProbe.ref)))
// Give a sec to the actor to open an UDP socket
Thread.sleep(1.second.toMillis)
super.beforeAll()
}

override def afterAll(): Unit = {
udpListenerActor ! Udp.Unbind
super.afterAll()
}

case class StatsDTestBit(description: String, metric: CromwellMetric, expectedPackets: Set[String])

// Note: The current StatsD implementation sends everything as StatsD gauges so we expect all packets to be "...|g"
List(
StatsDTestBit("increment counters", CromwellIncrement(testBucket),
Set("prefix_value.cromwell.test_prefix.count.test.metric.bucket.samples:1|g",
"prefix_value.cromwell.test_prefix.count.test.metric.bucket.m1_rate:0.00|g"
)
),
StatsDTestBit("add count", CromwellCount(testBucket, 80),
Set("prefix_value.cromwell.test_prefix.count.test.metric.bucket.samples:81|g",
"prefix_value.cromwell.test_prefix.count.test.metric.bucket.m1_rate:0.00|g"
)
),
StatsDTestBit("set gauges", CromwellGauge(testBucket, 89),
Set("prefix_value.cromwell.test_prefix.test.metric.bucket:89|g")
),
StatsDTestBit("set timings", CromwellTiming(testBucket, 5.seconds),
Set("prefix_value.cromwell.test_prefix.timing.test.metric.bucket.stddev:0.00|g",
"prefix_value.cromwell.test_prefix.timing.test.metric.bucket.samples:1|g",
"prefix_value.cromwell.test_prefix.timing.test.metric.bucket.p95:5000.00|g",
"prefix_value.cromwell.test_prefix.timing.test.metric.bucket.mean:5000.00|g",
"prefix_value.cromwell.test_prefix.timing.test.metric.bucket.m1_rate:0.00|g"
)
)
) foreach {
case StatsDTestBit(description, metric, expectedPackets) =>
it should description in {
val instrumentationActor = TestActorRef(new StatsDInstrumentationServiceActor(config, ConfigFactory.load()))
instrumentationActor ! InstrumentationServiceMessage(metric)
val received = udpProbe.receiveWhile(patience) {
case Udp.Received(data, _) => data.utf8String
}

expectedPackets foreach { packet => received.contains(packet) shouldBe true }
}
}

private class UDPListenerActor(remote: InetSocketAddress, sendTo: ActorRef) extends Actor with ActorLogging {
implicit val system = context.system
IO(Udp) ! Udp.Bind(sendTo, remote)

def receive = {
case Udp.Bound(_) => context.become(ready(sender()))
}

def ready(socket: ActorRef): Receive = {
case Udp.Unbind => socket ! Udp.Unbind
case Udp.Unbound => context.stop(self)
case other => log.error(s"received unexpected message: $other")
}
}
}

0 comments on commit 43d6f36

Please sign in to comment.