-
Notifications
You must be signed in to change notification settings - Fork 6
/
retry_options.py
114 lines (89 loc) · 3.72 KB
/
retry_options.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import time
from delfick_project.norms import dictobj, sb
from photons_app import helpers as hp
def Gaps(*, gap_between_results, gap_between_ack_and_res, timeouts):
default_timeouts = timeouts
default_gap_between_results = gap_between_results
default_gap_between_ack_and_res = gap_between_ack_and_res
class tuple_spec(sb.Spec):
def setup(self, *specs):
self.spec = sb.tuple_spec(*specs)
def normalise_filled(self, meta, val):
if isinstance(val, list):
val = tuple(val)
return self.spec.normalise(meta, val)
class timeouts_default_spec(sb.Spec):
def normalise_empty(self, meta):
return default_timeouts
def normalise_filled(self, meta, val):
return sb.listof(tuple_spec(sb.float_spec(), sb.float_spec())).normalise(meta, val)
class ResultGaps(dictobj.Spec):
gap_between_results = dictobj.Field(
sb.float_spec,
default=default_gap_between_results,
help="""
When a packet has an unbound number of results we uses this number to
determine when we have enough results. Essentially the answer is yes if
it's been this long since the last result
""",
)
gap_between_ack_and_res = dictobj.Field(
sb.float_spec,
default=default_gap_between_ack_and_res,
help="""
When a packet has a received an acknowledgment but not a result, this
number is used to determine if we should wait for the result or send
the request again.
i.e. only send a retry if it's been longer than this time since the
acknowledgement
""",
)
timeouts = dictobj.Field(
timeouts_default_spec,
help="""
A list of (step, end) tuples that is used to determine the retry backoff.
Essentially, starting with the first step, increase by step until you
reach end and then use the next tuple to determine backoff from there.
So ``[(0.1, 0.1), (0.2, 0.5), (0.5, 3)]`` would go
``0.1, 0.3, 0.5, 1, 1.5, 2, 2.5, 3, 3, 3, ...``
""",
)
@property
def finish_multi_gap(self):
"""
When a packet has an unbound number of results or acks, this number is
used to schedule the next check to see if we should finish this result
This defaults to being ``gap_between_results + 0.05``
"""
return self.gap_between_results + 0.05
def retry_ticker(self, name=None):
return RetryTicker(timeouts=self.timeouts, name=name)
return ResultGaps.FieldSpec()
class RetryTicker:
def __init__(self, *, timeouts, name=None):
self.name = name
self.timeouts = timeouts
self.timeout = None
self.timeout_item = None
async def tick(self, final_future, timeout, min_wait=0.1):
timeouts = list(self.timeouts)
step, end = timeouts.pop(0)
ticker = hp.ATicker(
every=step,
final_future=final_future,
max_time=timeout,
min_wait=min_wait,
name=f"RetryTicker({self.name})::tick[ticker]",
)
start = time.time()
final_time = time.time() + timeout
async with ticker as ticks:
async for _, nxt in ticks:
now = time.time()
if end and now - start > end:
if timeouts:
step, end = timeouts.pop(0)
ticker.change_after(step)
else:
end = None
yield round(final_time - now, 3), nxt