Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions providers/http/src/airflow/providers/http/operators/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,12 @@ def paginate_sync(self, response: Response) -> Response | list[Response]:
return all_responses

def execute_async(self, context: Context) -> None:
if self.method.upper() not in ("GET", "HEAD", "OPTIONS"):
self.log.warning(
"HttpOperator with deferrable=True and method=%s may send duplicate requests if the Triggerer restarts.",
self.method,
)

self.defer(
trigger=HttpTrigger(
http_conn_id=self.http_conn_id,
Expand Down
30 changes: 30 additions & 0 deletions providers/http/tests/unit/http/operators/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,3 +387,33 @@ def __init__(self, *_, **__): ...
base.BaseHook, "get_connection", lambda _cid: SimpleNamespace(login=None, password=None)
)
assert HttpOperator(task_id="test_HTTP_op_3")._resolve_auth_type() is None

@pytest.mark.parametrize("method", ["POST", "PUT", "PATCH", "DELETE"])
def test_execute_async_warns_on_non_idempotent_method(self, monkeypatch, method):
monkeypatch.setattr(
base.BaseHook, "get_connection", lambda _cid: SimpleNamespace(login=None, password=None)
)
self._capture_defer(monkeypatch)

operator = HttpOperator(task_id="test_HTTP_op", method=method, deferrable=True)

with mock.patch.object(operator.log, "warning") as mock_warning:
operator.execute_async(context={})

mock_warning.assert_called_once()
call_args = mock_warning.call_args
assert method in call_args.args or method in str(call_args)

@pytest.mark.parametrize("method", ["GET", "HEAD", "OPTIONS"])
def test_execute_async_no_warning_on_idempotent_method(self, monkeypatch, method):
monkeypatch.setattr(
base.BaseHook, "get_connection", lambda _cid: SimpleNamespace(login=None, password=None)
)
self._capture_defer(monkeypatch)

operator = HttpOperator(task_id="test_HTTP_op", method=method, deferrable=True)

with mock.patch.object(operator.log, "warning") as mock_warning:
operator.execute_async(context={})

mock_warning.assert_not_called()
Loading