Skip to content

Commit

Permalink
Recover from updated build_param in Phylopic DAG (#3874)
Browse files Browse the repository at this point in the history
* Update DelayedRequester to raise the actual HTTPError

* Update Freesound error handling

* Update Phylopic to restart ingestion when build_param changes mid-ingestion

* Remove camelCase

* Linting
  • Loading branch information
stacimc committed Mar 12, 2024
1 parent bcb2af9 commit d80b590
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 75 deletions.
109 changes: 66 additions & 43 deletions catalog/dags/common/requester.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import requests
from airflow.exceptions import AirflowException
from requests.exceptions import JSONDecodeError
from requests.exceptions import HTTPError, JSONDecodeError

import oauth2
from common.loader import provider_details as prov
Expand All @@ -23,12 +23,6 @@ class SocketConnectBlockedError(Exception):
logger = logging.getLogger(__name__)


class RetriesExceeded(Exception):
"""Custom exception for when the number of allowed retries has been exceeded."""

pass


class DelayedRequester:
"""
Requester class with a built-in delay.
Expand Down Expand Up @@ -73,15 +67,8 @@ def _make_request(
request_kwargs["headers"] = self.headers
try:
response = method(url, **request_kwargs)
if response.status_code == requests.codes.ok:
logger.debug(f"Received response from url {response.url}")
elif response.status_code == requests.codes.unauthorized:
logger.error(f"Authorization failed for URL: {response.url}")
else:
logger.warning(
f"Unable to request URL: {response.url} "
f"Status code: {response.status_code}"
)
response.raise_for_status()

return response
except SocketConnectBlockedError:
# This exception will only be raised during testing, and it *must*
Expand All @@ -94,12 +81,14 @@ def _make_request(
# sent a SIGTERM, which means that the task should be stopped.
raise
except Exception as e:
# All other exceptions are logged and re-raised
logger.error(f"Error with the request for URL: {url}")
logger.info(f"{type(e).__name__}: {e}")
if params := request_kwargs.get("params"):
logger.info(f"Using query parameters {params}")
logger.info(f'Using headers {request_kwargs.get("headers")}')
return None

raise

def get(self, url, params=None, **kwargs):
"""
Expand Down Expand Up @@ -148,36 +137,70 @@ def _get_json(self, response) -> dict | list | None:
except JSONDecodeError as e:
logger.warning(f"Could not get response_json.\n{e}")

def _attempt_retry_get_response_json(
    self,
    error,
    endpoint,
    retries=0,
    query_params=None,
    request_method="get",
    **kwargs,
):
    """
    Retry `get_response_json` after a failed attempt, or give up.

    When `retries` is zero or negative, `error` is raised as-is. Otherwise the
    error is logged as a warning and `get_response_json` is called again with
    one fewer retry, forwarding `endpoint`, `query_params`, `request_method`
    and any extra keyword arguments unchanged.
    """
    if retries > 0:
        remaining = retries - 1
        logger.warning(error)
        logger.warning(
            "Retrying:\n_get_response_json(\n"
            f" {endpoint},\n"
            f" {query_params},\n"
            f" retries={remaining}"
            ")"
        )
        return self.get_response_json(
            endpoint,
            retries=remaining,
            query_params=query_params,
            request_method=request_method,
            **kwargs,
        )

    # Out of retries: surface the error that triggered this attempt.
    logger.error("No retries remaining. Failure.")
    raise error

def get_response_json(
self, endpoint, retries=0, query_params=None, requestMethod="get", **kwargs
self, endpoint, retries=0, query_params=None, request_method="get", **kwargs
):
response_json = None
response = None
if retries < 0:
logger.error("No retries remaining. Failure.")
raise RetriesExceeded("Retries exceeded")

if requestMethod == "get":
response = self.get(endpoint, params=query_params, **kwargs)
elif requestMethod == "post":
response = self.post(endpoint, params=query_params, **kwargs)

if response is not None and response.status_code == 200:
response_json = self._get_json(response)

if response_json is None or (
isinstance(response_json, dict) and response_json.get("error") is not None
):
logger.warning(f"Bad response_json: {response_json}")
logger.warning(
"Retrying:\n_get_response_json(\n"
f" {endpoint},\n"
f" {query_params},\n"
f" retries={retries - 1}"
")"
)
response_json = self.get_response_json(
endpoint, retries=retries - 1, query_params=query_params, **kwargs

try:
if request_method == "get":
response = self.get(endpoint, params=query_params, **kwargs)
elif request_method == "post":
response = self.post(endpoint, params=query_params, **kwargs)

if response is not None and response.status_code == 200:
response_json = self._get_json(response)

if response_json is None or (
isinstance(response_json, dict)
and response_json.get("error") is not None
):
# No usable JSON: the response was not a 200, its body could not be
# parsed, or the parsed JSON contained an "error" key
response_json = self._attempt_retry_get_response_json(
ValueError(f"Bad response_json: {response_json}"),
endpoint,
retries,
query_params,
request_method,
**kwargs,
)
except HTTPError as e:
response_json = self._attempt_retry_get_response_json(
e, endpoint, retries, query_params, request_method, **kwargs
)

return response_json
Expand Down
12 changes: 7 additions & 5 deletions catalog/dags/providers/provider_api_scripts/freesound.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
from datetime import datetime

from airflow.models import Variable
from requests.exceptions import ConnectionError, SSLError
from requests.exceptions import ConnectionError, HTTPError, SSLError
from retry import retry

from common import constants
from common.licenses.licenses import get_license_info
from common.loader import provider_details as prov
from common.requester import RetriesExceeded
from providers.provider_api_scripts.provider_data_ingester import ProviderDataIngester


Expand Down Expand Up @@ -142,12 +141,15 @@ def _get_set_info(self, set_url):
set_id = response_json.get("id")
set_name = response_json.get("name")
return set_id, set_name
except RetriesExceeded:
except HTTPError as error:
# https://github.com/WordPress/openverse-catalog/issues/659
# This should be temporary for the full run of Freesound, as
# some historical audio sets 404.
logger.warning("Unable to fetch audio_set information")
return None, None
if error.response.status_code == 404:
logger.warning("Unable to fetch audio_set information")
return None, None
else:
raise

def _get_audio_set_info(self, media_data):
# set id, set name, set url
Expand Down
34 changes: 32 additions & 2 deletions catalog/dags/providers/provider_api_scripts/phylopic.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

import logging

from requests.exceptions import HTTPError

from common import constants
from common.licenses import get_license_info
from common.loader import provider_details as prov
Expand All @@ -35,15 +37,43 @@ def __init__(self, *args, **kwargs):

def ingest_records(self):
self._get_initial_query_params()
super().ingest_records()
try:
super().ingest_records()

except HTTPError as error:
# Catch 410 error caused by the build_param changing while ingestion is ongoing
if error.response.status_code == 410:
# Refetch initial query params; this will update the build_param to the
# most recent value and reset the `current_page` to 1.
old_build_param = self.build_param
self._get_initial_query_params()

if old_build_param == self.build_param:
# If the build_param could not be updated, there must be another
# issue. Raise the original error.
raise

# Otherwise, the build_param did in fact change. Attempt ingestion
# again with the new param.
logger.info(
f"Build_param changed from {old_build_param} to {self.build_param}"
" during ingestion. Restarting ingestion from the beginning."
)
super().ingest_records()

else:
# Raise all other errors
raise

def _get_initial_query_params(self) -> None:
"""Get the required `build` param from the API and set the total pages."""
resp = self.get_response_json(query_params={})
if not resp:
raise Exception("No response from Phylopic API.")
self.build_param = resp.get("build")
self.current_page = 1
self.total_pages = resp.get("totalPages")
self.build_param = resp.get("build")

logger.info(
f"Total items to fetch: {resp.get('totalItems')}. "
f"Total pages: {self.total_pages}."
Expand Down
59 changes: 38 additions & 21 deletions catalog/tests/dags/common/test_requester.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ def mock_requests_get(url, params, **kwargs):
r.status_code = 200
return r

monkeypatch.setattr(requester.requests.Session, "get", mock_requests_get)

delay = 1
dq = requester.DelayedRequester(delay=delay)
monkeypatch.setattr(dq.session, "get", mock_requests_get)

start = time.time()
dq.get("http://fake_url")
dq.get("http://fake_url")
Expand All @@ -62,34 +62,26 @@ def mock_requests_get(url, params, **kwargs):
monkeypatch.setattr(dq.session, "get", mock_requests_get)

with caplog.at_level(logging.WARNING):
dq.get("https://google.com/")
assert "Error with the request for URL: https://google.com/" in caplog.text
with pytest.raises(requests.exceptions.ReadTimeout):
dq.get("https://google.com/")
assert "Error with the request for URL: https://google.com/" in caplog.text


@pytest.mark.parametrize(
"code, log_level, expected_message",
[
(500, logging.WARNING, "Unable to request URL"),
(401, logging.ERROR, "Authorization failed for URL"),
],
)
def test_get_handles_failure_status_codes(
code, log_level, expected_message, monkeypatch, caplog
):
@pytest.mark.parametrize("code", (500, 401))
def test_get_handles_failure_status_codes(code, monkeypatch, caplog):
url = "https://google.com/"
mock_response = MagicMock()
mock_response.status_code = code
mock_response.url = url

def mock_requests_get(url, params, **kwargs):
return mock_response
r = requests.Response()
r.status_code = code
r.url = url
return r

dq = requester.DelayedRequester(1)
monkeypatch.setattr(dq.session, "get", mock_requests_get)

with caplog.at_level(log_level):
with pytest.raises(requests.exceptions.HTTPError, match=str(code)):
dq.get(url)
assert f"{expected_message}: {url}" in caplog.text


def test_get_response_json_retries_with_none_response():
Expand Down Expand Up @@ -119,6 +111,31 @@ def test_get_response_json_retries_with_non_ok():
assert mock_get.call_count == 3


def test_get_response_json_gets_response_on_retry():
    """A failed first attempt followed by a successful retry returns the JSON."""
    gone = requests.Response()
    gone.status_code = 410

    ok = requests.Response()
    ok.status_code = 200
    ok.json = MagicMock(return_value={"foo": "bar"})

    dq = requester.DelayedRequester(1)
    # `get` yields the failing response on the first call and the
    # successful one on the second.
    with patch.object(dq, "get", side_effect=[gone, ok]) as mock_get:
        actual_json = dq.get_response_json("https://google.com/", retries=2)

    assert actual_json == {"foo": "bar"}
    assert mock_get.call_count == 2


def test_get_response_json_retries_with_error_json():
dq = requester.DelayedRequester(1)
r = requests.Response()
Expand Down Expand Up @@ -193,7 +210,7 @@ def test_handles_optional_headers(
init_headers, request_kwargs, expected_request_kwargs
):
dq = requester.DelayedRequester(0, headers=init_headers)
dq.session.get = MagicMock(return_value=None)
dq.session.get = MagicMock(return_value=MagicMock())
url = "http://test"
params = {"testy": "test"}
dq.get(url, params, **(request_kwargs or {}))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from unittest.mock import MagicMock, patch

import pytest
from requests.exceptions import HTTPError

from catalog.tests.dags.providers.provider_api_scripts.resources.json_load import (
make_resource_json_func,
Expand Down Expand Up @@ -100,7 +101,7 @@ def test_handles_failure_to_get_set_info():
with patch.object(fsd.delayed_requester, "get") as get_mock, patch("time.sleep"):
error_response = MagicMock()
error_response.status_code = 404
get_mock.return_value = error_response
get_mock.side_effect = HTTPError(response=error_response)

actual_id, actual_name, actual_url = fsd._get_audio_set_info(
{"pack": "https://freesound.org/apiv2/packs/35596/"}
Expand Down
26 changes: 26 additions & 0 deletions catalog/tests/dags/providers/provider_api_scripts/test_phylopic.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from unittest.mock import patch

import pytest
import requests
from requests.exceptions import HTTPError

from catalog.tests.dags.providers.provider_api_scripts.resources.json_load import (
make_resource_json_func,
Expand Down Expand Up @@ -151,3 +153,27 @@ def test_get_record_data_returns_none_when_required_values_missing(property):

image = pp.get_record_data(data)
assert image is None


def test_build_param_is_recalculated_if_changes_during_ingestion():
    """A 410 during ingestion triggers a refetch of the build param and a restart."""
    gone_response = requests.Response()
    gone_response.status_code = 410

    responses = [
        # Call from _get_initial_query_params, fetching the build param
        {"totalPages": 1, "build": 123},
        # Call from get_batch using the initial build param; the 410
        # simulates the build param having changed in the meantime
        HTTPError(response=gone_response),
        # _get_initial_query_params is called a second time
        {"totalPages": 1, "build": 124},
        # get_batch with the new build param succeeds; the empty batch
        # makes ingestion stop gracefully
        {"_embedded": {"items": []}},
    ]

    ingester = PhylopicDataIngester()
    with patch.object(
        ingester, "get_response_json", side_effect=responses
    ) as mock_get_response_json:
        ingester.ingest_records()

    assert mock_get_response_json.call_count == 4

0 comments on commit d80b590

Please sign in to comment.