Skip to content

Commit

Permalink
SAMZA-174: General-purpose implementation of a retry loop. Reviewed b…
Browse files Browse the repository at this point in the history
…y Chris Riccomini.
  • Loading branch information
Martin Kleppmann committed Mar 20, 2014
1 parent df6e11a commit f2fcb26
Show file tree
Hide file tree
Showing 10 changed files with 482 additions and 314 deletions.
Expand Up @@ -21,6 +21,19 @@

package org.apache.samza.util

import java.nio.channels.ClosedByInterruptException
import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop

/**
* Encapsulates the pattern of retrying an operation until it succeeds.
* Before every retry there is a delay, which starts short and gets exponentially
* longer on each retry, up to a configurable maximum. There is no limit to the
* number of retries.
*
* @param backOffMultiplier The factor by which the delay increases on each retry.
* @param initialDelayMs Time in milliseconds to wait after the first attempt failed.
* @param maximumDelayMs Cap up to which we will increase the delay.
*/
class ExponentialSleepStrategy(
backOffMultiplier: Double = 2.0,
initialDelayMs: Long = 100,
Expand All @@ -30,16 +43,117 @@ class ExponentialSleepStrategy(
require(initialDelayMs > 0, "initialDelayMs must be positive")
require(maximumDelayMs >= initialDelayMs, "maximumDelayMs must be >= initialDelayMs")

var previousDelay = 0L

def sleep() = {
val nextDelay = getNextDelay(previousDelay)
Thread.sleep(nextDelay)
previousDelay = nextDelay
}

/**
* Given the delay before the last retry, calculate what the delay before the
* next retry should be.
*/
def getNextDelay(previousDelay: Long): Long = {
val nextDelay = (previousDelay * backOffMultiplier).asInstanceOf[Long]
math.min(math.max(initialDelayMs, nextDelay), maximumDelayMs)
}

/** Can be overridden by subclasses to customize looping behavior. */
def startLoop: RetryLoop = new RetryLoopState

/**
* Starts a retryable operation with the delay properties that were configured
* when the object was created. Every call to run is independent, so the same
* ExponentialSleepStrategy object can be used for several different retry loops.
*
* loopOperation is called on every attempt, and given as parameter a RetryLoop
* object. By default it is assumed that the operation failed. If the operation
* succeeded, you must call <code>done</code> on the RetryLoop object to indicate
* success. This method returns the return value of the successful loopOperation.
*
* If an exception is thrown during the execution of loopOperation, the onException
* handler is called. You can choose to re-throw the exception (so that it aborts
* the run loop and bubbles up), or ignore it (the operation will be retried),
* or call <code>done</code> (give up, don't retry).
*
* @param loopOperation The operation that should be attempted and may fail.
* @param onException Handler function that determines what to do with an exception.
* @return If loopOperation succeeded, an option containing the return value of
* the last invocation. If done was called in the exception hander, None.
*/
def run[A](loopOperation: RetryLoop => A, onException: (Exception, RetryLoop) => Unit): Option[A] = {
val loop = startLoop
while (!loop.isDone && !Thread.currentThread.isInterrupted) {
try {
val result = loopOperation(loop)
if (loop.isDone) return Some(result)
} catch {
case e: InterruptedException => throw e
case e: ClosedByInterruptException => throw e
case e: Exception => onException(e, loop)
}
if (!loop.isDone && !Thread.currentThread.isInterrupted) loop.sleep
}
None
}

private[util] class RetryLoopState extends RetryLoop {
var previousDelay = 0L
var isDone = false
var sleepCount = 0

def sleep {
sleepCount += 1
val nextDelay = getNextDelay(previousDelay)
previousDelay = nextDelay
Thread.sleep(nextDelay)
}

def reset {
previousDelay = 0
isDone = false
}

def done {
isDone = true
}
}
}

object ExponentialSleepStrategy {
/**
* State of the retry loop, passed to every invocation of the loopOperation
* or the exception handler.
*/
trait RetryLoop {
/** Let the current thread sleep for the backoff time (called by run method). */
def sleep

/** Tell the retry loop to revert to initialDelayMs for the next retry. */
def reset

/** Tell the retry loop to stop trying (success or giving up). */
def done

/** Check whether <code>done</code> was called (used by the run method). */
def isDone: Boolean

/** Returns the number of times that the retry loop has called <code>sleep</code>. */
def sleepCount: Int
}

/** For tests using ExponentialSleepStrategy.Mock */
class CallLimitReached extends Exception

/**
* For writing tests of retryable code. Doesn't actually sleep, so that tests
* are quick to run.
*
* @param maxCalls The maximum number of retries to allow before throwing CallLimitReached.
*/
class Mock(maxCalls: Int) extends ExponentialSleepStrategy {
override def startLoop = new MockRetryLoop

class MockRetryLoop extends RetryLoop {
var isDone = false
var sleepCount = 0
def sleep { sleepCount += 1; if (sleepCount > maxCalls) throw new CallLimitReached }
def reset { isDone = false }
def done { isDone = true }
}
}
}
Expand Up @@ -23,34 +23,143 @@ package org.apache.samza.util

import org.junit.Assert._
import org.junit.Test
import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop

class TestExponentialSleepStrategy {

@Test def testGetNextDelayReturnsIncrementalDelay() = {
val st = new ExponentialSleepStrategy
var nextDelay = st.getNextDelay(0L)
assertEquals(nextDelay, 100L)
nextDelay = st.getNextDelay(nextDelay)
assertEquals(nextDelay, 200L)
nextDelay = st.getNextDelay(nextDelay)
assertEquals(nextDelay, 400L)
}

@Test def testGetNextDelayReturnsMaximumDelayWhenDelayCapReached() = {
val st = new ExponentialSleepStrategy
var nextDelay = st.getNextDelay(6400L)
assertEquals(nextDelay, 10000L)
nextDelay = st.getNextDelay(nextDelay)
assertEquals(nextDelay, 10000L)
}

@Test def testSleepStrategyIsConfigurable() = {
val st = new ExponentialSleepStrategy(backOffMultiplier = 3.0, initialDelayMs = 10)
var nextDelay = st.getNextDelay(0L)
assertEquals(nextDelay, 10L)
nextDelay = st.getNextDelay(nextDelay)
assertEquals(nextDelay, 30L)
nextDelay = st.getNextDelay(nextDelay)
assertEquals(nextDelay, 90L)
@Test def testGetNextDelayReturnsIncrementalDelay {
val strategy = new ExponentialSleepStrategy
assertEquals(100, strategy.getNextDelay(0))
assertEquals(200, strategy.getNextDelay(100))
assertEquals(400, strategy.getNextDelay(200))
assertEquals(800, strategy.getNextDelay(400))
}

@Test def testGetNextDelayReturnsMaximumDelayWhenDelayCapReached {
val strategy = new ExponentialSleepStrategy
assertEquals(10000, strategy.getNextDelay(6400))
assertEquals(10000, strategy.getNextDelay(10000))
}

@Test def testSleepStrategyIsConfigurable {
val strategy = new ExponentialSleepStrategy(backOffMultiplier = 3.0, initialDelayMs = 10)
assertEquals(10, strategy.getNextDelay(0))
assertEquals(30, strategy.getNextDelay(10))
assertEquals(90, strategy.getNextDelay(30))
assertEquals(270, strategy.getNextDelay(90))
}

@Test def testResetToInitialDelay {
val strategy = new ExponentialSleepStrategy
val loop = strategy.startLoop.asInstanceOf[ExponentialSleepStrategy#RetryLoopState]
loop.previousDelay = strategy.getNextDelay(loop.previousDelay)
assertEquals(100, loop.previousDelay)
loop.previousDelay = strategy.getNextDelay(loop.previousDelay)
loop.previousDelay = strategy.getNextDelay(loop.previousDelay)
assertEquals(400, loop.previousDelay)
loop.reset
loop.previousDelay = strategy.getNextDelay(loop.previousDelay)
assertEquals(100, loop.previousDelay)
}

@Test def testRetryWithoutException {
val strategy = new ExponentialSleepStrategy(initialDelayMs = 1)
var iterations = 0
var loopObject: RetryLoop = null
val result = strategy.run(
loop => {
loopObject = loop
iterations += 1
if (iterations == 3) loop.done
iterations
},
(exception, loop) => throw exception
)
assertEquals(Some(3), result)
assertEquals(3, iterations)
assertEquals(2, loopObject.sleepCount)
}

@Test def testRetryWithException {
val strategy = new ExponentialSleepStrategy(initialDelayMs = 1)
var iterations = 0
var loopObject: RetryLoop = null
strategy.run(
loop => { throw new IllegalArgumentException("boom") },
(exception, loop) => {
assertEquals("boom", exception.getMessage)
loopObject = loop
iterations += 1
if (iterations == 3) loop.done
}
)
assertEquals(3, iterations)
assertEquals(2, loopObject.sleepCount)
}

@Test def testReThrowingException {
val strategy = new ExponentialSleepStrategy(initialDelayMs = 1)
var iterations = 0
var loopObject: RetryLoop = null
try {
strategy.run(
loop => {
loopObject = loop
iterations += 1
throw new IllegalArgumentException("boom")
},
(exception, loop) => throw exception
)
fail("expected exception to be thrown")
} catch {
case e: IllegalArgumentException => assertEquals("boom", e.getMessage)
case e: Throwable => throw e
}
assertEquals(1, iterations)
assertEquals(0, loopObject.sleepCount)
}

def interruptedThread(operation: => Unit) = {
var exception: Option[Throwable] = None
val interruptee = new Thread(new Runnable {
def run {
try { operation } catch { case e: Throwable => exception = Some(e) }
}
})
interruptee.start
Thread.sleep(10) // give the thread a chance to make some progress before we interrupt it
interruptee.interrupt
interruptee.join
exception
}

@Test def testThreadInterruptInRetryLoop {
val strategy = new ExponentialSleepStrategy
var iterations = 0
var loopObject: RetryLoop = null
val exception = interruptedThread {
strategy.run(
loop => { iterations += 1; loopObject = loop },
(exception, loop) => throw exception
)
}
assertEquals(1, iterations)
assertEquals(1, loopObject.sleepCount)
assertEquals(classOf[InterruptedException], exception.get.getClass)
}

@Test def testThreadInterruptInOperationSleep {
val strategy = new ExponentialSleepStrategy
var iterations = 0
var loopObject: RetryLoop = null
val exception = interruptedThread {
strategy.run(
loop => { iterations += 1; loopObject = loop; Thread.sleep(1000) },
(exception, loop) => throw exception
)
}
assertEquals(1, iterations)
assertEquals(0, loopObject.sleepCount)
assertEquals(classOf[InterruptedException], exception.get.getClass)
}
}

0 comments on commit f2fcb26

Please sign in to comment.