Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce timeout middleware based on Concurrent.race #1725

Merged
merged 2 commits into from
Mar 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 49 additions & 24 deletions server/src/main/scala/org/http4s/server/middleware/Timeout.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,14 @@ import scala.concurrent.duration.{Duration, FiniteDuration}

object Timeout {

/** Transform the service to return whichever resolves first: the
* provided F[Response[F]], or the service response task. The
* service response task continues to run in the background. To
* interrupt a server side response safely, look at
* `scalaz.stream.wye.interrupt`.
*
* @param timeoutResponse F[Response] to race against the result of the service. This will be run for each [[Request]]
* @param service [[org.http4s.HttpService]] to transform
*/
@deprecated("Exists to support deprecated methods", "0.18.4")
private def race[F[_]: Effect](timeoutResponse: F[Response[F]])(service: HttpService[F])(
implicit executionContext: ExecutionContext): HttpService[F] =
service.mapF { resp =>
OptionT(fs2AsyncRace(resp.value, timeoutResponse.map(_.some)).map(_.merge))
}

/**
* Returns an effect that, when run, races evaluation of `fa` and `fb`,
* and returns the result of whichever completes first. The losing effect
* continues to execute in the background though its result will be sent
* nowhere.
*
* Internalized from fs2 for now
*/
@deprecated("Exists to support deprecated methods", "0.18.4")
private def fs2AsyncRace[F[_], A, B](fa: F[A], fb: F[B])(
implicit F: Effect[F],
ec: ExecutionContext): F[Either[A, B]] =
Expand All @@ -58,16 +43,17 @@ object Timeout {
go *> p.get.flatMap(F.fromEither)
}

/** Transform the service to return a timeout response [[Status]]
* after the supplied duration if the service response is not yet
* ready. The service response task continues to run in the
* background. To interrupt a server side response safely, look at
* `scalaz.stream.wye.interrupt`.
/** Transform the service to return a timeout response after the given
* duration if the service has not yet responded. If the timeout
* fires, the service's response continues to run in the background
* and is discarded.
*
* @param timeout Duration to wait before returning the
* RequestTimeOut
* @param timeout Finite duration to wait before returning `response`
* @param service [[HttpService]] to transform
*/
@deprecated(
"Use apply(FiniteDuration, F[Response[F]](HttpService[F]) instead. That cancels the losing effect.",
"0.18.4")
def apply[F[_]: Effect](timeout: Duration, response: F[Response[F]])(service: HttpService[F])(
implicit executionContext: ExecutionContext,
scheduler: Scheduler): HttpService[F] =
Expand All @@ -76,9 +62,48 @@ object Timeout {
case _ => service
}

/** Transform the service to return a timeout response after the given
* duration if the service has not yet responded. If the timeout
* fires, the service's response continues to run in the background
* and is discarded.
*
* @param timeout Finite duration to wait before returning `response`
*/
@deprecated(
"Use apply(FiniteDuration)(HttpService[F]) instead. That cancels the losing effect.",
"0.18.4")
def apply[F[_]: Effect](timeout: Duration)(service: HttpService[F])(
implicit executionContext: ExecutionContext,
scheduler: Scheduler): HttpService[F] =
apply(timeout, Response[F](Status.InternalServerError).withBody("The service timed out."))(
service)

/** Transform the service to return a timeout response after the given
* duration if the service has not yet responded. If the timeout
* fires, the service's response is canceled.
*
* @param timeout Finite duration to wait before returning a `500
* Internal Server Error` response
* @param service [[HttpService]] to transform
*/
def apply[F[_]](timeout: FiniteDuration, timeoutResponse: F[Response[F]])(
service: HttpService[F])(implicit F: Concurrent[F], T: Timer[F]): HttpService[F] = {
val OTC = Concurrent[OptionT[F, ?]]
service
.mapF(respF => OTC.race(respF, OptionT.liftF(T.sleep(timeout) *> timeoutResponse)))
.map(_.merge)
}

/** Transform the service to return a timeout response after the given
* duration if the service has not yet responded. If the timeout
* fires, the service's response is canceled.
*
* @param timeout Finite duration to wait before returning a `500
* Internal Server Error` response
* @param service [[HttpService]] to transform
*/
def apply[F[_]](timeout: FiniteDuration)(
service: HttpService[F])(implicit F: Concurrent[F], T: Timer[F]): HttpService[F] =
apply(timeout, Response[F](Status.InternalServerError).withBody("The service timed out."))(
service)
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class TimeoutSpec extends Http4sSpec {
resp.unsafeRunTimed(3.seconds).getOrElse(throw new TimeoutException) must haveStatus(status)

"Timeout Middleware" should {
"have no effect if the response is not delayed" in {
val service = Timeout(Duration.Inf)(myService)
"have no effect if the response is timely" in {
val service = Timeout(365.days)(myService)
checkStatus(service.orNotFound(fastReq), Status.Ok)
}

Expand Down