Skip to content

Commit

Permalink
Handle 429 errors in retry_url
Browse files Browse the repository at this point in the history
  • Loading branch information
jdddog committed May 16, 2023
1 parent 63af63b commit 603c242
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 27 deletions.
66 changes: 52 additions & 14 deletions observatory-platform/observatory/platform/utils/url_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,47 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# Author: James Diprose
# Author: James Diprose, Keegan Smith

import cgi
import json
import logging
import os
import time
import urllib.error
import urllib.request
from typing import Dict, List, Tuple, Union
from datetime import datetime
from email.utils import parsedate_to_datetime
from typing import Dict, List, Tuple, Union, Optional

import pytz
import requests
import time
import xmltodict
from airflow import AirflowException
from importlib_metadata import metadata
from requests.adapters import HTTPAdapter
from tenacity import Retrying, stop_after_attempt, before_sleep_log, wait_exponential_jitter
from tenacity.wait import wait_base
from tenacity.wait import wait_base, wait_fixed
from urllib3.util.retry import Retry


def parse_retry_after(retry_after: Optional[str]) -> Optional[float]:
    """Parse the value of an HTTP Retry-After header.

    The header value can be either a delay in seconds (e.g. "120") or an
    HTTP-date after which to retry (e.g. "Wed, 21 Oct 2015 07:28:00 GMT"),
    per RFC 9110 section 10.2.3.

    :param retry_after: the raw Retry-After header value, or None when the header was absent.
    :return: the non-negative number of seconds to wait for, or None when the header
     is absent or cannot be parsed.
    """

    # Stdlib timezone is equivalent to pytz.utc here and avoids the third-party dependency
    from datetime import timezone

    if retry_after is None:
        return None

    # Delay-seconds form, e.g. "120". Clamp to zero so a malformed negative
    # delay can never produce a negative wait.
    try:
        return max(0, int(retry_after))
    except ValueError:
        pass

    # HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    try:
        retry_date = parsedate_to_datetime(retry_after)
    except (TypeError, ValueError):
        # parsedate_to_datetime raises on malformed dates (TypeError on Python < 3.10,
        # ValueError on Python >= 3.10); treat an unparseable header as "no delay given"
        # rather than crashing the retry loop.
        return None
    if retry_date.tzinfo is None:
        # RFC 9110: HTTP-dates are always expressed in GMT, so assume UTC for naive dates
        retry_date = retry_date.replace(tzinfo=timezone.utc)
    return max(0.0, (retry_date - datetime.now(timezone.utc)).total_seconds())


