Skip to content

Commit

Permalink
Merge pull request #1754 from aeons/retry-with-timer
Browse files Browse the repository at this point in the history
Use cats.effect.Timer for Retry middleware
  • Loading branch information
aeons committed Mar 28, 2018
2 parents c858283 + 22240a7 commit 9655331
Showing 1 changed file with 14 additions and 21 deletions.
35 changes: 14 additions & 21 deletions client/src/main/scala/org/http4s/client/middleware/Retry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,30 @@ package client
package middleware

import cats.data.Kleisli
import cats.effect.Effect
import cats.effect.{Effect, Timer}
import cats.implicits._
import fs2._
import java.time.Instant
import java.time.temporal.ChronoUnit
import org.http4s.Status._
import org.http4s.headers.`Retry-After`
import org.log4s.getLogger
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.math.{min, pow, random}

object Retry {

private[this] val logger = getLogger

def apply[F[_]](policy: RetryPolicy[F])(client: Client[F])(
implicit F: Effect[F],
scheduler: Scheduler,
executionContext: ExecutionContext): Client[F] = {
def apply[F[_]](policy: RetryPolicy[F])(
client: Client[F])(implicit F: Effect[F], T: Timer[F]): Client[F] = {
def prepareLoop(req: Request[F], attempts: Int): F[DisposableResponse[F]] =
client.open(req).attempt.flatMap {
// TODO fs2 port - Reimplement request isIdempotent in some form
case Right(dr @ DisposableResponse(response, _)) =>
policy(req, Right(dr.response), attempts) match {
case Some(duration) =>
logger.info(
s"Request ${req} has failed on attempt #${attempts} with reason ${response.status}. Retrying after ${duration}.")
s"Request $req has failed on attempt #$attempts with reason ${response.status}. Retrying after $duration.")
dr.dispose.flatMap(_ =>
nextAttempt(req, attempts, duration, response.headers.get(`Retry-After`)))
case None =>
Expand All @@ -52,21 +48,18 @@ object Retry {
req: Request[F],
attempts: Int,
duration: FiniteDuration,
retryHeader: Option[`Retry-After`])(
implicit F: Effect[F],
executionContext: ExecutionContext): F[DisposableResponse[F]] = {
val headerDuration = retryHeader
.map { h =>
h.retry match {
case Left(d) => Instant.now().until(d.toInstant, ChronoUnit.SECONDS)
case Right(secs) => secs
retryHeader: Option[`Retry-After`]): F[DisposableResponse[F]] = {
val headerDuration =
retryHeader
.map { h =>
h.retry match {
case Left(d) => Instant.now().until(d.toInstant, ChronoUnit.SECONDS)
case Right(secs) => secs
}
}
}
.getOrElse(0L)
.getOrElse(0L)
val sleepDuration = headerDuration.seconds.max(duration)
scheduler.sleep_[F](sleepDuration).compile.drain *> prepareLoop(
req.withEmptyBody,
attempts + 1)
T.sleep(sleepDuration) *> prepareLoop(req.withEmptyBody, attempts + 1)
}

client.copy(open = Kleisli(prepareLoop(_, 1)))
Expand Down

0 comments on commit 9655331

Please sign in to comment.