Skip to content

Commit

Permalink
Merge pull request #2691 from rossabaker/issue-2677
Browse files Browse the repository at this point in the history
Dispose of current connection before retrying
  • Loading branch information
rossabaker committed Jul 6, 2019
2 parents 81beb71 + 45a1a1e commit 7810d35
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 20 deletions.
32 changes: 13 additions & 19 deletions client/src/main/scala/org/http4s/client/middleware/Retry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.http4s
package client
package middleware

import cats.effect.{Resource, Sync, Timer}
import cats.effect.{Concurrent, Resource, Timer}
import cats.implicits._
import java.time.Instant
import java.time.temporal.ChronoUnit
Expand All @@ -20,33 +20,34 @@ object Retry {
def apply[F[_]](
policy: RetryPolicy[F],
redactHeaderWhen: CaseInsensitiveString => Boolean = Headers.SensitiveHeaders.contains)(
client: Client[F])(implicit F: Sync[F], T: Timer[F]): Client[F] = {
client: Client[F])(implicit F: Concurrent[F], T: Timer[F]): Client[F] = {
def prepareLoop(req: Request[F], attempts: Int): Resource[F, Response[F]] =
client.run(req).attempt.flatMap {
case right @ Right(response) =>
policy(req, right, attempts) match {
Resource.suspend(F.continual(client.run(req).allocated) {
case Right((response, dispose)) =>
policy(req, Right(response), attempts) match {
case Some(duration) =>
logger.info(
s"Request ${showRequest(req, redactHeaderWhen)} has failed on attempt #${attempts} with reason ${response.status}. Retrying after ${duration}.")
nextAttempt(req, attempts, duration, response.headers.get(`Retry-After`))
dispose >> F.pure(
nextAttempt(req, attempts, duration, response.headers.get(`Retry-After`)))
case None =>
Resource.pure(response)
F.pure(Resource.make(F.pure(response))(_ => dispose))
}

case left @ Left(e) =>
policy(req, left, attempts) match {
case Left(e) =>
policy(req, Left(e), attempts) match {
case Some(duration) =>
// info instead of error(e), because e is not discarded
logger.info(e)(
s"Request threw an exception on attempt #$attempts. Retrying after $duration")
nextAttempt(req, attempts, duration, None)
F.pure(nextAttempt(req, attempts, duration, None))
case None =>
logger.info(e)(
s"Request ${showRequest(req, redactHeaderWhen)} threw an exception on attempt #$attempts. Giving up."
)
Resource.liftF(F.raiseError(e))
F.pure(Resource.liftF(F.raiseError(e)))
}
}
})

def showRequest(request: Request[F], redactWhen: CaseInsensitiveString => Boolean): String = {
val headers = request.headers.redactSensitive(redactWhen).toList.mkString(",")
Expand Down Expand Up @@ -75,13 +76,6 @@ object Retry {

Client(prepareLoop(_, 1))
}

@deprecated("The `redactHeaderWhen` parameter is now available on `apply`.", "0.19.1")
def retryWithRedactedHeaders[F[_]](
policy: RetryPolicy[F],
redactHeaderWhen: CaseInsensitiveString => Boolean)(
client: Client[F])(implicit F: Sync[F], T: Timer[F]): Client[F] =
apply(policy, redactHeaderWhen)(client)
}

object RetryPolicy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package client
package middleware

import cats.effect.{IO, Resource}
import cats.effect.concurrent.Ref
import cats.effect.concurrent.{Ref, Semaphore}
import cats.implicits._
import fs2.Stream
import org.http4s.dsl.io._
Expand Down Expand Up @@ -110,5 +110,21 @@ class RetrySpec extends Http4sSpec with Tables {
val failClient = Client[IO](_ => Resource.liftF(IO.raiseError(WaitQueueTimeoutException)))
countRetries(failClient, GET, InternalServerError, EmptyBody) must_== 1
}

"not exhaust the connection pool on retry" in {
Semaphore[IO](2).flatMap { semaphore =>
val client = Retry[IO](
RetryPolicy(
(att =>
if (att < 3) Some(Duration.Zero)
else None),
RetryPolicy.defaultRetriable[IO]))(Client[IO](_ =>
Resource.make(semaphore.tryAcquire.flatMap {
case true => Response[IO](Status.InternalServerError).pure[IO]
case false => IO.raiseError(new IllegalStateException("Exhausted all connections"))
})(_ => semaphore.release)))
client.status(Request[IO]())
} must returnValue(Status.InternalServerError)
}
}
}

0 comments on commit 7810d35

Please sign in to comment.