Skip to content

Commit

Permalink
Request Body if read but None :(
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherDavenport committed Jul 7, 2023
1 parent 449155c commit 1ac1b1d
Showing 1 changed file with 50 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,9 @@ object ServerMiddleware {
OptionT.liftF(Clock[F].realTime).flatMap{ start =>
val reqContext = request(req, reqHeaders, routeClassifier, requestIncludeUrl, requestAdditionalContext)
Concurrent[OptionT[F, *]].uncancelable(poll =>
poll{
poll{ OptionT{
for {
reqBody <- OptionT.liftF(Concurrent[F].ref(Option.empty[fs2.Chunk[Byte]]))
reqBody <- Concurrent[F].ref(Option.empty[fs2.Chunk[Byte]])
reqPipe = {(s: fs2.Stream[F, Byte]) => s.chunks.evalMap(chunk => reqBody.update{
case Some(current) => (current ++ chunk).some
case None => chunk.some
Expand All @@ -296,40 +296,55 @@ object ServerMiddleware {
req.withBodyStream(req.body.observe(reqPipe))
} else req
}
respBody <- OptionT.liftF(Concurrent[F].ref(Option.empty[fs2.Chunk[Byte]]))
resp <- routes.run{
respBody <- Concurrent[F].ref(Option.empty[fs2.Chunk[Byte]])
respOpt <- routes.run{
newReq.withAttribute(Keys.RequestContext, reqContext)
}.value
out <- respOpt match {
case Some(resp) =>
resp.withBodyStream(
resp.body.observe((s: fs2.Stream[F, Byte]) =>
if (responseLogBody && resp.contentLength.exists(l => l <= responseBodyMaxSize)) {
s.chunks.evalMap(chunk => respBody.update{
case Some(current) => (current ++ chunk).some
case None => chunk.some
}).drain
} else s.drain
)
.onFinalizeWeak{
for {
end <- Clock[F].realTime
reqBodyFinal <- reqBody.get
reqBodyS <- reqBodyFinal.traverse(chunk => logBody(req.withBodyStream(fs2.Stream.chunk(chunk))))
respBodyFinal <- respBody.get
respBodyS <- respBodyFinal.traverse(chunk => logBody(resp.withBodyStream(fs2.Stream.chunk(chunk))))
duration = "http.duration_ms" -> end.minus(start).toMillis.toString()
requestBodyCtx = reqBodyS.map(body => Map("http.request.body" -> body)).getOrElse(Map.empty)
responseCtx = response(resp, respHeaders, responseAdditionalContext)
responseBodyCtx = respBodyS.map(body => Map("http.response.body" -> body)).getOrElse(Map.empty)
outcome = Outcome.succeeded[Option, Throwable, ResponsePrelude](resp.responsePrelude.some)
outcomeCtx = outcomeContext(outcome)
finalCtx = reqContext ++ responseCtx + outcomeCtx + duration ++ requestBodyCtx ++ responseBodyCtx
_ <- logLevelAware(logger, finalCtx, req.requestPrelude, outcome, end, logLevel, logMessage)
} yield ()
}
).some.pure[F]
case None =>
val action = for {
end <- Clock[F].realTime
reqBodyFinal <- reqBody.get
reqBodyS <- reqBodyFinal.traverse(chunk => logBody(req.withBodyStream(fs2.Stream.chunk(chunk))))
duration = "http.duration_ms" -> end.minus(start).toMillis.toString()
requestBodyCtx = reqBodyS.map(body => Map("http.request.body" -> body)).getOrElse(Map.empty)
outcome = Outcome.succeeded[Option, Throwable, ResponsePrelude](None)
outcomeCtx = outcomeContext(outcome)
finalCtx = reqContext + outcomeCtx + duration ++ requestBodyCtx
_ <- logLevelAware(logger, finalCtx, req.requestPrelude, outcome, end, logLevel, logMessage)
} yield ()
action.as(Option.empty[Response[F]])
}
} yield {
resp.withBodyStream(
resp.body.observe((s: fs2.Stream[F, Byte]) =>
if (responseLogBody && resp.contentLength.exists(l => l <= responseBodyMaxSize)) {
s.chunks.evalMap(chunk => respBody.update{
case Some(current) => (current ++ chunk).some
case None => chunk.some
}).drain
} else s.drain
)
.onFinalizeWeak{
for {
end <- Clock[F].realTime
reqBodyFinal <- reqBody.get
reqBodyS <- reqBodyFinal.traverse(chunk => logBody(req.withBodyStream(fs2.Stream.chunk(chunk))))
respBodyFinal <- respBody.get
respBodyS <- respBodyFinal.traverse(chunk => logBody(resp.withBodyStream(fs2.Stream.chunk(chunk))))
duration = "http.duration_ms" -> end.minus(start).toMillis.toString()
requestBodyCtx = reqBodyS.map(body => Map("http.request.body" -> body)).getOrElse(Map.empty)
responseCtx = response(resp, respHeaders, responseAdditionalContext)
responseBodyCtx = respBodyS.map(body => Map("http.response.body" -> body)).getOrElse(Map.empty)
outcome = Outcome.succeeded[Option, Throwable, ResponsePrelude](resp.responsePrelude.some)
outcomeCtx = outcomeContext(outcome)
finalCtx = reqContext ++ responseCtx + outcomeCtx + duration ++ requestBodyCtx ++ responseBodyCtx
_ <- logLevelAware(logger, finalCtx, req.requestPrelude, outcome, end, logLevel, logMessage)
} yield ()
}
)
}
}
} yield out
}}
.guaranteeCase{
case Outcome.Canceled() =>
OptionT.liftF(Clock[F].realTime.flatMap{ end =>
Expand All @@ -347,18 +362,7 @@ object ServerMiddleware {
val finalCtx = reqContext + outcomeCtx + duration
logLevelAware(logger, finalCtx, req.requestPrelude, outcome, end, logLevel, logMessage)
})
case Outcome.Succeeded(fa) => OptionT.liftF(fa.value.flatMap{
case None =>
// TODO move this case into poll so we can log the request body still.
Clock[F].realTime.flatMap{ end =>
val duration = "http.duration_ms" -> end.minus(start).toMillis.toString()
val outcome = Outcome.succeeded[Option, Throwable, ResponsePrelude](Option.empty)
val outcomeCtx = outcomeContext(outcome)
val finalCtx = reqContext + outcomeCtx + duration
logLevelAware(logger, finalCtx, req.requestPrelude, outcome, end, logLevel, logMessage)
}
case Some(_) => Applicative[F].unit
})
case Outcome.Succeeded(fa) => OptionT.liftF(Applicative[F].unit)
}
)
}
Expand Down

0 comments on commit 1ac1b1d

Please sign in to comment.