Skip to content

Commit 1cd8fea

Browse files
committed
Added retry interceptor. Added some tests for it.
1 parent c4f0905 commit 1cd8fea

File tree

7 files changed

+546
-16
lines changed

7 files changed

+546
-16
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
*.pyc
22
__pycache__
3+
.idea/
4+
venv/

setup.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
from setuptools import setup, find_packages
22

3-
packages = find_packages('.',
4-
include=['yandexcloud*', 'yandex*'])
3+
packages = find_packages('.', include=['yandexcloud*', 'yandex*'])
54

65
setup(name='yandexcloud',
7-
version='0.4',
8-
description='The Yandex.Cloud official SDK',
9-
url='https://github.com/yandex-cloud/python-sdk',
10-
author='Yandex LLC',
11-
author_email='FIXME',
12-
license='MIT',
13-
install_requires=[
14-
'grpcio',
15-
'googleapis-common-protos',
16-
'six',
17-
],
18-
packages=packages,
19-
zip_safe=False)
6+
version='0.5',
7+
description='The Yandex.Cloud official SDK',
8+
url='https://github.com/yandex-cloud/python-sdk',
9+
author='Yandex LLC',
10+
author_email='FIXME',
11+
license='MIT',
12+
install_requires=[
13+
'grpcio',
14+
'googleapis-common-protos',
15+
'six',
16+
],
17+
tests_require=['pytest'],
18+
packages=packages,
19+
zip_safe=False)

tests/conftest.py

Whitespace-only changes.

tests/test_retry_interceptor.py

