Permalink
Fetching contributors…
Cannot retrieve contributors at this time
70 lines (53 sloc) 2.14 KB
# -*- coding: utf-8 -*-
from ..backoff import Backoff
from ..constants import INT_ERROR, POS_ERROR
class ConstantBackoff(Backoff):
"""
ConstantBackOff is a backoff policy that always returns the same
backoff delay.
The constant backoff policy can be configured with a custom
retry interval and maximum number of retries.
This is in contrast to an exponential backoff policy,
which returns a delay that grows longer as you call `next()`.
ConstantBackoff is expected to run in a single-thread context.
Arguments:
interval (int|float): wait interval before retry in seconds.
Use `0` for no wait. Defaults to `0.1` = `100` milliseconds.
retries (int): maximum number of retries attempts.
Use `0` for no limit. Defaults to `10`.
Raises:
AssertionError: in case of invalid params.
Usge::
@riprova.retry(backoff=riprova.ConstantBackoff(retries=5))
def task(x):
return x * x
"""
def __init__(self, interval=.1, retries=10):
assert isinstance(retries, int), INT_ERROR.format('retries')
assert isinstance(interval, (int, float)), INT_ERROR.format('interval')
assert retries >= 0, POS_ERROR.format('retries')
assert interval >= 0, POS_ERROR.format('interval')
self.retries = retries
self.pending_retries = retries
self.interval = interval
def reset(self):
"""
Resets the current backoff state data.
"""
self.pending_retries = self.retries
def next(self):
"""
Returns the number of seconds to wait before the next try,
otherwise returns `Backoff.STOP`, which indicates the max number
of retry operations were reached.
Returns:
float: time to wait in seconds before the next try.
"""
# Verify we do not exceeded the max retries
if self.retries > 0 and self.pending_retries == 0:
return Backoff.STOP
# Decrement pending retries attempts
if self.retries > 0:
self.pending_retries -= 1
# Return pending interval
return self.interval