Skip to content

Commit

Permalink
Reformat code with scalaFmt
Browse files Browse the repository at this point in the history
  • Loading branch information
luisdeltoro committed Sep 17, 2018
1 parent 667d768 commit 9ab8c36
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 63 deletions.
@@ -1,7 +1,7 @@
package org.http4s.client.metrics.core

import cats.data.Kleisli
import cats.effect.{Sync, Clock}
import cats.effect.{Clock, Sync}
import cats.implicits._
import java.util.concurrent.TimeUnit
import org.http4s.Request
Expand All @@ -10,44 +10,44 @@ import scala.concurrent.TimeoutException

object Metrics {
def apply[F[_], R: MetricsOpsFactory](
registry: R,
prefix: String = "org.http4s.client",
destination: Request[F] => Option[String] = { _: Request[F] => None}
registry: R,
prefix: String = "org.http4s.client",
destination: Request[F] => Option[String] = { _: Request[F] =>
None
}
)(client: Client[F])(implicit F: Sync[F], clock: Clock[F]): Client[F] = {
val ops = implicitly[MetricsOpsFactory[R]].instance[F](registry, prefix)

def withMetrics()(req: Request[F]): F[DisposableResponse[F]] = {
def withMetrics()(req: Request[F]): F[DisposableResponse[F]] =
(for {
start <- clock.monotonic(TimeUnit.NANOSECONDS)
_ <- ops.increaseActiveRequests(destination(req))
resp <- client.open(req)
end <- clock.monotonic(TimeUnit.NANOSECONDS)
_ <- ops.registerRequestHeadersTime(resp.response.status, end - start, destination(req))
_ <- ops.increaseActiveRequests(destination(req))
resp <- client.open(req)
end <- clock.monotonic(TimeUnit.NANOSECONDS)
_ <- ops.registerRequestHeadersTime(resp.response.status, end - start, destination(req))
iResp <- instrumentResponse(start, destination(req), resp)
} yield iResp).handleErrorWith { e =>
ops.decreaseActiveRequests(destination(req)) *> handleError(req, e) *>
F.raiseError[DisposableResponse[F]](e)
F.raiseError[DisposableResponse[F]](e)
}
}

def handleError(req: Request[F], e: Throwable): F[Unit] = {
def handleError(req: Request[F], e: Throwable): F[Unit] =
if (e.isInstanceOf[TimeoutException]) {
ops.increaseTimeouts(destination(req))
} else {
ops.increaseErrors(destination(req))
}
}

def instrumentResponse(
start: Long,
destination: Option[String],
disposableResponse: DisposableResponse[F]
): F[DisposableResponse[F]] = {
val newDisposable = for {
_ <- ops.decreaseActiveRequests(destination)
_ <- ops.decreaseActiveRequests(destination)
elapsed <- clock.monotonic(TimeUnit.NANOSECONDS).map(now => now - start)
_ <- ops.registerRequestTotalTime(disposableResponse.response.status, elapsed, destination)
_ <- disposableResponse.dispose
_ <- ops.registerRequestTotalTime(disposableResponse.response.status, elapsed, destination)
_ <- disposableResponse.dispose
} yield ()

F.delay(disposableResponse.copy(dispose = newDisposable))
Expand Down
Expand Up @@ -9,7 +9,10 @@ trait MetricsOps[F[_]] {

def decreaseActiveRequests(destination: Option[String]): F[Unit]

def registerRequestHeadersTime(status: Status, elapsed: Long, destination: Option[String]): F[Unit]
def registerRequestHeadersTime(
status: Status,
elapsed: Long,
destination: Option[String]): F[Unit]

def registerRequestTotalTime(status: Status, elapsed: Long, destination: Option[String]): F[Unit]

Expand Down
Expand Up @@ -6,7 +6,8 @@ import java.util.concurrent.TimeUnit
import org.http4s.Status
import org.http4s.client.metrics.core.{MetricsOps, MetricsOpsFactory}

class CodaHaleOps[F[_]](registry: MetricRegistry, prefix: String)(implicit F: Sync[F]) extends MetricsOps[F] {
class CodaHaleOps[F[_]](registry: MetricRegistry, prefix: String)(implicit F: Sync[F])
extends MetricsOps[F] {

override def increaseActiveRequests(destination: Option[String]): F[Unit] = F.delay {
registry.counter(s"${namespace(prefix, destination)}.active-requests").inc()
Expand All @@ -16,13 +17,19 @@ class CodaHaleOps[F[_]](registry: MetricRegistry, prefix: String)(implicit F: Sy
registry.counter(s"${namespace(prefix, destination)}.active-requests").dec()
}

override def registerRequestHeadersTime(status: Status, elapsed: Long, destination: Option[String]): F[Unit] = F.delay {
override def registerRequestHeadersTime(
status: Status,
elapsed: Long,
destination: Option[String]): F[Unit] = F.delay {
registry
.timer(s"${namespace(prefix, destination)}.requests.headers")
.update(elapsed, TimeUnit.NANOSECONDS)
}

override def registerRequestTotalTime(status: Status, elapsed: Long, destination: Option[String]): F[Unit] = F.delay {
override def registerRequestTotalTime(
status: Status,
elapsed: Long,
destination: Option[String]): F[Unit] = F.delay {
registry
.timer(s"${namespace(prefix, destination)}.requests.total")
.update(elapsed, TimeUnit.NANOSECONDS)
Expand All @@ -38,13 +45,13 @@ class CodaHaleOps[F[_]](registry: MetricRegistry, prefix: String)(implicit F: Sy
registry.counter(s"${namespace(prefix, destination)}.timeouts").inc()
}

private def namespace(prefix: String, destination: Option[String]): String = {
private def namespace(prefix: String, destination: Option[String]): String =
destination.map(d => s"${prefix}.${d}").getOrElse(s"${prefix}.default")
}

private def registerStatusCode(status: Status, destination: Option[String]) = {
private def registerStatusCode(status: Status, destination: Option[String]) =
status.code match {
case hundreds if hundreds < 200 => registry.counter(s"${namespace(prefix, destination)}.1xx-responses").inc()
case hundreds if hundreds < 200 =>
registry.counter(s"${namespace(prefix, destination)}.1xx-responses").inc()
case twohundreds if twohundreds < 300 =>
registry.counter(s"${namespace(prefix, destination)}.2xx-responses").inc()
case threehundreds if threehundreds < 400 =>
Expand All @@ -53,12 +60,12 @@ class CodaHaleOps[F[_]](registry: MetricRegistry, prefix: String)(implicit F: Sy
registry.counter(s"${namespace(prefix, destination)}.4xx-responses").inc()
case _ => registry.counter(s"${namespace(prefix, destination)}.5xx-responses").inc()
}
}
}

class CodaHaleOpsFactory extends MetricsOpsFactory[MetricRegistry] {

override def instance[F[_]: Sync](registry: MetricRegistry, prefix: String): MetricsOps[F] = new CodaHaleOps[F](registry, prefix)
override def instance[F[_]: Sync](registry: MetricRegistry, prefix: String): MetricsOps[F] =
new CodaHaleOps[F](registry, prefix)

}

Expand Down
Expand Up @@ -3,7 +3,7 @@ package org.http4s.client.metrics.codahale
import cats.effect.{Clock, IO}
import com.codahale.metrics.{MetricRegistry, SharedMetricRegistries}
import java.io.IOException
import java.util.concurrent.{TimeoutException, TimeUnit}
import java.util.concurrent.{TimeUnit, TimeoutException}
import org.http4s.{Http4sSpec, HttpApp, Response, Status}
import org.http4s.client.{Client, UnexpectedStatus}
import org.http4s.client.metrics.codahale.CodaHaleOps._
Expand Down Expand Up @@ -103,7 +103,8 @@ class CodaHaleMetricsSpec extends Http4sSpec {
}

def count(registry: MetricRegistry, name: String): Long = registry.getCounters.get(name).getCount
def values(registry: MetricRegistry, name: String): Option[Array[Long]] = Option(registry.getTimers().get(name)).map(_.getSnapshot.getValues)
def values(registry: MetricRegistry, name: String): Option[Array[Long]] =
Option(registry.getTimers().get(name)).map(_.getSnapshot.getValues)
}

class FakeClock extends Clock[IO] {
Expand Down Expand Up @@ -137,4 +138,3 @@ object RemoteEndpointStub {
NotFound("404 Not Found")
}
}

Expand Up @@ -5,9 +5,10 @@ import io.prometheus.client._
import org.http4s.Status
import org.http4s.client.metrics.core.{MetricsOps, MetricsOpsFactory}

class PrometheusOps[F[_]](registry: CollectorRegistry, prefix: String)(implicit F: Sync[F]) extends MetricsOps[F] {
class PrometheusOps[F[_]](registry: CollectorRegistry, prefix: String)(implicit F: Sync[F])
extends MetricsOps[F] {

override def increaseActiveRequests(destination: Option[String]):F[Unit] = F.delay {
override def increaseActiveRequests(destination: Option[String]): F[Unit] = F.delay {
metrics.activeRequests
.labels(label(destination))
.inc()
Expand All @@ -19,7 +20,10 @@ class PrometheusOps[F[_]](registry: CollectorRegistry, prefix: String)(implicit
.dec()
}

override def registerRequestHeadersTime(status: Status, elapsed: Long, destination: Option[String]): F[Unit] = F.delay {
override def registerRequestHeadersTime(
status: Status,
elapsed: Long,
destination: Option[String]): F[Unit] = F.delay {
metrics.responseDuration
.labels(
label(destination),
Expand All @@ -31,7 +35,10 @@ class PrometheusOps[F[_]](registry: CollectorRegistry, prefix: String)(implicit
.inc()
}

override def registerRequestTotalTime(status: Status, elapsed: Long, destination: Option[String]): F[Unit] = F.delay {
override def registerRequestTotalTime(
status: Status,
elapsed: Long,
destination: Option[String]): F[Unit] = F.delay {
metrics.responseDuration
.labels(
label(destination),
Expand Down Expand Up @@ -99,11 +106,11 @@ class PrometheusOps[F[_]](registry: CollectorRegistry, prefix: String)(implicit
}

case class ClientMetrics(
responseDuration: Histogram,
activeRequests: Gauge,
responseCounter: Counter,
clientErrorsCounter: Counter,
timeoutsCounter: Counter
responseDuration: Histogram,
activeRequests: Gauge,
responseCounter: Counter,
clientErrorsCounter: Counter,
timeoutsCounter: Counter
)

private sealed trait ResponsePhase
Expand All @@ -118,13 +125,13 @@ private object ResponsePhase {

class PrometheusOpsFactory extends MetricsOpsFactory[CollectorRegistry] {

override def instance[F[_]: Sync](registry: CollectorRegistry, prefix: String): MetricsOps[F] = new PrometheusOps[F](registry, prefix)
override def instance[F[_]: Sync](registry: CollectorRegistry, prefix: String): MetricsOps[F] =
new PrometheusOps[F](registry, prefix)

}

object PrometheusOps {

implicit def prometheusMetricsFactory: MetricsOpsFactory[CollectorRegistry] = new PrometheusOpsFactory()
implicit def prometheusMetricsFactory: MetricsOpsFactory[CollectorRegistry] =
new PrometheusOpsFactory()
}


Expand Up @@ -3,7 +3,7 @@ package org.http4s.client.metrics.prometheus
import cats.effect.{Clock, IO}
import io.prometheus.client.CollectorRegistry
import java.io.IOException
import java.util.concurrent.{TimeoutException, TimeUnit}
import java.util.concurrent.{TimeUnit, TimeoutException}
import org.http4s.{Http4sSpec, HttpApp, Response, Status}
import org.http4s.client.{Client, UnexpectedStatus}
import org.http4s.client.metrics.core.Metrics
Expand Down Expand Up @@ -102,36 +102,49 @@ class PrometheusMetricsSpec extends Http4sSpec {
case "active_requests" =>
registry.getSampleValue("client_active_request_count", Array("destination"), Array(""))
case "2xx_responses" =>
registry.getSampleValue("client_response_total",
Array("destination", "code"), Array("", "2xx"))
registry
.getSampleValue("client_response_total", Array("destination", "code"), Array("", "2xx"))
case "2xx_headers_duration" =>
registry.getSampleValue("client_response_duration_seconds_sum",
Array("destination", "code", "response_phase"), Array("", "2xx", "response_received"))
registry.getSampleValue(
"client_response_duration_seconds_sum",
Array("destination", "code", "response_phase"),
Array("", "2xx", "response_received"))
case "2xx_total_duration" =>
registry.getSampleValue("client_response_duration_seconds_sum",
Array("destination", "code", "response_phase"), Array("", "2xx", "body_processed"))
registry.getSampleValue(
"client_response_duration_seconds_sum",
Array("destination", "code", "response_phase"),
Array("", "2xx", "body_processed"))
case "4xx_responses" =>
registry.getSampleValue("client_response_total",
Array("destination", "code"), Array("", "4xx"))
registry
.getSampleValue("client_response_total", Array("destination", "code"), Array("", "4xx"))
case "4xx_headers_duration" =>
registry.getSampleValue("client_response_duration_seconds_sum",
Array("destination", "code", "response_phase"), Array("", "4xx", "response_received"))
registry.getSampleValue(
"client_response_duration_seconds_sum",
Array("destination", "code", "response_phase"),
Array("", "4xx", "response_received"))
case "4xx_total_duration" =>
registry.getSampleValue("client_response_duration_seconds_sum",
Array("destination", "code", "response_phase"), Array("", "4xx", "body_processed"))
registry.getSampleValue(
"client_response_duration_seconds_sum",
Array("destination", "code", "response_phase"),
Array("", "4xx", "body_processed"))
case "5xx_responses" =>
registry.getSampleValue("client_response_total",
Array("destination", "code"), Array("", "5xx"))
registry
.getSampleValue("client_response_total", Array("destination", "code"), Array("", "5xx"))
case "5xx_headers_duration" =>
registry.getSampleValue("client_response_duration_seconds_sum",
Array("destination", "code", "response_phase"), Array("", "5xx", "response_received"))
registry.getSampleValue(
"client_response_duration_seconds_sum",
Array("destination", "code", "response_phase"),
Array("", "5xx", "response_received"))
case "5xx_total_duration" =>
registry.getSampleValue("client_response_duration_seconds_sum",
Array("destination", "code", "response_phase"), Array("", "5xx", "body_processed"))
registry.getSampleValue(
"client_response_duration_seconds_sum",
Array("destination", "code", "response_phase"),
Array("", "5xx", "body_processed"))
case "timeouts" =>
registry.getSampleValue("client_client_timeouts_total", Array("destination"), Array(""))
case "client_errors" =>
registry.getSampleValue("client_client_errors_total", Array("destination"), Array("")) }
registry.getSampleValue("client_client_errors_total", Array("destination"), Array(""))
}
}

class FakeClock extends Clock[IO] {
Expand Down Expand Up @@ -165,4 +178,3 @@ object RemoteEndpointStub {
NotFound("404 Not Found")
}
}

0 comments on commit 9ab8c36

Please sign in to comment.