Lines changed: 331 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
import pytest
2+
import grpc
3+
import time
4+
import uuid
5+
6+
from threading import Event
7+
from concurrent import futures
8+
9+
import yandex.cloud.compute.v1.zone_service_pb2_grpc as zone_service_pb2_grpc
10+
import yandex.cloud.compute.v1.zone_service_pb2 as zone_service_pb2
11+
import yandex.cloud.compute.v1.zone_pb2 as zone_pb2
12+
13+
from yandexcloud._retry_interceptor import RetryInterceptor
14+
from yandexcloud._backoff import default_backoff, backoff_linear_with_jitter
15+
16+
_DEFAULT_SERVICE_PORT = "50051"
17+
_SERVICE_ADDR = "localhost:" + _DEFAULT_SERVICE_PORT
18+
_DEFAULT_ZONE = zone_pb2.Zone()
19+
20+
21+
class ZoneServiceMock(object):
22+
def __init__(self, handler):
23+
self.__handler = handler
24+
25+
def Get(self, request, context):
26+
return self.__handler(context)
27+
28+
def List(self, request, context):
29+
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
30+
context.set_details('Method not implemented!')
31+
return zone_service_pb2.ListZonesResponse()
32+
33+
34+
def _grpc_server(handler):
35+
service = ZoneServiceMock(handler)
36+
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
37+
server.add_insecure_port("[::]:" + _DEFAULT_SERVICE_PORT)
38+
zone_service_pb2_grpc.add_ZoneServiceServicer_to_server(service, server)
39+
server.start()
40+
assert _is_grpc_endpoint_ready(60)
41+
return server
42+
43+
44+
def _is_grpc_endpoint_ready(timeout):
45+
def check_endpoint_ready():
46+
channel = grpc.insecure_channel(_SERVICE_ADDR)
47+
client = zone_service_pb2_grpc.ZoneServiceStub(channel)
48+
49+
try:
50+
client.List(zone_service_pb2.ListZonesRequest(), timeout=1)
51+
except grpc.RpcError as e:
52+
return e.code() not in [grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED]
53+
54+
return True
55+
56+
deadline = time.monotonic() + timeout
57+
58+
while time.monotonic() <= deadline:
59+
if check_endpoint_ready():
60+
return True
61+
62+
return False
63+
64+
65+
class _FailFirstAttempts:
66+
def __init__(self, fail_attempts):
67+
self.__fail_attempts = fail_attempts
68+
69+
def handler(self, context):
70+
if self.__fail_attempts > 0:
71+
self.__fail_attempts -= 1
72+
context.set_code(grpc.StatusCode.UNAVAILABLE)
73+
74+
return _DEFAULT_ZONE
75+
76+
def reset(self, fail_attempts):
77+
self.__fail_attempts = fail_attempts
78+
79+
80+
def test_five_retries():
81+
service = _FailFirstAttempts(5)
82+
server = _grpc_server(service.handler)
83+
84+
with grpc.insecure_channel(_SERVICE_ADDR) as channel:
85+
for max_retry_count in range(4):
86+
interceptor = RetryInterceptor(max_retry_count=max_retry_count)
87+
ch = grpc.intercept_channel(channel, interceptor)
88+
client = zone_service_pb2_grpc.ZoneServiceStub(ch)
89+
90+
with pytest.raises(grpc.RpcError) as e:
91+
client.Get(zone_service_pb2.GetZoneRequest(zone_id="id"))
92+
93+
assert e.value.code() == grpc.StatusCode.UNAVAILABLE
94+
service.reset(5)
95+
96+
interceptor = RetryInterceptor(max_retry_count=5)
97+
ch = grpc.intercept_channel(channel, interceptor)
98+
client = zone_service_pb2_grpc.ZoneServiceStub(ch)
99+
res = client.Get(zone_service_pb2.GetZoneRequest(zone_id="id"))
100+
101+
assert res == _DEFAULT_ZONE
102+
103+
server.stop(0)
104+
105+
106+
class _RetriableCodes:
107+
def __init__(self, retriable_codes):
108+
self.__retriable_codes = retriable_codes
109+
self.__get_count = 0
110+
111+
def handler(self, context):
112+
if self.__get_count < len(self.__retriable_codes):
113+
context.set_code(self.__retriable_codes[self.__get_count])
114+
115+
self.__get_count += 1
116+
return _DEFAULT_ZONE
117+
118+
def reset_state(self):
119+
self.__get_count = 0
120+
121+
122+
def test_retriable_codes():
123+
retriable_codes = [grpc.StatusCode.RESOURCE_EXHAUSTED,
124+
grpc.StatusCode.UNAVAILABLE,
125+
grpc.StatusCode.DATA_LOSS]
126+
127+
service = _RetriableCodes(retriable_codes)
128+
server = _grpc_server(service.handler)
129+
130+
with grpc.insecure_channel(_SERVICE_ADDR) as channel:
131+
for retry_qty in range(len(retriable_codes)):
132+
interceptor = RetryInterceptor(max_retry_count=retry_qty, retriable_codes=retriable_codes)
133+
ch = grpc.intercept_channel(channel, interceptor)
134+
client = zone_service_pb2_grpc.ZoneServiceStub(ch)
135+
136+
with pytest.raises(grpc.RpcError) as e:
137+
client.Get(zone_service_pb2.GetZoneRequest(zone_id="id"))
138+
139+
assert e.value.code() == retriable_codes[retry_qty]
140+
service.reset_state()
141+
142+
interceptor = RetryInterceptor(max_retry_count=len(retriable_codes), retriable_codes=retriable_codes)
143+
ch = grpc.intercept_channel(channel, interceptor)
144+
client = zone_service_pb2_grpc.ZoneServiceStub(ch)
145+
assert client.Get(zone_service_pb2.GetZoneRequest(zone_id="id")) == _DEFAULT_ZONE
146+
147+
server.stop(0)
148+
149+
150+
class _AlwaysUnavailable:
151+
def __init__(self):
152+
self.__get_count = 0
153+
self.__t_checker = None
154+
self.__error = False
155+
156+
@property
157+
def error(self):
158+
return self.__error
159+
160+
def handler(self, context):
161+
if self.__t_checker and not self.__t_checker():
162+
self.__error = True
163+
164+
self.__get_count += 1
165+
166+
if self.__get_count == 100:
167+
pass
168+
169+
context.set_code(grpc.StatusCode.UNAVAILABLE)
170+
return _DEFAULT_ZONE
171+
172+
173+
@pytest.mark.parametrize("backoff", [None, default_backoff(), backoff_linear_with_jitter(0.05, 0.1)])
174+
def test_infinite_retries_deadline_and_backoff(backoff):
175+
service = _AlwaysUnavailable()
176+
server = _grpc_server(service.handler)
177+
178+
with grpc.insecure_channel(_SERVICE_ADDR) as channel:
179+
interceptor = RetryInterceptor(max_retry_count=-1, retriable_codes=[grpc.StatusCode.UNAVAILABLE],
180+
add_retry_count_to_header=True, back_off_func=backoff)
181+
182+
ch = grpc.intercept_channel(channel, interceptor)
183+
client = zone_service_pb2_grpc.ZoneServiceStub(ch)
184+
185+
with pytest.raises(grpc.RpcError) as e:
186+
client.Get(zone_service_pb2.GetZoneRequest(zone_id="id"), timeout=5)
187+
188+
assert e.value.code() == grpc.StatusCode.DEADLINE_EXCEEDED
189+
190+
server.stop(0)
191+
192+
193+
class _NeverReturnsInTime:
194+
def __init__(self, shutdown):
195+
self.__shutdown = shutdown
196+
197+
def handler(self, context):
198+
time_remaining = context.time_remaining()
199+
200+
# using hack here, since deadline is never None. 31557600 ~= one year in seconds
201+
if time_remaining < 31557600.:
202+
self.__shutdown.wait()
203+
204+
context.set_code(grpc.StatusCode.UNAVAILABLE)
205+
return _DEFAULT_ZONE
206+
207+
208+
def test_per_call_timeout():
209+
shutdown = Event()
210+
service = _NeverReturnsInTime(shutdown)
211+
server = _grpc_server(service.handler)
212+
213+
with grpc.insecure_channel(_SERVICE_ADDR) as channel:
214+
interceptor = RetryInterceptor(max_retry_count=10, retriable_codes=[grpc.StatusCode.UNAVAILABLE],
215+
per_call_timeout=1, add_retry_count_to_header=True)
216+
217+
ch = grpc.intercept_channel(channel, interceptor)
218+
client = zone_service_pb2_grpc.ZoneServiceStub(ch)
219+
220+
with pytest.raises(grpc.RpcError) as e:
221+
client.Get(zone_service_pb2.GetZoneRequest(zone_id="id"))
222+
223+
assert e.value.code() == grpc.StatusCode.DEADLINE_EXCEEDED
224+
225+
shutdown.set()
226+
server.stop(1)
227+
228+
229+
class _HeaderTokenAndRetryCount:
230+
def __init__(self):
231+
self.__query_count = 0
232+
self.__token = None
233+
self.__token_error = False
234+
self.__header_error = False
235+
236+
@property
237+
def error(self):
238+
return self.__header_error or self.__token_error
239+
240+
def handler(self, context):
241+
metadata = context.invocation_metadata()
242+
243+
if metadata is not None:
244+
token = [v[1] for v in metadata if v[0] == "idempotency-key"]
245+
246+
if len(token) != 1:
247+
self.__token_error = True
248+
else:
249+
# store token on first call, on consequent calls, check that token didn't change
250+
if self.__query_count == 0:
251+
self.__token = token[0]
252+
else:
253+
if self.__token != token[0]:
254+
self.__token_error = True
255+
256+
if self.__query_count > 0:
257+
retry_meta = [v[1] for v in metadata if v[0] == "x-retry-attempt"]
258+
259+
if len(retry_meta) != 1 or retry_meta[0] != str(self.__query_count):
260+
self.__header_error = True
261+
else:
262+
self.__token_error = True
263+
self.__header_error = True
264+
265+
self.__query_count += 1
266+
267+
context.set_code(grpc.StatusCode.UNAVAILABLE)
268+
return _DEFAULT_ZONE
269+
270+
271+
def test_header_token_and_retry_count():
272+
service = _HeaderTokenAndRetryCount()
273+
server = _grpc_server(service.handler)
274+
275+
with grpc.insecure_channel(_SERVICE_ADDR) as channel:
276+
interceptor = RetryInterceptor(max_retry_count=100, retriable_codes=[grpc.StatusCode.UNAVAILABLE],
277+
add_retry_count_to_header=True)
278+
ch = grpc.intercept_channel(channel, interceptor)
279+
client = zone_service_pb2_grpc.ZoneServiceStub(ch)
280+
281+
with pytest.raises(grpc.RpcError) as e:
282+
client.Get(zone_service_pb2.GetZoneRequest(zone_id="id"))
283+
284+
assert e.value.code() == grpc.StatusCode.UNAVAILABLE
285+
286+
assert not service.error
287+
server.stop(0)
288+
289+
290+
class _TokenUnchanged:
291+
def __init__(self, token):
292+
self.__token = token
293+
self.__token_changed = False
294+
295+
@property
296+
def token_changed(self):
297+
return self.__token_changed
298+
299+
def handler(self, context):
300+
metadata = context.invocation_metadata()
301+
302+
if metadata is not None:
303+
token = [v[1] for v in metadata if v[0] == "idempotency-key"]
304+
305+
if len(token) != 1 or token[0] != self.__token:
306+
self.__token_changed = True
307+
else:
308+
self.__token_changed = True
309+
310+
context.set_code(grpc.StatusCode.UNAVAILABLE)
311+
return _DEFAULT_ZONE
312+
313+
314+
def test_idempotency_token_not_changed():
315+
token = str(uuid.uuid4())
316+
service = _TokenUnchanged(token)
317+
server = _grpc_server(service.handler)
318+
319+
with grpc.insecure_channel(_SERVICE_ADDR) as channel:
320+
interceptor = RetryInterceptor(max_retry_count=100, retriable_codes=[grpc.StatusCode.UNAVAILABLE],
321+
add_retry_count_to_header=True)
322+
ch = grpc.intercept_channel(channel, interceptor)
323+
client = zone_service_pb2_grpc.ZoneServiceStub(ch)
324+
325+
with pytest.raises(grpc.RpcError) as e:
326+
client.Get(zone_service_pb2.GetZoneRequest(zone_id="id"), metadata=[("idempotency-key", token)])
327+
328+
assert e.value.code() == grpc.StatusCode.UNAVAILABLE
329+
330+
assert not service.token_changed
331+
server.stop(0)

yandexcloud/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Main package for Yandex.Cloud SDK."""
22

33
from yandexcloud._sdk import SDK
4+
from yandexcloud._retry_interceptor import RetryInterceptor
5+
from yandexcloud._backoff import backoff_linear_with_jitter, backoff_exponential_with_jitter, default_backoff
46

5-
__version__ = '0.0.1'
7+
__version__ = '0.0.2'

0 commit comments

Comments
 (0)