Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ lazy val root = project
bundleZioHttp4sBlaze,
cassandraDatastaxDriver,
cassandraDatastaxDriverPureConfig,
catsEffect,
doobieHikari,
doobieHikariPureConfig,
example,
Expand Down Expand Up @@ -98,6 +99,11 @@ lazy val cassandraDatastaxDriverPureConfig = project
libraryDependencies += Dependencies.pureConfig
)

lazy val catsEffect = project
.in(file("cats-effect"))
.settings(BuildSettings.common)
.settings(name := "sst-cats-effect")

lazy val doobieHikari = project
.in(file("doobie-hikari"))
.settings(BuildSettings.common)
Expand Down Expand Up @@ -256,7 +262,10 @@ lazy val http4sServerMicrometer = project
.settings(BuildSettings.common)
.settings(
name := "sst-http4s-server-micrometer",
libraryDependencies += Dependencies.micrometerCore
libraryDependencies ++= Seq(
Dependencies.micrometerCore,
Dependencies.jsr305 // required because of Scala compiler
)
)

lazy val jvm = project
Expand All @@ -273,7 +282,10 @@ lazy val jvmMicrometer = project
.settings(BuildSettings.common)
.settings(
name := "sst-jvm-micrometer",
libraryDependencies += Dependencies.micrometerCore
libraryDependencies ++= Seq(
Dependencies.micrometerCore,
Dependencies.jsr305 // required because of Scala compiler
)
)

lazy val jvmPureConfig = project
Expand Down Expand Up @@ -438,6 +450,6 @@ lazy val sslConfig = project

addCommandAlias(
"check",
"; scalafmtSbtCheck; scalafmtCheckAll; compile:scalafix --check; test:scalafix --check; +test"
"; scalafmtSbtCheck; scalafmtCheckAll; +test"
)
addCommandAlias("fix", "; compile:scalafix; test:scalafix; scalafmtSbt; scalafmtAll")
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.avast.sst.catseffect

import java.util.concurrent.TimeUnit

import cats.effect.syntax.bracket._
import cats.effect.{Bracket, Clock}
import cats.syntax.flatMap._
import cats.syntax.functor._

import scala.concurrent.duration.Duration

