Skip to content

Commit

Permalink
Generalize RetryPolicy function to StateT
Browse files Browse the repository at this point in the history
Closes #6 - replaced Int => Option[FiniteDuration] with StateT[Option, S, FiniteDuration]
Fixes #12 - StateT can memoize result of previous steps

Source
* Replace Int => Option[FiniteDuration] with (S, StateT[Option, S FiniteDuration]), where S is an abstract type
* Remove boundError
* Seal RetryPolicy and add constructors on companion object
* RetryPolicy only has a Semigroup instance now due to generalizing the transition function

Test
* Add test to check iterateDelay memoizes
  • Loading branch information
adelbertc committed Mar 12, 2015
1 parent 1fa2bfd commit 7ffc29e
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 132 deletions.
144 changes: 76 additions & 68 deletions core/src/main/scala/rebind/RetryPolicy.scala
Expand Up @@ -2,21 +2,26 @@ package rebind

import scala.concurrent.duration._

import scalaz.{ Apply, Disjunction, DisjunctionT, DLeft, DRight, Equal, Foldable, IList, Monad, Monoid, Zipper }
import scalaz.std.anyVal.intInstance
import scalaz.{ Apply, Disjunction, DisjunctionT, DLeft, DRight, Equal, Foldable, IList, Monad, Semigroup, StateT, Zipper }
import scalaz.std.option._
import scalaz.syntax.apply._

