Skip to content

Commit

Permalink
Python CDK: fix retry attempts in case of user defined backoff time (#…
Browse files Browse the repository at this point in the history
…5707)

* Python CDK: fix retry attempts in case of user defined backoff time
  • Loading branch information
avida committed Aug 31, 2021
1 parent 90e909f commit eed2e10
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 9 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.13
Fix defect with user defined backoff time retry attempts, number of retries logic fixed

## 0.1.12
Add raise_on_http_errors, max_retries, retry_factor properties to be able to ignore http status errors and modify retry time in HTTP stream

Expand Down
34 changes: 29 additions & 5 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ def raise_on_http_errors(self) -> bool:
return True

@property
def max_retries(self) -> int:
def max_retries(self) -> Union[int, None]:
"""
Override if needed. Specifies maximum amount of retries for backoff policy.
Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit.
"""
return 5

Expand Down Expand Up @@ -265,9 +265,33 @@ def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mappi
"""
Creates backoff wrappers which are responsible for retry logic
"""
backoff_handler = default_backoff_handler(max_tries=self.max_retries, factor=self.retry_factor)
user_backoff_handler = user_defined_backoff_handler(max_tries=self.max_retries)(backoff_handler)
return user_backoff_handler(self._send)(request, request_kwargs)

"""
Backoff package has max_tries parameter that means total number of
tries before giving up, so if this number is 0 no calls are expected to be made.
But for this class we call it max_REtries assuming there would be at
least one attempt and some retry attempts; to comply with this logic we add
1 to the expected number of retry attempts.
"""
max_tries = self.max_retries
"""
According to backoff max_tries docstring:
max_tries: The maximum number of attempts to make before giving
up ...The default value of None means there is no limit to
the number of tries.
This implies that if max_tries is explicitly set to None there is no
limit to retry attempts, otherwise it is limited number of tries. But
this is not true for current version of backoff packages (1.8.0). Setting
max_tries to 0 or negative number would result in endless retry attempts.
Add this condition to avoid an endless loop if it hasn't been set
explicitly (i.e. max_retries is not None).
"""
if max_tries is not None:
max_tries = max(0, max_tries) + 1

user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries)(self._send)
backoff_handler = default_backoff_handler(max_tries=max_tries, factor=self.retry_factor)
return backoff_handler(user_backoff_handler)(request, request_kwargs)

def read_records(
self,
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

setup(
name="airbyte-cdk",
version="0.1.12",
version="0.1.13",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
53 changes: 50 additions & 3 deletions airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@


import json
from http import HTTPStatus
from typing import Any, Iterable, Mapping, Optional
from unittest.mock import ANY

Expand Down Expand Up @@ -66,12 +67,13 @@ def test_request_kwargs_used(mocker, requests_mock):
stream = StubBasicReadHttpStream()
request_kwargs = {"cert": None, "proxies": "google.com"}
mocker.patch.object(stream, "request_kwargs", return_value=request_kwargs)
mocker.patch.object(stream._session, "send", wraps=stream._session.send)
send_mock = mocker.patch.object(stream._session, "send", wraps=stream._session.send)
requests_mock.register_uri("GET", stream.url_base)

list(stream.read_records(sync_mode=SyncMode.full_refresh))

stream._session.send.assert_any_call(ANY, **request_kwargs)
assert send_mock.call_count == 1


def test_stub_basic_read_http_stream_read_records(mocker):
Expand Down Expand Up @@ -149,14 +151,58 @@ def test_stub_custom_backoff_http_stream(mocker):
req = requests.Response()
req.status_code = 429

mocker.patch.object(requests.Session, "send", return_value=req)
send_mock = mocker.patch.object(requests.Session, "send", return_value=req)

with pytest.raises(UserDefinedBackoffException):
list(stream.read_records(SyncMode.full_refresh))
assert send_mock.call_count == stream.max_retries + 1

# TODO(davin): Figure out how to assert calls.


@pytest.mark.parametrize("retries", [-20, -1, 0, 1, 2, 10])
def test_stub_custom_backoff_http_stream_retries(mocker, retries):
    """Check how many times the session sends for a given ``max_retries``.

    Every response is a 429, so the stream is expected to give up with
    UserDefinedBackoffException. Non-positive ``retries`` values must yield
    exactly one send; positive values must yield ``max_retries + 1`` sends.
    """
    # Replace time.sleep with a no-op so the test does not actually wait.
    mocker.patch("time.sleep", lambda x: None)

    class RetryLimitedStream(StubCustomBackoffHttpStream):
        @property
        def max_retries(self):
            return retries

    too_many_requests = requests.Response()
    too_many_requests.status_code = HTTPStatus.TOO_MANY_REQUESTS
    mocked_send = mocker.patch.object(requests.Session, "send", return_value=too_many_requests)

    stream = RetryLimitedStream()
    with pytest.raises(UserDefinedBackoffException):
        list(stream.read_records(SyncMode.full_refresh))

    # Zero or negative retries still produce the single initial attempt.
    expected_calls = 1 if retries <= 0 else stream.max_retries + 1
    assert mocked_send.call_count == expected_calls


def test_stub_custom_backoff_http_stream_endless_retries(mocker):
    """Check that ``max_retries = None`` puts no limit on retry attempts.

    The mocked ``send`` is given a finite list of 429 responses; the stream
    is expected to consume all of them and attempt one more send.
    """
    # Replace time.sleep with a no-op so the test does not actually wait.
    mocker.patch("time.sleep", lambda x: None)

    class UnlimitedRetriesStream(StubCustomBackoffHttpStream):
        @property
        def max_retries(self):
            return None

    # Finite stand-in for "infinite": the number of responses the mock can serve.
    infinite_number = 20

    too_many_requests = requests.Response()
    too_many_requests.status_code = HTTPStatus.TOO_MANY_REQUESTS
    canned_responses = [too_many_requests for _ in range(infinite_number)]
    mocked_send = mocker.patch.object(requests.Session, "send", side_effect=canned_responses)

    stream = UnlimitedRetriesStream()

    # Expecting mock object to raise a RuntimeError when the end of side_effect list parameter reached.
    with pytest.raises(RuntimeError):
        list(stream.read_records(SyncMode.full_refresh))

    # One extra call: the attempt that found the side_effect list exhausted.
    assert mocked_send.call_count == infinite_number + 1


@pytest.mark.parametrize("http_code", [400, 401, 403])
def test_4xx_error_codes_http_stream(mocker, http_code):
stream = StubCustomBackoffHttpStream()
Expand Down Expand Up @@ -190,9 +236,10 @@ def test_raise_on_http_errors_off_5xx(mocker, status_code):
req = requests.Response()
req.status_code = status_code

mocker.patch.object(requests.Session, "send", return_value=req)
send_mock = mocker.patch.object(requests.Session, "send", return_value=req)
with pytest.raises(DefaultBackoffException):
list(stream.read_records(SyncMode.full_refresh))
assert send_mock.call_count == stream.max_retries + 1


@pytest.mark.parametrize("status_code", [400, 401, 402, 403, 416])
Expand Down

0 comments on commit eed2e10

Please sign in to comment.