diff --git a/providers/http/src/airflow/providers/http/operators/http.py b/providers/http/src/airflow/providers/http/operators/http.py index 1c5da7688e5e4..9d46426c677ed 100644 --- a/providers/http/src/airflow/providers/http/operators/http.py +++ b/providers/http/src/airflow/providers/http/operators/http.py @@ -209,6 +209,12 @@ def paginate_sync(self, response: Response) -> Response | list[Response]: return all_responses def execute_async(self, context: Context) -> None: + if self.method.upper() not in ("GET", "HEAD", "OPTIONS"): + self.log.warning( + "HttpOperator with deferrable=True and method=%s may send duplicate requests if the Triggerer restarts.", + self.method, + ) + self.defer( trigger=HttpTrigger( http_conn_id=self.http_conn_id, diff --git a/providers/http/tests/unit/http/operators/test_http.py b/providers/http/tests/unit/http/operators/test_http.py index 50b66e5f61255..26e075a5937be 100644 --- a/providers/http/tests/unit/http/operators/test_http.py +++ b/providers/http/tests/unit/http/operators/test_http.py @@ -387,3 +387,33 @@ def __init__(self, *_, **__): ... base.BaseHook, "get_connection", lambda _cid: SimpleNamespace(login=None, password=None) ) assert HttpOperator(task_id="test_HTTP_op_3")._resolve_auth_type() is None + + @pytest.mark.parametrize("method", ["POST", "PUT", "PATCH", "DELETE"]) + def test_execute_async_warns_on_non_idempotent_method(self, monkeypatch, method): + monkeypatch.setattr( + base.BaseHook, "get_connection", lambda _cid: SimpleNamespace(login=None, password=None) + ) + self._capture_defer(monkeypatch) + + operator = HttpOperator(task_id="test_HTTP_op", method=method, deferrable=True) + + with mock.patch.object(operator.log, "warning") as mock_warning: + operator.execute_async(context={}) + + mock_warning.assert_called_once() + call_args = mock_warning.call_args + assert method in call_args.args or method in str(call_args) + + @pytest.mark.parametrize("method", ["GET", "HEAD", "OPTIONS"]) + def test_execute_async_no_warning_on_idempotent_method(self, monkeypatch, method): + monkeypatch.setattr( + base.BaseHook, "get_connection", lambda _cid: SimpleNamespace(login=None, password=None) + ) + self._capture_defer(monkeypatch) + + operator = HttpOperator(task_id="test_HTTP_op", method=method, deferrable=True) + + with mock.patch.object(operator.log, "warning") as mock_warning: + operator.execute_async(context={}) + + mock_warning.assert_not_called()