object TimeUtils {

/** Measures the time it takes the effect to finish and records it using the provided function. */
def time[F[_], A](f: F[A])(record: Duration => F[Unit])(implicit F: Bracket[F, Throwable], C: Clock[F]): F[A] = {
val unit = TimeUnit.NANOSECONDS
for {
start <- C.monotonic(unit)
result <- f.guarantee {
C.monotonic(unit).map(computeTime(start)).flatMap(record)
}
} yield result
}

private def computeTime(start: Long)(end: Long) = Duration.fromNanos(end - start)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.avast.sst.catseffect.syntax

import cats.effect.{Bracket, Clock}
import com.avast.sst.catseffect.TimeUtils
import com.avast.sst.catseffect.syntax.TimeSyntax.FOps

import scala.concurrent.duration.Duration

trait TimeSyntax {

@SuppressWarnings(Array("scalafix:DisableSyntax.implicitConversion"))
implicit def sstFOps[F[_], A](f: F[A]): FOps[F, A] = new FOps(f)

}

object TimeSyntax {

final class FOps[F[_], A](private val f: F[A]) extends AnyVal {

/** Measures the time it takes the effect to finish and records it using the provided function. */
def time(record: Duration => F[Unit])(implicit F: Bracket[F, Throwable], C: Clock[F]): F[A] = TimeUtils.time(f)(record)

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.avast.sst.catseffect

package object syntax {

object time extends TimeSyntax

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.avast.sst.catseffect.syntax

import cats.effect.concurrent.Ref
import cats.effect.{Clock, IO, Timer}
import com.avast.sst.catseffect.syntax.time._
import org.scalatest.funsuite.AsyncFunSuite

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.{Duration, TimeUnit}

class FOpsTest extends AsyncFunSuite {

implicit private val timer: Timer[IO] = IO.timer(ExecutionContext.global)

test("time") {
val sleepTime = Duration.fromNanos(500000000)
implicit val mockClock: Clock[IO] = new Clock[IO] {
var values = List(0L, sleepTime.toNanos)
override def monotonic(unit: TimeUnit): IO[Long] = {
val time = values.head
values = values.tail
IO.pure(time)
}
override def realTime(unit: TimeUnit): IO[Long] = ???
}
val io = for {
ref <- Ref.of[IO, Option[Duration]](None)
_ <- IO.sleep(sleepTime).time(d => ref.set(Some(d)))
result <- ref.get
} yield assert(result.isDefined && result.get.toMillis === sleepTime.toMillis)

io.unsafeToFuture()
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package com.avast.sst.http4s.server.micrometer

import java.util.concurrent.TimeUnit

import cats.effect.Sync
import cats.syntax.flatMap._
import cats.effect.Effect
import cats.effect.concurrent.Ref
import cats.syntax.functor._
import io.micrometer.core.instrument.MeterRegistry
import org.http4s.metrics.{MetricsOps, TerminationType}
Expand All @@ -12,38 +12,42 @@ import org.http4s.{Method, Status}
object MicrometerHttp4sMetricsOpsModule {

/** Makes [[org.http4s.metrics.MetricsOps]] to record the usual HTTP server metrics. */
def make[F[_]: Sync](meterRegistry: MeterRegistry): F[MetricsOps[F]] = {
val F = Sync[F]
def make[F[_]: Effect](meterRegistry: MeterRegistry): F[MetricsOps[F]] = {
val F = Effect[F]

F.delay {
new MetricsOps[F] {
private val prefix = "http.global"
private val activeRequests = meterRegistry.counter(s"$prefix.active-requests")
private val headersTime = meterRegistry.timer(s"$prefix.headers-time")
private val totalTime = meterRegistry.timer(s"$prefix.total-time")
private val failureTime = meterRegistry.timer(s"$prefix.failure-time")
private val httpStatusCodes = new HttpStatusMetrics(prefix, meterRegistry)
for {
activeRequests <- Ref.of[F, Long](0L)
} yield new MetricsOps[F] {

override def increaseActiveRequests(classifier: Option[String]): F[Unit] = F.delay(activeRequests.increment())
private val prefix = "http.global"
private val failureTime = meterRegistry.timer(s"$prefix.failure-time")

override def decreaseActiveRequests(classifier: Option[String]): F[Unit] = F.delay(activeRequests.increment(-1))
meterRegistry.gauge(
s"$prefix.active-requests",
activeRequests,
(_: Ref[F, Long]) => Effect[F].toIO(activeRequests.get).unsafeRunSync().toDouble
)

override def recordHeadersTime(method: Method, elapsed: Long, classifier: Option[String]): F[Unit] = {
F.delay(headersTime.record(elapsed, TimeUnit.NANOSECONDS))
}
override def increaseActiveRequests(classifier: Option[String]): F[Unit] = activeRequests.update(_ + 1)

override def recordTotalTime(method: Method, status: Status, elapsed: Long, classifier: Option[String]): F[Unit] = {
for {
_ <- F.delay(totalTime.record(elapsed, TimeUnit.NANOSECONDS))
_ <- F.delay(httpStatusCodes.recordHttpStatus(status))
} yield ()
}
override def decreaseActiveRequests(classifier: Option[String]): F[Unit] = activeRequests.update(_ - 1)

override def recordAbnormalTermination(elapsed: Long, terminationType: TerminationType, classifier: Option[String]): F[Unit] = {
F.delay(failureTime.record(elapsed, TimeUnit.NANOSECONDS))
}
override def recordHeadersTime(method: Method, elapsed: Long, classifier: Option[String]): F[Unit] = {
F.delay(meterRegistry.timer(s"$prefix.headers-time", "method", method.name).record(elapsed, TimeUnit.NANOSECONDS))
}

override def recordTotalTime(method: Method, status: Status, elapsed: Long, classifier: Option[String]): F[Unit] = {
F.delay(
meterRegistry
.timer(s"$prefix.total-time", "status", s"${status.code}", "status-class", s"${status.code / 100}xx")
.record(elapsed, TimeUnit.NANOSECONDS)
)
}

override def recordAbnormalTermination(elapsed: Long, terminationType: TerminationType, classifier: Option[String]): F[Unit] = {
F.delay(failureTime.record(elapsed, TimeUnit.NANOSECONDS))
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object MicrometerHttp4sServerMetricsModule {

for {
metricsOps <- MicrometerHttp4sMetricsOpsModule.make[F](meterRegistry)
routeMetrics <- Sync[F].delay(new RouteMetrics[F](meterRegistry, clock))
routeMetrics <- Sync[F].delay(new RouteMetrics[F](meterRegistry))
} yield new MicrometerHttp4sServerMetricsModule[F](Metrics(metricsOps), routeMetrics)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,42 +1,34 @@
package com.avast.sst.http4s.server.micrometer

import java.util.concurrent.TimeUnit

import cats.effect.Sync
import cats.effect.syntax.bracket._
import cats.effect.{Clock, Sync}
import cats.syntax.flatMap._
import cats.syntax.functor._
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.{MeterRegistry, Timer}
import org.http4s.Response

/** Provides the usual metrics for single HTTP route. */
class RouteMetrics[F[_]: Sync](meterRegistry: MeterRegistry, clock: Clock[F]) {
/** Provides the usual metrics for a single HTTP route. */
class RouteMetrics[F[_]: Sync](meterRegistry: MeterRegistry) {

private val F = Sync[F]

/** Wraps a single route with the usual metrics (requests in-flight, timer, HTTP status counts).
/** Wraps a single route with the usual metrics (count, times, HTTP status codes).
*
* @param name will be used in metric name
*/
def wrap(name: String)(route: => F[Response[F]]): F[Response[F]] = {
val prefix = s"http.$name"
val activeRequests = meterRegistry.counter(s"$prefix.active-requests")
val timer = meterRegistry.timer(s"$prefix.total-time")
val httpStatusCodes = new HttpStatusMetrics(prefix, meterRegistry)
for {
start <- clock.monotonic(TimeUnit.NANOSECONDS)
response <-
F.delay(activeRequests.increment())
.bracket { _ => route.flatTap(response => F.delay(httpStatusCodes.recordHttpStatus(response.status))) } { _ =>
for {
time <- computeTime(start)
_ <- F.delay(activeRequests.increment(-1))
_ <- F.delay(timer.record(time, TimeUnit.NANOSECONDS))
} yield ()
}
start <- F.delay(Timer.start(meterRegistry))
response <- route.bracket(F.pure) { response =>
F.delay(
start.stop(
meterRegistry
.timer(s"http.$name", "status", s"${response.status.code}", "status-class", s"${response.status.code / 100}xx")
)
)
.as(())
}
} yield response
}

private def computeTime(start: Long): F[Long] = clock.monotonic(TimeUnit.NANOSECONDS).map(_ - start)

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.avast.sst.http4s.server.micrometer

import java.util.concurrent.TimeUnit

import cats.effect.SyncIO
import cats.effect.IO
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import org.http4s.{Method, Status}
import org.scalatest.funsuite.AnyFunSuite
Expand All @@ -11,15 +11,16 @@ class MicrometerHttp4sMetricsOpsModuleTest extends AnyFunSuite {

test("http4s MetricsOps for Micrometer") {
val registry = new SimpleMeterRegistry()
val metricsOps = MicrometerHttp4sMetricsOpsModule.make[SyncIO](registry).unsafeRunSync()
val metricsOps = MicrometerHttp4sMetricsOpsModule.make[IO](registry).unsafeRunSync()

metricsOps.increaseActiveRequests(None).unsafeRunSync()
metricsOps.recordTotalTime(Method.GET, Status.Ok, 2500, None).unsafeRunSync()

assert(registry.get("http.global.active-requests").counter().count() === 1)
assert(registry.get("http.global.active-requests").gauge().value() === 1)
assert(registry.get("http.global.total-time").timer().count() === 1)
assert(registry.get("http.global.total-time").timer().totalTime(TimeUnit.NANOSECONDS) > 2499)
assert(registry.get("http.global.status.200").counter().count() === 1)
assert(registry.get("http.global.total-time").tags("status", "200").timer().count() === 1)
assert(registry.get("http.global.total-time").tags("status-class", "2xx").timer().count() === 1)
}

}
Loading