def retry_get_url(
url: str,
num_retries=3,
Expand All @@ -53,28 +73,46 @@ def retry_get_url(
log_url = "***" if squelch_url else url
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
for attempt in Retrying(
retrier = Retrying(
stop=stop_after_attempt(num_retries),
wait=wait,
reraise=True,
before_sleep=before_sleep_log(logger, logging.INFO),
):
)
prev_wait = None
for attempt in retrier:
# Set the function that Tenacity thinks it's retrying. This fixes the name in the logs
attempt.retry_state.fn = retry_get_url

# Set wait back to original wait function after 429 error handling
if prev_wait is not None:
retrier.wait = prev_wait
prev_wait = None

with attempt:
try:
response = None
response = requests.get(url, **kwargs)
response.raise_for_status()
except requests.exceptions.RequestException as e:
response_log = f"Error getting url: {log_url} | "
if response is not None: # Timeout error doesn't result in response
response_log += f"Got response code: {response.status_code} | Reason: {response.reason} | "
response_log += (
f"Attempt: {attempt.retry_state.attempt_number} | Idle for: {attempt.retry_state.idle_for}"
)
except (requests.exceptions.ReadTimeout, requests.exceptions.HTTPError) as e:
log = f"Error getting url: {log_url} | "

if isinstance(e, requests.exceptions.HTTPError):
# Timeout error doesn't result in response, but HTTPError does
log += f"Got response code: {response.status_code} | Reason: {response.reason} | "

# Handle HTTP 429 error
if response.status_code == 429:
delay = parse_retry_after(response.headers.get("Retry-After"))
if delay is not None:
prev_wait = retrier.wait
retrier.wait = wait_fixed(delay)
log += f"Retry-After header detected, sleeping for: {delay} seconds"

log += f"Attempt: {attempt.retry_state.attempt_number} | Idle for: {attempt.retry_state.idle_for}"

# Using e.__class__ logs the specific name of the exception
raise e.__class__(response_log)
raise e.__class__(log)
return response


Expand Down
1 change: 1 addition & 0 deletions observatory-platform/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ json_lines>=0.5.0,<1 # Reading, including jsonl.gz
requests>=2.25.0,<3
tldextract>=3.1.1
aiohttp>=3.7.0,<4
responses>=0.23.1,<1

# SFTP
pysftp>=0.2.9,<1
Expand Down
113 changes: 100 additions & 13 deletions tests/observatory/platform/utils/test_url_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# Author: James Diprose
# Author: James Diprose, Keegan Smith

import time
import unittest
from datetime import datetime
from typing import List
from unittest.mock import patch

import httpretty
import pendulum
import requests
import responses
import time
from airflow.exceptions import AirflowException
from click.testing import CliRunner
from tenacity import wait_fixed
Expand Down Expand Up @@ -93,33 +95,118 @@ def test_retry_session(self):
httpretty.disable()
httpretty.reset()

@responses.activate
def test_retry_get_url(self):
    """Test retry_get_url: returns the last successful response, raises HTTPError once
    retries are exhausted, and honours the Retry-After header on HTTP 429 responses
    (both the delay-seconds and the HTTP-date forms)."""

    # Test that we receive the last response
    responses.reset()
    url = "http://test.com/"
    for status, body in [(500, "Internal server error"), (404, "Page not found"), (200, "success")]:
        responses.add(responses.Response(method="GET", url=url, body=body, status=status))
    response = retry_get_url(url, num_retries=3, wait=wait_fixed(0))
    self.assertEqual(response.text, "success")
    self.assertEqual(response.status_code, 200)

    # Test that an HTTPError is triggered. It should fail before reaching the
    # 200 response, because we only retry 3 times.
    responses.reset()
    url = "http://fail.com/"
    for status, body in [(500, "Internal server error")] * 3 + [(200, "success")]:
        responses.add(responses.Response(method="GET", url=url, body=body, status=status))
    with self.assertRaises(requests.exceptions.HTTPError):
        retry_get_url(url, num_retries=3, wait=wait_fixed(0))

    # Test 429 error handling: Retry-After given first as a delay in seconds,
    # then as an HTTP-date, with ordinary 500 retries either side.
    responses.reset()
    url = "http://toomanyrequests.com/"
    responses.add(responses.Response(method="GET", url=url, body="Internal server error", status=500))
    responses.add(
        responses.Response(method="GET", url=url, body="Too many requests", status=429, headers={"Retry-After": "10"})
    )
    retry_after_date = pendulum.now().add(seconds=20).in_tz("GMT").format("ddd, DD MMM YYYY HH:mm:ss [GMT]")
    responses.add(
        responses.Response(
            method="GET", url=url, body="Too many requests", status=429, headers={"Retry-After": retry_after_date}
        )
    )
    responses.add(responses.Response(method="GET", url=url, body="Internal server error", status=500))
    responses.add(responses.Response(method="GET", url=url, body="success", status=200))
    start = datetime.now()
    response = retry_get_url(url, num_retries=5)
    end = datetime.now()
    self.assertEqual(response.text, "success")
    self.assertEqual(response.status_code, 200)

    # Total wall-clock time should roughly equal the waits requested by the server
    # plus the default backoff between the non-429 attempts.
    expected_wait = 60.0
    duration = (end - start).total_seconds()
    self.assertAlmostEqual(expected_wait, duration, delta=2.5)

def test_retry_get_url_read_timeout(self):
# Test that a ReadTimeout is triggered

httpretty.enable()
url = "http://timeout.com/"
status_codes = [500, 500, 500, 200]
bodies = ["Internal server error"] * 4 + ["success"]
self.__create_mock_request_sequence(url, status_codes, bodies, sleep=3)
with self.assertRaises(requests.exceptions.ReadTimeout):
response = retry_get_url(url, num_retries=3, wait=wait_fixed(0), timeout=2)
retry_get_url(url, num_retries=3, wait=wait_fixed(0), timeout=2)

# Cleanup
httpretty.disable()
Expand Down

0 comments on commit 603c242

Please sign in to comment.