HttpOperator: deferrable=True may send duplicate requests on Triggerer restart #67945

@suhyeon729

Under which category would you file this issue?

Providers

Apache Airflow version

3.2.2

What happened and how to reproduce it?

Issue Description

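HttpOperator(deferrable=True) with side-effecting HTTP methods (POST, PUT, DELETE, PATCH)
can send duplicate requests when the Triggerer process restarts.
Strictly speaking, RFC 9110 defines PUT and DELETE as idempotent, so the impact is greatest
for POST and PATCH, but any silently repeated state-changing request is unexpected.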

Root cause:

execute_async() defers immediately without making the HTTP request.
The actual HTTP call happens inside HttpTrigger.run().
When the Triggerer restarts, the Trigger is re-instantiated from its serialize() output
and run() starts again from scratch, sending the same POST request a second time.

# operators/http.py:211
def execute_async(self, context):
    # Defers immediately; no HTTP request has been sent at this point.
    self.defer(
        trigger=HttpTrigger(method="POST", endpoint=..., data=...),
        method_name="execute_complete",
    )

# triggers/http.py:160
async def run(self):
    # The request is sent here, and again each time run() restarts.
    response = await self._get_response(hook)
    yield TriggerEvent(...)
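
The restart behaviour described above can be illustrated without Airflow. The following is a minimal sketch, not the provider's code: `FakeHttpTrigger`, `sent_requests`, and `run_until_killed` are hypothetical stand-ins, and the "HTTP call" is just an append to a list. It assumes only what the issue states, namely that the trigger is rebuilt from `serialize()` and that `run()` starts over.

```python
import asyncio

# Hypothetical stand-ins; none of these names come from Airflow.
sent_requests = []  # every side-effecting call the "server" receives


class FakeHttpTrigger:
    """Mimics a trigger whose run() performs the HTTP call itself."""

    def __init__(self, method, endpoint):
        self.method = method
        self.endpoint = endpoint

    def serialize(self):
        # Only constructor arguments are persisted; nothing records
        # whether the request has already been sent.
        return {"method": self.method, "endpoint": self.endpoint}

    async def run(self):
        sent_requests.append((self.method, self.endpoint))  # the "HTTP call"
        await asyncio.sleep(0)  # stand-in for awaiting the response
        yield "done"


async def run_until_killed(trigger):
    """Let the request go out, then stop before the event is processed."""
    gen = trigger.run()
    await gen.__anext__()  # the request is sent during this step
    await gen.aclose()  # simulated Triggerer shutdown


async def main():
    sent_requests.clear()
    first = FakeHttpTrigger("POST", "/record")
    await run_until_killed(first)
    # After a restart, the trigger is rebuilt from its serialized kwargs.
    restored = FakeHttpTrigger(**first.serialize())
    async for _ in restored.run():
        break
    return len(sent_requests)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the serialized form carries no "already sent" marker, the restored instance has no way to know it should skip the request.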

This is different from the correct deferrable pattern (e.g. AirbyteOperator),
where the Worker submits the job and gets a job_id first,
and the Trigger only polls with GET requests (idempotent):

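# AirbyteOperator.execute(): correct pattern
# The Worker submits the job once, before deferring.
job_object = hook.submit_sync_connection(...)
self.job_id = job_object.job_id
# The Trigger receives only the job_id and polls for status.
self.defer(trigger=AirbyteSyncTrigger(job_id=self.job_id))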

Steps to reproduce:

1. Set up an endpoint that records how many times it receives a POST request.
2. Create a DAG with the following task:

HttpOperator(
    task_id="test",
    method="POST",
    endpoint="/record",
    http_conn_id="my_conn",
    deferrable=True,
)

3. Run the task and wait until it reaches the DEFERRED state.
4. Kill the Triggerer process and restart it.
5. Observe that the endpoint receives the POST request twice.
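
For the recording endpoint in the first step, something like the following standard-library server should be enough. This is a sketch under assumptions: `CountingHandler` and `start_server` are illustrative names, the server counts POSTs on any path (including `/record`), and the port is whatever you point `my_conn` at.

```python
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class CountingHandler(BaseHTTPRequestHandler):
    post_count = 0  # shared across requests; guarded by the lock below
    lock = threading.Lock()

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        self.rfile.read(length)  # drain the request body
        with CountingHandler.lock:
            CountingHandler.post_count += 1
            count = CountingHandler.post_count
        body = str(count).encode()  # reply with the running total
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # silence per-request logging


def start_server(port=0):
    """Serve in a daemon thread; port 0 lets the OS pick a free port."""
    server = HTTPServer(("127.0.0.1", port), CountingHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

In a script, call `start_server(<port>)` and keep the main thread alive (for example with `threading.Event().wait()`); `CountingHandler.post_count`, or the response body of each POST, then shows whether the request arrived once or twice.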

What you think should happen instead?

The deferrable mode should be safe against Triggerer restarts.

Short-term fix: Add a UserWarning in execute_async() when a non-idempotent
method is used with deferrable=True, so users are aware of the risk:

import warnings

def execute_async(self, context):
    # GET, HEAD and OPTIONS are safe to repeat; warn for every other method.
    if self.method.upper() not in ("GET", "HEAD", "OPTIONS"):
        warnings.warn(
            message=(
                f"HttpOperator deferrable=True with method={self.method} may send duplicate "
                "requests if the Triggerer restarts."
            ),
            category=UserWarning,
            stacklevel=2,
        )
    self.defer(trigger=HttpTrigger(...), method_name="execute_complete")

Long-term fix: For non-idempotent methods, execute the HTTP request in the Worker
(execute() phase) rather than inside the Trigger, following the same pattern as
AirbyteOperator. The Trigger should only be responsible for polling/waiting,
not for initiating side-effecting requests.
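
One possible shape for that split is sketched below. It is an illustration only, not a proposed patch: `SketchHttpOperator`, `runs_in_trigger`, and the injected `send_request` / `defer` callables are hypothetical, and the sketch simply sends side-effecting requests synchronously from the worker instead of deferring them.

```python
SAFE_METHODS = frozenset({"GET", "HEAD", "OPTIONS"})


def runs_in_trigger(method: str) -> bool:
    """Only safe (read-only) methods may be re-sent after a Triggerer restart."""
    return method.upper() in SAFE_METHODS


class SketchHttpOperator:
    """Hypothetical operator; send_request and defer are injected stand-ins."""

    def __init__(self, method, send_request, defer):
        self.method = method
        self._send_request = send_request
        self._defer = defer

    def execute(self):
        if runs_in_trigger(self.method):
            # Repeating the request is harmless, so the trigger may own it.
            return self._defer(self.method)
        # Side-effecting request: send it exactly once, from the worker.
        return self._send_request(self.method)


# Usage: record which path each method takes.
calls = []
for http_method in ("GET", "POST"):
    SketchHttpOperator(
        http_method,
        send_request=lambda m: calls.append(("worker", m)),
        defer=lambda m: calls.append(("trigger", m)),
    ).execute()
```

Unlike the Airbyte case, a generic HTTP endpoint may not return anything that can be polled afterwards, so whether a side-effecting request can still benefit from deferral depends on the target API; this sketch sidesteps that question by not deferring at all.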

Operating System

No response

Deployment

None

Apache Airflow Provider(s)

http

Versions of Apache Airflow Providers

apache-airflow-providers-http==6.0.0

Official Helm Chart version

Not Applicable

Kubernetes Version

No response

Helm Chart configuration

No response

Docker Image customizations

No response

Anything else?

  • The duplicate request is silent: no error is raised, which makes it hard to detect in production.
  • In batch processing contexts, duplicate POST requests can cause unintended data duplication or duplicate job executions.
  • Related Trigger design principle (base.py):
    "Trigger classes should assume they will be persisted,
    and then rely on cleanup() being called when they are no longer needed."

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