/** Retry policy.
*
* The function parameter represents the n-th retry. It should return a `Some` of a `FiniteDuration`
* (minimum time to wait before next retry) in the case where you want to retry again, or a
* `None` if you want to give up.
*/
final case class RetryPolicy(private[rebind] val run: Int => Option[FiniteDuration]) {
sealed abstract class RetryPolicy { outer =>
private[rebind] type S

private[rebind] def initialState: S

private[rebind] def transition: StateT[Option, S, FiniteDuration]

/** Wait for a maximum of the specified time before trying again */
def capDelay(limit: FiniteDuration): RetryPolicy =
RetryPolicy(n => run(n).map(_.min(limit)))
new RetryPolicy {
type S = outer.S

def initialState = outer.initialState

def transition: StateT[Option, S, FiniteDuration] = outer.transition.map(_.min(limit))
}

/** Combine this policy with another.
*
Expand All @@ -31,37 +36,24 @@ final case class RetryPolicy(private[rebind] val run: Int => Option[FiniteDurati
* }}}
*/
def &&(other: RetryPolicy): RetryPolicy =
RetryPolicy(n => Apply[Option].apply2(this.run(n), other.run(n))(_ max _))
new RetryPolicy {
type S = (outer.S, other.S)

def initialState = (outer.initialState, other.initialState)

def transition: StateT[Option, (outer.S, other.S), FiniteDuration] =
StateT {
case (outerState, otherState) =>
Apply[Option].apply2(outer.transition(outerState), other.transition(otherState)) {
case ((outerNext, outerVal), (otherNext, otherVal)) =>
((outerNext, otherNext), outerVal.max(otherVal))
}
}
}

/** Alias for `&&` */
def and(other: RetryPolicy): RetryPolicy = this && other

/** Retry with error-specific limits on total number of errors, or when the policy is exhausted.
*
* The limits indicated are compared to the total number of times the action has been retried,
* across *all* errors. For instance, if we have:
*
* {{{
* sealed abstract class UhOh
* final case object A extends UhOh
* final case object B extends UhOh
*
* somePolicy.boundError(someAction) {
* case A => 2.times
* case B => 1.time
* }
* }}}
*
* and the action first fails twice with `A` and then with `B`, the action does not retry on the `B`.
* This is due to the fact that the action in total has been retried twice, and 2 > 1 (`B` specified bound).
*
* Stack safe so long as `F[_]` is.
*/
def boundError[F[_] : Monad, E, A](action: DisjunctionT[F, E, A])(limits: E => Count): DisjunctionT[F, E, A] = {
val unwrapped = action.run
unfold(unwrapped, ())(Function.const(unwrapped), (e, _, n) => if (limits(e) > n) Some(()) else None)
}

/** Retry with error-specific limits, or when policy is exhausted.
*
* Limits are compared against the total number of times the error has occured so far,
Expand All @@ -70,7 +62,7 @@ final case class RetryPolicy(private[rebind] val run: Int => Option[FiniteDurati
* Stack safe so long as `F[_]` is.
*/
def recover[F[_] : Monad, E : Equal, A](action: DisjunctionT[F, E, A])(limits: E => Count): DisjunctionT[F, E, A] = {
def checkError(error: E, history: IList[(E, Int)], iteration: Int): Option[IList[(E, Int)]] =
def checkError(error: E, history: IList[(E, Int)]): Option[IList[(E, Int)]] =
limits(error) match {
case Count.Finite(0) => None
case _ =>
Expand Down Expand Up @@ -100,7 +92,7 @@ final case class RetryPolicy(private[rebind] val run: Int => Option[FiniteDurati
* Stack safe so long as `F[_]` is.
*/
def recoverAll[F[_] : Monad, E, A](action: DisjunctionT[F, E, A]): DisjunctionT[F, E, A] =
boundError(action)(Function.const(Count.Infinite))
retrying(action)(Function.const(action))

/** Retry with error-specific limits on consecutive errors, or when policy is exhausted.
*
Expand All @@ -111,7 +103,7 @@ final case class RetryPolicy(private[rebind] val run: Int => Option[FiniteDurati
* Stack safe so long as `F[_]` is.
*/
def recoverConsecutive[F[_] : Monad, E : Equal, A](action: DisjunctionT[F, E, A])(limits: E => Count): DisjunctionT[F, E, A] = {
def checkError(error: E, count: Option[(E, Int)], iteration: Int): Option[(Option[(E, Int)])] =
def checkError(error: E, count: Option[(E, Int)]): Option[(Option[(E, Int)])] =
count match {
// first iteration
case None =>
Expand Down Expand Up @@ -140,70 +132,86 @@ final case class RetryPolicy(private[rebind] val run: Int => Option[FiniteDurati
* Stack safe so long as `F[_]` is.
*/
def retrying[F[_] : Monad, E, A](action: DisjunctionT[F, E, A])(handler: E => DisjunctionT[F, E, A]): DisjunctionT[F, E, A] =
unfold(action.run, ())(e => handler(e).run, (_, _, _) => Option(()))
unfold(action.run, ())(e => handler(e).run, (_, _) => Option(()))

private def unfold[F[_] : Monad, E, A, S](currentAction: F[Disjunction[E, A]], firstState: S)(
private def unfold[F[_] : Monad, E, A, T](currentAction: F[Disjunction[E, A]], initialTest: T)(
next: E => F[Disjunction[E, A]],
p: (E, S, Int) => Option[S]): DisjunctionT[F, E, A] = {
def go(action: F[Disjunction[E, A]], n: Int, state: S): F[Disjunction[E, A]] =
test: (E, T) => Option[T]): DisjunctionT[F, E, A] = {
def go(action: F[Disjunction[E, A]], nextState: S, nextTest: T): F[Disjunction[E, A]] =
Monad[F].bind(action) { d =>
val pointed = Monad[F].point(d)

d match {
case DLeft(e) =>
Apply[Option].tuple2(run(n), p(e, state, n)).fold(pointed) {
case (delay, nextState) =>
Monad[F].point(DRight(Thread.sleep(delay.toMillis))) *> go(next(e), n + 1, nextState)
Apply[Option].tuple2(transition(nextState), test(e, nextTest)).fold(pointed) {
case ((anotherState, delay), anotherTest) =>
Monad[F].point(DRight(Thread.sleep(delay.toMillis))) *> go(next(e), anotherState, anotherTest)
}
case DRight(_) => pointed
}
}

DisjunctionT(go(currentAction, 0, firstState))
DisjunctionT(go(currentAction, initialState, initialTest))
}

}

object RetryPolicy extends RetryPolicyInstances with RetryPolicyFunctions
object RetryPolicy extends RetryPolicyInstances with RetryPolicyFunctions {
/** Create a retry policy with a state transition function.
*
* Iterates with `next` starting with `initial`. `next` should return a `Some` of
* a pair of `S` (the next state) and `FiniteDuration` (minimum time to wait before
* next retry) if you want to retry again, or a `None` if you want to give up.
*/
def apply[S](initial: S)(next: S => Option[(S, FiniteDuration)]): RetryPolicy =
stateT(initial)(StateT(next))

trait RetryPolicyInstances {
implicit val retryPolicyInstance: Monoid[RetryPolicy] =
new Monoid[RetryPolicy] {
def append(f1: RetryPolicy, f2: => RetryPolicy): RetryPolicy = f1 && f2
/** Create a retry policy with a state transition function.
*
* Iterates with `next` starting with `initial`. `next` should return a `Some` of
* a pair of `S` (the next state) and `FiniteDuration` (minimum time to wait before
* next retry) if you want to retry again, or a `None` if you want to give up.
*/
def stateT[T](initial: T)(next: StateT[Option, T, FiniteDuration]): RetryPolicy =
new RetryPolicy {
type S = T

def initialState = initial

def zero: RetryPolicy = RetryPolicy(Function.const(Option(Duration.Zero)))
def transition = next
}
}

trait RetryPolicyInstances {
implicit val retryPolicyInstance: Semigroup[RetryPolicy] =
Semigroup.instance(_ && _)
}

trait RetryPolicyFunctions {
/** Constantly retry, pausing a fixed amount in between */
def constantDelay(delay: FiniteDuration): RetryPolicy =
RetryPolicy(Function.const(Option(delay)))
RetryPolicy(())(Function.const(Option(((), delay))))

/** Exponential backoff, iterating indefinitely with a seed duration */
def exponentialBackoff(base: FiniteDuration): RetryPolicy =
RetryPolicy(n => Option(base * math.pow(2, n.toDouble).toLong))
RetryPolicy(1L)(n => Option((2 * n, base * n)))

/** Fibonacci backoff, iterating indefinitely with a seed duration */
def fibonacciBackoff(base: FiniteDuration): RetryPolicy = {
@annotation.tailrec
def fibonacci(n: Int, state: (FiniteDuration, FiniteDuration)): FiniteDuration =
n match {
case 0 => state._1
case _ => fibonacci(n - 1, (state._2, state._1 + state._2))
}

RetryPolicy(n => Option(fibonacci(n + 1, (Duration.Zero, base))))
}
def fibonacciBackoff(base: FiniteDuration): RetryPolicy =
RetryPolicy((base, base)) {
case (next, after) =>
val nextState = (after, next + after)
Option((nextState, next))
}

/** Constantly retry immediately */
def immediate: RetryPolicy = constantDelay(Duration.Zero)

/** Constantly retry, starting at the specified base and iterating */
def iterateDelay(base: FiniteDuration)(f: FiniteDuration => FiniteDuration): RetryPolicy =
RetryPolicy(n => Option(Function.chain(List.fill(n)(f))(base)))
RetryPolicy(base)(fd => Option((f(fd), fd)))

/** Immediately retry the specified number of times */
def limitRetries(i: Int): RetryPolicy =
RetryPolicy(n => if (n < i) Option(Duration.Zero) else None)
RetryPolicy(0)(n => if (n < i) Option((n + 1, Duration.Zero)) else None)
}
3 changes: 0 additions & 3 deletions core/src/main/scala/rebind/syntax/Kleisli.scala
Expand Up @@ -12,9 +12,6 @@ class KleisliOps[F[_], E, A](val action: DisjunctionT[F, E, A]) extends AnyVal {
private def lift(f: RetryPolicy => DisjunctionT[F, E, A]): Kleisli[DisjunctionT[F, E, ?], RetryPolicy, A] =
Kleisli[DisjunctionT[F, E, ?], RetryPolicy, A](f)

def boundError(limits: E => Count)(implicit F: Monad[F]): Kleisli[DisjunctionT[F, E, ?], RetryPolicy, A] =
lift(_.boundError(action)(limits))

def recover(limits: E => Count)(implicit E: Equal[E], F: Monad[F]): Kleisli[DisjunctionT[F, E, ?], RetryPolicy, A] =
lift(_.recover(action)(limits))

Expand Down

0 comments on commit 7ffc29e

Please sign in to comment.