Skip to content

Commit

Permalink
Convert hard-coded allowlist error code to be argument of HttpSensor (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
okayhooni committed Aug 26, 2023
1 parent 9ce76e3 commit b1f2a16
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 2 deletions.
13 changes: 11 additions & 2 deletions airflow/providers/http/sensors/http.py
Expand Up @@ -33,7 +33,9 @@ class HttpSensor(BaseSensorOperator):
HTTP Error codes other than 404 (like 403) or Connection Refused Error
would raise an exception and fail the sensor itself directly (no more poking).
To avoid failing the task for other codes than 404, the argument ``extra_option``
To avoid failing the task for other codes than 404, the argument ``response_error_codes_allowlist``
can be passed with the list containing all the allowed error status codes, like ``["404", "503"]``.
To skip the error status code check entirely, the argument ``extra_options``
can be passed with the value ``{'check_response': False}``. It will make the ``response_check``
be executed for any HTTP status code.
Expand Down Expand Up @@ -62,6 +64,9 @@ def response_check(response, task_instance):
:param endpoint: The relative part of the full url
:param request_params: The parameters to be added to the GET url
:param headers: The HTTP headers to be added to the GET request
:param response_error_codes_allowlist: An allowlist of error status codes for which poke() returns False instead of raising an exception.
If ``None`` is passed, it defaults to ``["404"]`` for backward compatibility.
To make ``404 Not Found`` raise an error as well, explicitly pass an empty list ``[]``.
:param response_check: A check against the 'requests' response object.
The callable takes the response object as the first positional argument
and optionally any number of keyword arguments available in the context dictionary.
Expand All @@ -85,6 +90,7 @@ def __init__(
method: str = "GET",
request_params: dict[str, Any] | None = None,
headers: dict[str, Any] | None = None,
response_error_codes_allowlist: list[str] | None = None,
response_check: Callable[..., bool] | None = None,
extra_options: dict[str, Any] | None = None,
tcp_keep_alive: bool = True,
Expand All @@ -97,6 +103,9 @@ def __init__(
self.endpoint = endpoint
self.http_conn_id = http_conn_id
self.method = method
self.response_error_codes_allowlist = (
("404",) if response_error_codes_allowlist is None else tuple(response_error_codes_allowlist)
)
self.request_params = request_params or {}
self.headers = headers or {}
self.extra_options = extra_options or {}
Expand Down Expand Up @@ -130,7 +139,7 @@ def poke(self, context: Context) -> bool:
kwargs = determine_kwargs(self.response_check, [response], context)
return self.response_check(response, **kwargs)
except AirflowException as exc:
if str(exc).startswith("404"):
if str(exc).startswith(self.response_error_codes_allowlist):
return False

raise exc
Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Expand Up @@ -34,6 +34,7 @@ Alibaba
alibaba
allAuthenticatedUsers
allowinsert
allowlist
allUsers
alphanumerics
Alphasort
Expand Down
50 changes: 50 additions & 0 deletions tests/providers/http/sensors/test_http.py
Expand Up @@ -183,6 +183,56 @@ def resp_check(_):
]
mock_log.error.assert_has_calls(calls)

@patch("airflow.providers.http.hooks.http.requests.Session.send")
def test_response_error_codes_allowlist(self, mock_session_send, create_task_of_operator):
    """Check that allow-listed error codes do not fail the sensor, while others do.

    The mocked ``Session.send`` first answers with five error responses whose
    status codes (503, 503, 503, 404, 499) are all in the allowlist passed to
    the sensor, and afterwards with a ``500`` response that is not. The test
    expects the run to end with an ``AirflowException`` for that ``500``.
    """

    def build_response(status_code, reason):
        # Bare ``requests.Response`` carrying only the fields this test sets.
        response = requests.Response()
        response.status_code = status_code
        response.reason = reason
        return response

    scripted_errors = iter(
        [
            (503, "Service Unavailable"),
            (503, "Service Unavailable"),
            (503, "Service Unavailable"),
            (404, "Not Found"),
            (499, "Allowed Non-standard Error Code"),
        ]
    )

    def serve_next_response(*_, **__):
        # When the scripted errors are used up, ``mock.DEFAULT`` makes the
        # mock fall back to its configured ``return_value``.
        upcoming = next(scripted_errors, None)
        if upcoming is None:
            return mock.DEFAULT
        return build_response(*upcoming)

    mock_session_send.side_effect = serve_next_response
    mock_session_send.return_value = build_response(500, "Internal Server Error")

    sensor_name = "http_sensor_response_error_codes_allowlist"
    task = create_task_of_operator(
        HttpSensor,
        dag_id=sensor_name,
        task_id=sensor_name,
        response_error_codes_allowlist=["404", "499", "503"],
        http_conn_id="http_default",
        endpoint="",
        request_params={},
        method="GET",
        response_check=lambda _: True,
        timeout=5,
        poke_interval=1,
    )

    with pytest.raises(AirflowException, match="500:Internal Server Error"):
        task.execute(context={})


class FakeSession:
def __init__(self):
Expand Down

0 comments on commit b1f2a16

Please sign in to comment.