diff --git a/build.sbt b/build.sbt index 0b3683898..aba4af323 100644 --- a/build.sbt +++ b/build.sbt @@ -5,6 +5,7 @@ lazy val root = project bundleZioHttp4sBlaze, cassandraDatastaxDriver, cassandraDatastaxDriverPureConfig, + catsEffect, doobieHikari, doobieHikariPureConfig, example, @@ -98,6 +99,11 @@ lazy val cassandraDatastaxDriverPureConfig = project libraryDependencies += Dependencies.pureConfig ) +lazy val catsEffect = project + .in(file("cats-effect")) + .settings(BuildSettings.common) + .settings(name := "sst-cats-effect") + lazy val doobieHikari = project .in(file("doobie-hikari")) .settings(BuildSettings.common) @@ -256,7 +262,10 @@ lazy val http4sServerMicrometer = project .settings(BuildSettings.common) .settings( name := "sst-http4s-server-micrometer", - libraryDependencies += Dependencies.micrometerCore + libraryDependencies ++= Seq( + Dependencies.micrometerCore, + Dependencies.jsr305 // required because of Scala compiler + ) ) lazy val jvm = project @@ -273,7 +282,10 @@ lazy val jvmMicrometer = project .settings(BuildSettings.common) .settings( name := "sst-jvm-micrometer", - libraryDependencies += Dependencies.micrometerCore + libraryDependencies ++= Seq( + Dependencies.micrometerCore, + Dependencies.jsr305 // required because of Scala compiler + ) ) lazy val jvmPureConfig = project @@ -438,6 +450,6 @@ lazy val sslConfig = project addCommandAlias( "check", - "; scalafmtSbtCheck; scalafmtCheckAll; compile:scalafix --check; test:scalafix --check; +test" + "; scalafmtSbtCheck; scalafmtCheckAll; +test" ) addCommandAlias("fix", "; compile:scalafix; test:scalafix; scalafmtSbt; scalafmtAll") diff --git a/cats-effect/src/main/scala/com/avast/sst/catseffect/TimeUtils.scala b/cats-effect/src/main/scala/com/avast/sst/catseffect/TimeUtils.scala new file mode 100644 index 000000000..73c33c03d --- /dev/null +++ b/cats-effect/src/main/scala/com/avast/sst/catseffect/TimeUtils.scala @@ -0,0 +1,27 @@ +package com.avast.sst.catseffect + +import java.util.concurrent.TimeUnit + +import cats.effect.syntax.bracket._ +import cats.effect.{Bracket, Clock} +import cats.syntax.flatMap._ +import cats.syntax.functor._ + +import scala.concurrent.duration.Duration + +object TimeUtils { + + /** Measures the time it takes the effect to finish and records it using the provided function. */ + def time[F[_], A](f: F[A])(record: Duration => F[Unit])(implicit F: Bracket[F, Throwable], C: Clock[F]): F[A] = { + val unit = TimeUnit.NANOSECONDS + for { + start <- C.monotonic(unit) + result <- f.guarantee { + C.monotonic(unit).map(computeTime(start)).flatMap(record) + } + } yield result + } + + private def computeTime(start: Long)(end: Long) = Duration.fromNanos(end - start) + +} diff --git a/cats-effect/src/main/scala/com/avast/sst/catseffect/syntax/TimeSyntax.scala b/cats-effect/src/main/scala/com/avast/sst/catseffect/syntax/TimeSyntax.scala new file mode 100644 index 000000000..9c3316168 --- /dev/null +++ b/cats-effect/src/main/scala/com/avast/sst/catseffect/syntax/TimeSyntax.scala @@ -0,0 +1,25 @@ +package com.avast.sst.catseffect.syntax + +import cats.effect.{Bracket, Clock} +import com.avast.sst.catseffect.TimeUtils +import com.avast.sst.catseffect.syntax.TimeSyntax.FOps + +import scala.concurrent.duration.Duration + +trait TimeSyntax { + + @SuppressWarnings(Array("scalafix:DisableSyntax.implicitConversion")) + implicit def sstFOps[F[_], A](f: F[A]): FOps[F, A] = new FOps(f) + +} + +object TimeSyntax { + + final class FOps[F[_], A](private val f: F[A]) extends AnyVal { + + /** Measures the time it takes the effect to finish and records it using the provided function. */ + def time(record: Duration => F[Unit])(implicit F: Bracket[F, Throwable], C: Clock[F]): F[A] = TimeUtils.time(f)(record) + + } + +} diff --git a/cats-effect/src/main/scala/com/avast/sst/catseffect/syntax/package.scala b/cats-effect/src/main/scala/com/avast/sst/catseffect/syntax/package.scala new file mode 100644 index 000000000..8f013d98a --- /dev/null +++ b/cats-effect/src/main/scala/com/avast/sst/catseffect/syntax/package.scala @@ -0,0 +1,7 @@ +package com.avast.sst.catseffect + +package object syntax { + + object time extends TimeSyntax + +} diff --git a/cats-effect/src/test/scala/com/avast/sst/catseffect/syntax/FOpsTest.scala b/cats-effect/src/test/scala/com/avast/sst/catseffect/syntax/FOpsTest.scala new file mode 100644 index 000000000..c09b54e14 --- /dev/null +++ b/cats-effect/src/test/scala/com/avast/sst/catseffect/syntax/FOpsTest.scala @@ -0,0 +1,35 @@ +package com.avast.sst.catseffect.syntax + +import cats.effect.concurrent.Ref +import cats.effect.{Clock, IO, Timer} +import com.avast.sst.catseffect.syntax.time._ +import org.scalatest.funsuite.AsyncFunSuite + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.{Duration, TimeUnit} + +class FOpsTest extends AsyncFunSuite { + + implicit private val timer: Timer[IO] = IO.timer(ExecutionContext.global) + + test("time") { + val sleepTime = Duration.fromNanos(500000000) + implicit val mockClock: Clock[IO] = new Clock[IO] { + var values = List(0L, sleepTime.toNanos) + override def monotonic(unit: TimeUnit): IO[Long] = { + val time = values.head + values = values.tail + IO.pure(time) + } + override def realTime(unit: TimeUnit): IO[Long] = ??? + } + val io = for { + ref <- Ref.of[IO, Option[Duration]](None) + _ <- IO.sleep(sleepTime).time(d => ref.set(Some(d))) + result <- ref.get + } yield assert(result.isDefined && result.get.toMillis === sleepTime.toMillis) + + io.unsafeToFuture() + } + +} diff --git a/http4s-server-micrometer/src/main/scala/com/avast/sst/http4s/server/micrometer/HttpStatusMetrics.scala b/http4s-server-micrometer/src/main/scala/com/avast/sst/http4s/server/micrometer/HttpStatusMetrics.scala deleted file mode 100644 index a8292389f..000000000 --- a/http4s-server-micrometer/src/main/scala/com/avast/sst/http4s/server/micrometer/HttpStatusMetrics.scala +++ /dev/null @@ -1,25 +0,0 @@ -package com.avast.sst.http4s.server.micrometer - -import io.micrometer.core.instrument.{Counter, MeterRegistry} -import org.http4s.Status - -import scala.collection.concurrent.TrieMap - -/** Records counts of HTTP statuses in [[io.micrometer.core.instrument.MeterRegistry]]. */ -private[micrometer] class HttpStatusMetrics(prefix: String, meterRegistry: MeterRegistry) { - - private val meters = TrieMap[Int, Counter]( - 1 -> meterRegistry.counter(s"$prefix.status.1xx"), - 2 -> meterRegistry.counter(s"$prefix.status.2xx"), - 3 -> meterRegistry.counter(s"$prefix.status.3xx"), - 4 -> meterRegistry.counter(s"$prefix.status.4xx"), - 5 -> meterRegistry.counter(s"$prefix.status.5xx") - ) - - def recordHttpStatus(status: Status): Unit = { - val code = status.code - meters(code / 100).increment() - meters.getOrElseUpdate(code, meterRegistry.counter(s"$prefix.status.$code")).increment() - } - -} diff --git a/http4s-server-micrometer/src/main/scala/com/avast/sst/http4s/server/micrometer/MicrometerHttp4sMetricsOpsModule.scala b/http4s-server-micrometer/src/main/scala/com/avast/sst/http4s/server/micrometer/MicrometerHttp4sMetricsOpsModule.scala index 2f9861324..0e3b0acaf 100644 --- a/http4s-server-micrometer/src/main/scala/com/avast/sst/http4s/server/micrometer/MicrometerHttp4sMetricsOpsModule.scala +++ b/http4s-server-micrometer/src/main/scala/com/avast/sst/http4s/server/micrometer/MicrometerHttp4sMetricsOpsModule.scala @@ -2,8 +2,8 @@ package com.avast.sst.http4s.server.micrometer import java.util.concurrent.TimeUnit -import cats.effect.Sync -import cats.syntax.flatMap._ +import cats.effect.Effect +import cats.effect.concurrent.Ref import cats.syntax.functor._ import io.micrometer.core.instrument.MeterRegistry import org.http4s.metrics.{MetricsOps, TerminationType} @@ -12,38 +12,42 @@ import org.http4s.{Method, Status} object MicrometerHttp4sMetricsOpsModule { /** Makes [[org.http4s.metrics.MetricsOps]] to record the usual HTTP server metrics. */ - def make[F[_]: Sync](meterRegistry: MeterRegistry): F[MetricsOps[F]] = { - val F = Sync[F] + def make[F[_]: Effect](meterRegistry: MeterRegistry): F[MetricsOps[F]] = { + val F = Effect[F] - F.delay { - new MetricsOps[F] { - private val prefix = "http.global" - private val activeRequests = meterRegistry.counter(s"$prefix.active-requests") - private val headersTime = meterRegistry.timer(s"$prefix.headers-time") - private val totalTime = meterRegistry.timer(s"$prefix.total-time") - private val failureTime = meterRegistry.timer(s"$prefix.failure-time") - private val httpStatusCodes = new HttpStatusMetrics(prefix, meterRegistry) + for { + activeRequests <- Ref.of[F, Long](0L) + } yield new MetricsOps[F] { - override def increaseActiveRequests(classifier: Option[String]): F[Unit] = F.delay(activeRequests.increment()) + private val prefix = "http.global" + private val failureTime = meterRegistry.timer(s"$prefix.failure-time") - override def decreaseActiveRequests(classifier: Option[String]): F[Unit] = F.delay(activeRequests.increment(-1)) + meterRegistry.gauge( + s"$prefix.active-requests", + activeRequests, + (_: Ref[F, Long]) => Effect[F].toIO(activeRequests.get).unsafeRunSync().toDouble + ) - override def recordHeadersTime(method: Method, elapsed: Long, classifier: Option[String]): F[Unit] = { - F.delay(headersTime.record(elapsed, TimeUnit.NANOSECONDS)) - } + override def increaseActiveRequests(classifier: Option[String]): F[Unit] = activeRequests.update(_ + 1) - override def recordTotalTime(method: Method, status: Status, elapsed: Long, classifier: Option[String]): F[Unit] = { - for { - _ <- F.delay(totalTime.record(elapsed, TimeUnit.NANOSECONDS)) - _ <- F.delay(httpStatusCodes.recordHttpStatus(status)) - } yield () - } + override def decreaseActiveRequests(classifier: Option[String]): F[Unit] = activeRequests.update(_ - 1) - override def recordAbnormalTermination(elapsed: Long, terminationType: TerminationType, classifier: Option[String]): F[Unit] = { - F.delay(failureTime.record(elapsed, TimeUnit.NANOSECONDS)) - } + override def recordHeadersTime(method: Method, elapsed: Long, classifier: Option[String]): F[Unit] = { + F.delay(meterRegistry.timer(s"$prefix.headers-time", "method", method.name).record(elapsed, TimeUnit.NANOSECONDS)) + } + + override def recordTotalTime(method: Method, status: Status, elapsed: Long, classifier: Option[String]): F[Unit] = { + F.delay( + meterRegistry + .timer(s"$prefix.total-time", "status", s"${status.code}", "status-class", s"${status.code / 100}xx") + .record(elapsed, TimeUnit.NANOSECONDS) + ) + } + override def recordAbnormalTermination(elapsed: Long, terminationType: TerminationType, classifier: Option[String]): F[Unit] = { + F.delay(failureTime.record(elapsed, TimeUnit.NANOSECONDS)) } + } } diff --git a/http4s-server-micrometer/src/main/scala/com/avast/sst/http4s/server/micrometer/MicrometerHttp4sServerMetricsModule.scala b/http4s-server-micrometer/src/main/scala/com/avast/sst/http4s/server/micrometer/MicrometerHttp4sServerMetricsModule.scala index ea73ae6e3..7841511c0 100644 --- a/http4s-server-micrometer/src/main/scala/com/avast/sst/http4s/server/micrometer/MicrometerHttp4sServerMetricsModule.scala +++ b/http4s-server-micrometer/src/main/scala/com/avast/sst/http4s/server/micrometer/MicrometerHttp4sServerMetricsModule.scala @@ -18,7 +18,7 @@ object MicrometerHttp4sServerMetricsModule { for { metricsOps <- MicrometerHttp4sMetricsOpsModule.make[F](meterRegistry) - routeMetrics <- Sync[F].delay(new RouteMetrics[F](meterRegistry, clock)) + routeMetrics <- Sync[F].delay(new RouteMetrics[F](meterRegistry)) } yield new MicrometerHttp4sServerMetricsModule[F](Metrics(metricsOps), routeMetrics) } diff --git a/http4s-server-micrometer/src/main/scala/com/avast/sst/http4s/server/micrometer/RouteMetrics.scala b/http4s-server-micrometer/src/main/scala/com/avast/sst/http4s/server/micrometer/RouteMetrics.scala index c24d2c665..c60f78fda 100644 --- a/http4s-server-micrometer/src/main/scala/com/avast/sst/http4s/server/micrometer/RouteMetrics.scala +++ b/http4s-server-micrometer/src/main/scala/com/avast/sst/http4s/server/micrometer/RouteMetrics.scala @@ -1,42 +1,34 @@ package com.avast.sst.http4s.server.micrometer -import java.util.concurrent.TimeUnit - +import cats.effect.Sync import cats.effect.syntax.bracket._ -import cats.effect.{Clock, Sync} import cats.syntax.flatMap._ import cats.syntax.functor._ -import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.{MeterRegistry, Timer} import org.http4s.Response -/** Provides the usual metrics for single HTTP route. */ -class RouteMetrics[F[_]: Sync](meterRegistry: MeterRegistry, clock: Clock[F]) { +/** Provides the usual metrics for a single HTTP route. */ +class RouteMetrics[F[_]: Sync](meterRegistry: MeterRegistry) { private val F = Sync[F] - /** Wraps a single route with the usual metrics (requests in-flight, timer, HTTP status counts). + /** Wraps a single route with the usual metrics (count, times, HTTP status codes). * * @param name will be used in metric name */ def wrap(name: String)(route: => F[Response[F]]): F[Response[F]] = { - val prefix = s"http.$name" - val activeRequests = meterRegistry.counter(s"$prefix.active-requests") - val timer = meterRegistry.timer(s"$prefix.total-time") - val httpStatusCodes = new HttpStatusMetrics(prefix, meterRegistry) for { - start <- clock.monotonic(TimeUnit.NANOSECONDS) - response <- - F.delay(activeRequests.increment()) - .bracket { _ => route.flatTap(response => F.delay(httpStatusCodes.recordHttpStatus(response.status))) } { _ => - for { - time <- computeTime(start) - _ <- F.delay(activeRequests.increment(-1)) - _ <- F.delay(timer.record(time, TimeUnit.NANOSECONDS)) - } yield () - } + start <- F.delay(Timer.start(meterRegistry)) + response <- route.bracket(F.pure) { response => + F.delay( + start.stop( + meterRegistry + .timer(s"http.$name", "status", s"${response.status.code}", "status-class", s"${response.status.code / 100}xx") + ) + ) + .as(()) + } } yield response } - private def computeTime(start: Long): F[Long] = clock.monotonic(TimeUnit.NANOSECONDS).map(_ - start) - } diff --git a/http4s-server-micrometer/src/test/scala/com/avast/sst/http4s/server/micrometer/HttpStatusMetricsTest.scala b/http4s-server-micrometer/src/test/scala/com/avast/sst/http4s/server/micrometer/HttpStatusMetricsTest.scala deleted file mode 100644 index 9d9781325..000000000 --- a/http4s-server-micrometer/src/test/scala/com/avast/sst/http4s/server/micrometer/HttpStatusMetricsTest.scala +++ /dev/null @@ -1,25 +0,0 @@ -package com.avast.sst.http4s.server.micrometer - -import io.micrometer.core.instrument.simple.SimpleMeterRegistry -import org.http4s.Status -import org.scalatest.funsuite.AnyFunSuite - -class HttpStatusMetricsTest extends AnyFunSuite { - - test("HTTP status monitoring") { - val simpleMeterRegistry = new SimpleMeterRegistry() - val target = new HttpStatusMetrics("test", simpleMeterRegistry) - - target.recordHttpStatus(Status.Ok) - target.recordHttpStatus(Status.NoContent) - target.recordHttpStatus(Status.BadRequest) - target.recordHttpStatus(Status.ServiceUnavailable) - - assert(simpleMeterRegistry.get("test.status.2xx").counter().count() === 2.0) - assert(simpleMeterRegistry.get("test.status.200").counter().count() === 1.0) - assert(simpleMeterRegistry.get("test.status.204").counter().count() === 1.0) - assert(simpleMeterRegistry.get("test.status.4xx").counter().count() === 1.0) - assert(simpleMeterRegistry.get("test.status.5xx").counter().count() === 1.0) - } - -} diff --git a/http4s-server-micrometer/src/test/scala/com/avast/sst/http4s/server/micrometer/MicrometerHttp4sMetricsOpsModuleTest.scala b/http4s-server-micrometer/src/test/scala/com/avast/sst/http4s/server/micrometer/MicrometerHttp4sMetricsOpsModuleTest.scala index 1cbb5cb43..0d7706924 100644 --- a/http4s-server-micrometer/src/test/scala/com/avast/sst/http4s/server/micrometer/MicrometerHttp4sMetricsOpsModuleTest.scala +++ b/http4s-server-micrometer/src/test/scala/com/avast/sst/http4s/server/micrometer/MicrometerHttp4sMetricsOpsModuleTest.scala @@ -2,7 +2,7 @@ package com.avast.sst.http4s.server.micrometer import java.util.concurrent.TimeUnit -import cats.effect.SyncIO +import cats.effect.IO import io.micrometer.core.instrument.simple.SimpleMeterRegistry import org.http4s.{Method, Status} import org.scalatest.funsuite.AnyFunSuite @@ -11,15 +11,16 @@ class MicrometerHttp4sMetricsOpsModuleTest extends AnyFunSuite { test("http4s MetricsOps for Micrometer") { val registry = new SimpleMeterRegistry() - val metricsOps = MicrometerHttp4sMetricsOpsModule.make[SyncIO](registry).unsafeRunSync() + val metricsOps = MicrometerHttp4sMetricsOpsModule.make[IO](registry).unsafeRunSync() metricsOps.increaseActiveRequests(None).unsafeRunSync() metricsOps.recordTotalTime(Method.GET, Status.Ok, 2500, None).unsafeRunSync() - assert(registry.get("http.global.active-requests").counter().count() === 1) + assert(registry.get("http.global.active-requests").gauge().value() === 1) assert(registry.get("http.global.total-time").timer().count() === 1) assert(registry.get("http.global.total-time").timer().totalTime(TimeUnit.NANOSECONDS) > 2499) - assert(registry.get("http.global.status.200").counter().count() === 1) + assert(registry.get("http.global.total-time").tags("status", "200").timer().count() === 1) + assert(registry.get("http.global.total-time").tags("status-class", "2xx").timer().count() === 1) } } diff --git a/http4s-server-micrometer/src/test/scala/com/avast/sst/http4s/server/micrometer/RouteMetricsTest.scala b/http4s-server-micrometer/src/test/scala/com/avast/sst/http4s/server/micrometer/RouteMetricsTest.scala index a2bc40caa..7c82fb760 100644 --- a/http4s-server-micrometer/src/test/scala/com/avast/sst/http4s/server/micrometer/RouteMetricsTest.scala +++ b/http4s-server-micrometer/src/test/scala/com/avast/sst/http4s/server/micrometer/RouteMetricsTest.scala @@ -2,7 +2,7 @@ package com.avast.sst.http4s.server.micrometer import java.util.concurrent.TimeUnit -import cats.effect.{Clock, SyncIO} +import cats.effect.SyncIO import io.micrometer.core.instrument.simple.SimpleMeterRegistry import org.http4s.Response import org.scalatest.funsuite.AnyFunSuite @@ -11,13 +11,12 @@ class RouteMetricsTest extends AnyFunSuite { test("Single route metrics") { val registry = new SimpleMeterRegistry() - val target = new RouteMetrics[SyncIO](registry, Clock.create[SyncIO]) + val target = new RouteMetrics[SyncIO](registry) target.wrap("test")(SyncIO.pure(Response.notFound[SyncIO])).unsafeRunSync() - assert(registry.get("http.test.active-requests").counter().count() === 0) - assert(registry.get("http.test.total-time").timer().count() === 1) - assert(registry.get("http.test.total-time").timer().totalTime(TimeUnit.MILLISECONDS) > 0) - assert(registry.get("http.test.status.404").counter().count() === 1) + assert(registry.get("http.test").timer().count() === 1) + assert(registry.get("http.test").timer().totalTime(TimeUnit.MILLISECONDS) > 0) + assert(registry.get("http.test").tags("status", "404").timer().count() === 1) } }