Skip to content

Commit

Permalink
Elevate to Handle Cancelation
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherDavenport committed Feb 12, 2019
1 parent 4739e6d commit e9654c1
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 36 deletions.
Expand Up @@ -7,6 +7,7 @@ import cats.arrow.FunctionK
import cats.implicits._
import cats.data._
import cats.effect._
import cats.effect.Sync._
import fs2._
import org.http4s.util.CaseInsensitiveString
import org.log4s.getLogger
Expand All @@ -17,7 +18,7 @@ import org.log4s.getLogger
object Logger {
private[this] val logger = getLogger

def apply[G[_]: MonadError[?[_], Throwable], F[_]: Concurrent](
def apply[G[_]: Bracket[?[_], Throwable], F[_]: Concurrent](
logHeaders: Boolean,
logBody: Boolean,
fk: F ~> G,
Expand Down
Expand Up @@ -6,11 +6,13 @@ import cats._
import cats.arrow.FunctionK
import cats.data._
import cats.effect._
import cats.effect.implicits._
import cats.effect.concurrent.Ref
import cats.implicits._
import fs2._
import org.http4s.util.CaseInsensitiveString
import org.log4s._
import cats.effect.Sync._

/**
* Simple Middleware for Logging Requests As They Are Processed
Expand All @@ -26,17 +28,23 @@ object RequestLogger {
logAction: Option[String => F[Unit]] = None
)(@deprecatedName('service) http: Http[G, F])(
implicit F: Concurrent[F],
G: MonadError[G, Throwable]
G: Bracket[G, Throwable]
): Http[G, F] = {
val log = logAction.fold({ s: String =>
Sync[F].delay(logger.info(s))
})(identity)
Kleisli { req =>
if (!logBody) {
http(req) <* fk(
// Log Occurs at Response Header Time
Logger.logMessage[F, Request[F]](req)(logHeaders, logBody)(log)
)
def logAct = Logger.logMessage[F, Request[F]](req)(logHeaders, logBody)(log)
// This construction will log on Any Error/Cancellation
// The Completed Case is Unit, as we rely on the semantics of G
// As None Is Successful, but we oly want to log on Some
http(req)
.guaranteeCase {
case ExitCase.Canceled => fk(logAct)
case ExitCase.Error(_) => fk(logAct)
case ExitCase.Completed => G.unit
} <* fk(logAct)

} else {
fk(Ref[F].of(Vector.empty[Chunk[Byte]]))
Expand All @@ -51,29 +59,36 @@ object RequestLogger {
// Cannot Be Done Asynchronously - Otherwise All Chunks May Not Be Appended Previous to Finalization
.observe(_.chunks.flatMap(c => Stream.eval_(vec.update(_ :+ c))))
)
val response: G[Response[F]] = http(changedRequest)
response.attempt
.flatMap {
case Left(e) =>
fk(
Logger.logMessage[F, Request[F]](req.withBodyStream(newBody))(
logHeaders,
logBody,
redactHeadersWhen)(log) *>
F.raiseError[Response[F]](e)
)
case Right(resp) =>
G.pure(
resp.withBodyStream(
resp.body.onFinalize(
Logger.logMessage[F, Request[F]](req.withBodyStream(newBody))(
logHeaders,
logBody,
redactHeadersWhen)(log)
)
val response: G[Response[F]] =
http(changedRequest)
.guaranteeCase {
case ExitCase.Canceled =>
fk(
Logger.logMessage[F, Request[F]](req.withBodyStream(newBody))(
logHeaders,
logBody,
redactHeadersWhen
)(log))
case ExitCase.Error(_) =>
fk(
Logger.logMessage[F, Request[F]](req.withBodyStream(newBody))(
logHeaders,
logBody,
redactHeadersWhen
)(log))
case ExitCase.Completed => G.unit
}
.map { resp =>
resp.withBodyStream(
resp.body.onFinalize(
Logger.logMessage[F, Request[F]](req.withBodyStream(newBody))(
logHeaders,
logBody,
redactHeadersWhen)(log)
)
)
}
}
response
}
}
}
Expand Down
Expand Up @@ -6,6 +6,8 @@ import cats._
import cats.arrow.FunctionK
import cats.data._
import cats.effect._
import cats.effect.implicits._
import cats.effect.Sync._
import cats.effect.concurrent.Ref
import cats.implicits._
import fs2._
Expand All @@ -25,7 +27,7 @@ object ResponseLogger {
redactHeadersWhen: CaseInsensitiveString => Boolean = Headers.SensitiveHeaders.contains,
logAction: Option[String => F[Unit]] = None)(
@deprecatedName('service) http: Kleisli[G, A, Response[F]])(
implicit G: MonadError[G, Throwable],
implicit G: Bracket[G, Throwable],
F: Concurrent[F]): Kleisli[G, A, Response[F]] = {
val fallback: String => F[Unit] = s => Sync[F].delay(logger.info(s))
val log = logAction.fold(fallback)(identity)
Expand All @@ -34,9 +36,9 @@ object ResponseLogger {
.flatMap { response =>
val out =
if (!logBody)
Logger.logMessage[F, Response[F]](response)(logHeaders, logBody, redactHeadersWhen)(
log) *> F
.delay(response)
Logger
.logMessage[F, Response[F]](response)(logHeaders, logBody, redactHeadersWhen)(log)
.as(response)
else
Ref[F].of(Vector.empty[Chunk[Byte]]).map { vec =>
val newBody = Stream
Expand All @@ -58,11 +60,11 @@ object ResponseLogger {
}
fk(out)
}
.handleErrorWith(t =>
fk(
log(s"service raised an error: ${t.getClass}") >>
F.raiseError[Response[F]](t)
))
.guaranteeCase {
case ExitCase.Error(t) => fk(log(s"service raised an error: ${t.getClass}"))
case ExitCase.Canceled => fk(log(s"service cancelled response"))
case ExitCase.Completed => G.unit
}
}
}

Expand Down

0 comments on commit e9654c1

Please sign in to comment.