From eed2e107fd13a6cbc732b5b8868b0bf0482fa600 Mon Sep 17 00:00:00 2001 From: Dmytro Date: Tue, 31 Aug 2021 09:55:45 +0300 Subject: [PATCH] Python CDK: fix retry attempts in case of user defined backoff time (#5707) * Python CDK: fix retry attempts in case of user defined backoff time --- airbyte-cdk/python/CHANGELOG.md | 3 ++ .../airbyte_cdk/sources/streams/http/http.py | 34 ++++++++++-- airbyte-cdk/python/setup.py | 2 +- .../sources/streams/http/test_http.py | 53 +++++++++++++++++-- 4 files changed, 83 insertions(+), 9 deletions(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index d296390c5fb38..e8a1e8e920880 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.13 +Fix defect with user defined backoff time retry attempts, number of retries logic fixed + ## 0.1.12 Add raise_on_http_errors, max_retries, retry_factor properties to be able to ignore http status errors and modify retry time in HTTP stream diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index 2a0178875f655..a078266e1f472 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -71,9 +71,9 @@ def raise_on_http_errors(self) -> bool: return True @property - def max_retries(self) -> int: + def max_retries(self) -> Union[int, None]: """ - Override if needed. Specifies maximum amount of retries for backoff policy. + Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit. 
""" return 5 @@ -265,9 +265,33 @@ def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mappi """ Creates backoff wrappers which are responsible for retry logic """ - backoff_handler = default_backoff_handler(max_tries=self.max_retries, factor=self.retry_factor) - user_backoff_handler = user_defined_backoff_handler(max_tries=self.max_retries)(backoff_handler) - return user_backoff_handler(self._send)(request, request_kwargs) + + """ + Backoff package has max_tries parameter that means total number of + tries before giving up, so if this number is 0 no calls are expected to be made. + But for this class we call it max_REtries assuming there would be at + least one attempt and some retry attempts, to comply with this logic we add + 1 to expected retries attempts. + """ + max_tries = self.max_retries + """ + According to backoff max_tries docstring: + max_tries: The maximum number of attempts to make before giving + up ...The default value of None means there is no limit to + the number of tries. + This implies that if max_tries is explicitly set to None there is no + limit to retry attempts, otherwise it is a limited number of tries. But + this is not true for current version of backoff package (1.8.0). Setting + max_tries to 0 or negative number would result in endless retry attempts. + Add this condition to avoid an endless loop if it hasn't been set + explicitly (i.e. max_retries is not None). 
+ """ + if max_tries is not None: + max_tries = max(0, max_tries) + 1 + + user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries)(self._send) + backoff_handler = default_backoff_handler(max_tries=max_tries, factor=self.retry_factor) + return backoff_handler(user_backoff_handler)(request, request_kwargs) def read_records( self, diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 28cb7070ea7bb..38a119df0e502 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -35,7 +35,7 @@ setup( name="airbyte-cdk", - version="0.1.12", + version="0.1.13", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py index 57eb9875bf02b..84a53835243d3 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py @@ -24,6 +24,7 @@ import json +from http import HTTPStatus from typing import Any, Iterable, Mapping, Optional from unittest.mock import ANY @@ -66,12 +67,13 @@ def test_request_kwargs_used(mocker, requests_mock): stream = StubBasicReadHttpStream() request_kwargs = {"cert": None, "proxies": "google.com"} mocker.patch.object(stream, "request_kwargs", return_value=request_kwargs) - mocker.patch.object(stream._session, "send", wraps=stream._session.send) + send_mock = mocker.patch.object(stream._session, "send", wraps=stream._session.send) requests_mock.register_uri("GET", stream.url_base) list(stream.read_records(sync_mode=SyncMode.full_refresh)) stream._session.send.assert_any_call(ANY, **request_kwargs) + assert send_mock.call_count == 1 def test_stub_basic_read_http_stream_read_records(mocker): @@ -149,14 +151,58 @@ def test_stub_custom_backoff_http_stream(mocker): req = requests.Response() req.status_code = 429 - 
mocker.patch.object(requests.Session, "send", return_value=req) + send_mock = mocker.patch.object(requests.Session, "send", return_value=req) with pytest.raises(UserDefinedBackoffException): list(stream.read_records(SyncMode.full_refresh)) + assert send_mock.call_count == stream.max_retries + 1 # TODO(davin): Figure out how to assert calls. +@pytest.mark.parametrize("retries", [-20, -1, 0, 1, 2, 10]) +def test_stub_custom_backoff_http_stream_retries(mocker, retries): + mocker.patch("time.sleep", lambda x: None) + + class StubCustomBackoffHttpStreamRetries(StubCustomBackoffHttpStream): + @property + def max_retries(self): + return retries + + stream = StubCustomBackoffHttpStreamRetries() + req = requests.Response() + req.status_code = HTTPStatus.TOO_MANY_REQUESTS + send_mock = mocker.patch.object(requests.Session, "send", return_value=req) + + with pytest.raises(UserDefinedBackoffException): + list(stream.read_records(SyncMode.full_refresh)) + if retries <= 0: + assert send_mock.call_count == 1 + else: + assert send_mock.call_count == stream.max_retries + 1 + + +def test_stub_custom_backoff_http_stream_endless_retries(mocker): + mocker.patch("time.sleep", lambda x: None) + + class StubCustomBackoffHttpStreamRetries(StubCustomBackoffHttpStream): + @property + def max_retries(self): + return None + + infinite_number = 20 + + stream = StubCustomBackoffHttpStreamRetries() + req = requests.Response() + req.status_code = HTTPStatus.TOO_MANY_REQUESTS + send_mock = mocker.patch.object(requests.Session, "send", side_effect=[req] * infinite_number) + + # Expecting mock object to raise a RuntimeError when the end of side_effect list parameter reached. 
+ with pytest.raises(RuntimeError): + list(stream.read_records(SyncMode.full_refresh)) + assert send_mock.call_count == infinite_number + 1 + + @pytest.mark.parametrize("http_code", [400, 401, 403]) def test_4xx_error_codes_http_stream(mocker, http_code): stream = StubCustomBackoffHttpStream() @@ -190,9 +236,10 @@ def test_raise_on_http_errors_off_5xx(mocker, status_code): req = requests.Response() req.status_code = status_code - mocker.patch.object(requests.Session, "send", return_value=req) + send_mock = mocker.patch.object(requests.Session, "send", return_value=req) with pytest.raises(DefaultBackoffException): list(stream.read_records(SyncMode.full_refresh)) + assert send_mock.call_count == stream.max_retries + 1 @pytest.mark.parametrize("status_code", [400, 401, 402, 403, 416])