diff --git a/airflow/providers/http/sensors/http.py b/airflow/providers/http/sensors/http.py index 302ea98ef2a49..fb0ba11e64844 100644 --- a/airflow/providers/http/sensors/http.py +++ b/airflow/providers/http/sensors/http.py @@ -33,7 +33,9 @@ class HttpSensor(BaseSensorOperator): HTTP Error codes other than 404 (like 403) or Connection Refused Error would raise an exception and fail the sensor itself directly (no more poking). - To avoid failing the task for other codes than 404, the argument ``extra_option`` + To avoid failing the task for other codes than 404, the argument ``response_error_codes_allowlist`` + can be passed with a list of all the allowed error status codes, like ``["404", "503"]``. + To skip the error status code check entirely, the argument ``extra_options`` can be passed with the value ``{'check_response': False}``. It will make the ``response_check`` be execute for any http status code. @@ -62,6 +64,9 @@ def response_check(response, task_instance): :param endpoint: The relative part of the full url :param request_params: The parameters to be added to the GET url :param headers: The HTTP headers to be added to the GET request + :param response_error_codes_allowlist: Error codes that make poke() return False instead of raising. + If ``None`` is passed, it defaults to ``["404"]`` for backward compatibility. + To make ``404 Not Found`` raise an error as well, explicitly pass an empty list ``[]``. :param response_check: A check against the 'requests' response object. The callable takes the response object as the first positional argument and optionally any number of keyword arguments available in the context dictionary. 
@@ -85,6 +90,7 @@ def __init__( method: str = "GET", request_params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None, + response_error_codes_allowlist: list[str] | None = None, response_check: Callable[..., bool] | None = None, extra_options: dict[str, Any] | None = None, tcp_keep_alive: bool = True, @@ -97,6 +103,9 @@ def __init__( self.endpoint = endpoint self.http_conn_id = http_conn_id self.method = method + self.response_error_codes_allowlist = ( + ("404",) if response_error_codes_allowlist is None else tuple(response_error_codes_allowlist) + ) self.request_params = request_params or {} self.headers = headers or {} self.extra_options = extra_options or {} @@ -130,7 +139,7 @@ def poke(self, context: Context) -> bool: kwargs = determine_kwargs(self.response_check, [response], context) return self.response_check(response, **kwargs) except AirflowException as exc: - if str(exc).startswith("404"): + if str(exc).startswith(self.response_error_codes_allowlist): return False raise exc diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index eb80939a4c4e1..df6b406d84f8d 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -34,6 +34,7 @@ Alibaba alibaba allAuthenticatedUsers allowinsert +allowlist allUsers alphanumerics Alphasort diff --git a/tests/providers/http/sensors/test_http.py b/tests/providers/http/sensors/test_http.py index b2fe5921a0a84..89fb25eebb52c 100644 --- a/tests/providers/http/sensors/test_http.py +++ b/tests/providers/http/sensors/test_http.py @@ -183,6 +183,56 @@ def resp_check(_): ] mock_log.error.assert_has_calls(calls) + @patch("airflow.providers.http.hooks.http.requests.Session.send") + def test_response_error_codes_allowlist(self, mock_session_send, create_task_of_operator): + allowed_error_response_gen = iter( + [ + (503, "Service Unavailable"), + (503, "Service Unavailable"), + (503, "Service Unavailable"), + (404, "Not Found"), + (499, "Allowed Non-standard Error Code"), + ] + ) + + 
def mocking_allowed_error_responses(*_, **__): + try: + error_code, error_reason = next(allowed_error_response_gen) + except StopIteration: + return mock.DEFAULT + + error_response = requests.Response() + error_response.status_code = error_code + error_response.reason = error_reason + + return error_response + + def resp_check(_): + return True + + final_response = requests.Response() + final_response.status_code = 500 + final_response.reason = "Internal Server Error" + + mock_session_send.side_effect = mocking_allowed_error_responses + mock_session_send.return_value = final_response + + task = create_task_of_operator( + HttpSensor, + dag_id="http_sensor_response_error_codes_allowlist", + task_id="http_sensor_response_error_codes_allowlist", + response_error_codes_allowlist=["404", "499", "503"], + http_conn_id="http_default", + endpoint="", + request_params={}, + method="GET", + response_check=resp_check, + timeout=5, + poke_interval=1, + ) + with pytest.raises(AirflowException, match="500:Internal Server Error"): + task.execute(context={}) + class FakeSession: def __init__(self):