Skip to content

Commit

Permalink
fix: retry mechanism again catches return codes in the 500 range (DEV…
Browse files Browse the repository at this point in the history
  • Loading branch information
jnussbaum committed Jan 15, 2024
1 parent 85e363b commit c927c21
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 92 deletions.
156 changes: 65 additions & 91 deletions src/dsp_tools/utils/connection_live.py
Expand Up @@ -15,89 +15,11 @@
from dsp_tools.utils.create_logger import get_logger
from dsp_tools.utils.set_encoder import SetEncoder

logger = get_logger(__name__)


def _try_network_action(
    action: Callable[..., Any],
    *args: Any,
    **kwargs: Any,
) -> Any:
    """
    Helper method that tries 7 times to execute an action.
    If a timeout error, a ConnectionError, a requests.exceptions.RequestException, or a non-permanent BaseError occurs,
    it waits and retries.
    The waiting times are 1, 2, 4, 8, 16, 32, 64 seconds.
    If another exception occurs, it escalates.

    Args:
        action: a lambda with the code to be executed, or a function
        args: positional arguments for the action
        kwargs: keyword arguments for the action

    Raises:
        BaseError: if the action fails permanently
        unexpected exceptions: if the action fails with an unexpected exception

    Returns:
        the return value of action
    """
    action_as_str = f"{action=}, {args=}, {kwargs=}"

    def log_and_wait(reason: str, i: int) -> None:
        # One place for the announce/log/sleep sequence shared by all retry branches.
        # The parameter is deliberately called "i", so that the "{i=:}" log output stays unchanged.
        msg = f"{reason}: Try reconnecting to DSP server, next attempt in {2 ** i} seconds..."
        print(f"{datetime.now()}: {msg}")
        logger.error(f"{msg} {action_as_str} (retry-counter {i=:})", exc_info=True)
        time.sleep(2**i)

    for i in range(7):
        try:
            # Unpacking empty args/kwargs is a no-op, so this single call covers every combination.
            return action(*args, **kwargs)
        except (TimeoutError, ReadTimeout, ReadTimeoutError):
            log_and_wait("Timeout Error", i)
        except (ConnectionError, RequestException):
            log_and_wait("Network Error", i)
        except BaseError as err:
            in_500_range = bool(err.status_code) and 500 <= err.status_code < 600
            try_again_later = "try again later" in err.message
            if not (try_again_later or in_500_range):
                # permanent error: escalate it unchanged, with its original traceback
                raise
            log_and_wait("Transient Error", i)

    msg = "Permanently unable to execute the network action. See logs for more details."
    logger.error(msg)
    raise BaseError(msg)


# HTTP status codes used to classify the response of a request
HTTP_OK = 200
HTTP_SERVER_ERROR_LOWER = 500
HTTP_SERVER_ERROR_UPPER = 599


def check_for_api_error(response: requests.Response) -> None:
    """
    Raise an error if the response of an API request does not have the status code 200.

    Args:
        response: The requests.Response object that is returned by the API request

    Raises:
        BaseError: If the status code of the response is not 200
    """
    if response.status_code == 200:
        # nothing to report
        return

    error_message = f"KNORA-ERROR: status code={response.status_code}\nMessage: {response.text}"
    # hand over everything the response contains, so that the caller can inspect it
    raise BaseError(
        message=error_message,
        status_code=response.status_code,
        json_content_of_api_response=response.text,
        reason_from_api=response.reason,
        api_route=response.url,
    )
logger = get_logger(__name__)


@dataclass
Expand Down Expand Up @@ -169,12 +91,12 @@ def _log_request(
url: str,
data: dict[str, Any] | None,
params: Optional[dict[str, Any]],
response: requests.Response,
response: Response,
timeout: int,
headers: dict[str, str] | None = None,
uploaded_file: str | None = None,
) -> None:
if response.status_code == 200:
if response.status_code == HTTP_OK:
_return = response.json()
if "token" in _return:
_return["token"] = "<token>"
Expand Down Expand Up @@ -242,7 +164,7 @@ def post(
elif files:
request = partial(request, files=files)

response: Response = _try_network_action(request)
response = self._try_network_action(request)
self._log_request(
method="POST",
url=url,
Expand All @@ -253,7 +175,6 @@ def post(
headers=headers,
timeout=timeout,
)
check_for_api_error(response)
return cast(dict[str, Any], response.json())

def get(
Expand Down Expand Up @@ -281,7 +202,7 @@ def get(
headers["Authorization"] = f"Bearer {self.token}"
timeout = self.timeout_get_delete

response: Response = _try_network_action(
response = self._try_network_action(
lambda: requests.get(
url=url,
headers=headers,
Expand All @@ -297,7 +218,6 @@ def get(
headers=headers,
timeout=timeout,
)
check_for_api_error(response)
return cast(dict[str, Any], response.json())

def put(
Expand Down Expand Up @@ -331,7 +251,7 @@ def put(
headers["Authorization"] = f"Bearer {self.token}"
timeout = self.timeout_put_post

response: Response = _try_network_action(
response = self._try_network_action(
lambda: requests.put(
url=url,
headers=headers,
Expand All @@ -350,7 +270,6 @@ def put(
headers=headers,
timeout=timeout,
)
check_for_api_error(response)
return cast(dict[str, Any], response.json())

def delete(
Expand Down Expand Up @@ -395,5 +314,60 @@ def delete(
headers=headers,
timeout=timeout,
)
check_for_api_error(response)
return cast(dict[str, Any], response.json())

def _should_retry(self, response: Response) -> bool:
    """
    Decide if a request should be sent again, based on the response it received.

    Args:
        response: the response of the request

    Returns:
        True if the status code lies between HTTP_SERVER_ERROR_LOWER and HTTP_SERVER_ERROR_UPPER (inclusive),
        or if the text of the response contains "try again later"
    """
    status = response.status_code
    server_side_problem = status >= HTTP_SERVER_ERROR_LOWER and status <= HTTP_SERVER_ERROR_UPPER
    asked_to_come_back = "try again later" in response.text
    return asked_to_come_back or server_side_problem

def _log_and_sleep(self, reason: str, retry_counter: int) -> None:
    """
    Announce the next attempt (on the console and via the module logger), then wait.
    The waiting time is 2 to the power of retry_counter seconds.

    Args:
        reason: short description of the problem, used as prefix of the message
        retry_counter: number of the failed attempt, determines the waiting time
    """
    import sys

    msg = f"{reason}: Try reconnecting to DSP server, next attempt in {2 ** retry_counter} seconds..."
    print(f"{datetime.now()}: {msg}")
    # This method is also called outside of an "except" block (when a response asks for a retry).
    # logger.exception() would then append a meaningless "NoneType: None" traceback,
    # so a traceback is only attached if an exception is currently being handled.
    handling_exception = sys.exc_info()[0] is not None
    logger.error(f"{msg} ({retry_counter=:})", exc_info=handling_exception)
    time.sleep(2**retry_counter)

def _try_network_action(self, action: Callable[[], Response]) -> Response:
    """
    Execute a HTTP request, and retry it up to 7 times if it fails in a non-permanent way.
    If a timeout error, a ConnectionError, or a requests.RequestException occur,
    or if the response indicates that there is a non-permanent server-side problem,
    this function waits and retries the HTTP request.
    The waiting times are 1, 2, 4, 8, 16, 32, 64 seconds.
    After these 7 vain attempts, the request is executed one last time:
    exceptions raised by this last attempt escalate,
    and a response that is still not OK is turned into a BaseError.

    Args:
        action: a lambda with the code to be executed, or a function

    Raises:
        BaseError: if the response indicates a permanent problem,
            or if the response is still not OK after the last attempt
        unexpected exceptions: if the action fails with an unexpected exception

    Returns:
        the return value of action (a response with the status code HTTP_OK)
    """

    def permanent_failure(failed_response: Response) -> BaseError:
        # bundle everything the response contains, so that the caller can inspect it
        return BaseError(
            message="Permanently unable to execute the network action. See logs for more details.",
            status_code=failed_response.status_code,
            json_content_of_api_response=failed_response.text,
            reason_from_api=failed_response.reason,
            api_route=failed_response.url,
        )

    for i in range(7):
        try:
            response = action()
        except (TimeoutError, ReadTimeout, ReadTimeoutError):
            self._log_and_sleep(reason="Timeout Error", retry_counter=i)
            continue
        except (ConnectionError, RequestException):
            self._log_and_sleep(reason="Network Error", retry_counter=i)
            continue

        if self._should_retry(response):
            self._log_and_sleep(reason="Transient Error", retry_counter=i)
            continue
        if response.status_code != HTTP_OK:
            # not a transient problem: retrying would not help
            raise permanent_failure(response)
        return response

    # after 7 vain attempts to create a response, try it a last time and let exceptions escalate
    response = action()
    if response.status_code != HTTP_OK:
        # never hand a failed response back to the caller as if the request had succeeded
        raise permanent_failure(response)
    return response
4 changes: 3 additions & 1 deletion test/e2e/utils/test_connection_live.py
Expand Up @@ -38,7 +38,9 @@ def test_log_in_and_out(self) -> None:
self.assertIsNotNone(con.token)
con.logout()
self.assertIsNone(con.token)
self.assertRaisesRegex(BaseError, "KNORA-ERROR: status code=400*", con.login, "invalid", "invalid")
self.assertRaisesRegex(
BaseError, "Permanently unable to execute the network action", con.login, "invalid", "invalid"
)

def test_get(self) -> None:
res = self.con.get("/ontology/0001/anything/simple/v2")
Expand Down

0 comments on commit c927c21

Please sign in to comment.