Apache Airflow version
3.1.8
If "Other Airflow 2 version" selected, which one?
No response
What happened?
We are seeing an intermittent failure pattern with deferrable `GlueJobOperator` where the same task instance collides on XCom keys across the initial execute/defer phase and the resumed `execute_complete()` phase.

The failures show up as duplicate-key / already-exists errors on XCom keys such as:

- `glue_job_run_details`
- `return_value`
In one representative timeline, the task progressed like this:
- try 1 starts
- try 1 logs a duplicate XCom write on `glue_job_run_details`
- try 1 still transitions to DEFERRED
- try 1 resumes
- try 1 fails on a duplicate XCom write on `return_value`
- try 1 transitions to UP_FOR_RETRY
- try 2 starts
- try 2 logs a duplicate XCom write on `glue_job_run_details`
- try 2 transitions to DEFERRED
- try 2 resumes
- try 2 fails on a duplicate XCom write on `return_value`
- try 2 transitions to FAILED
Example sanitized errors:
```
409 Conflict
The XCom with key: `glue_job_run_details` with mentioned task instance already exists.

409 Conflict
The XCom with key: `return_value` with mentioned task instance already exists.
```
In another occurrence of the same underlying problem family, the resumed task failed with the database uniqueness error on the XCom PK:
```
duplicate key value violates unique constraint "xcom_pkey"
Key (dag_run_id, task_id, map_index, key)=(..., run_job_task, -1, return_value) already exists
```
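To make the failure mode concrete, the uniqueness constraint can be modeled with a minimal in-memory sketch. The names `XComStore` and `DuplicateKeyError` are illustrative, not Airflow API; the only assumption taken from the error above is that the XCom row is keyed by the tuple `(dag_run_id, task_id, map_index, key)`, so a second create of the same tuple fails:

```python
# Minimal model of the XCom primary-key constraint from the error above.
# XComStore and DuplicateKeyError are illustrative names, not Airflow API.

class DuplicateKeyError(Exception):
    pass

class XComStore:
    """In-memory stand-in for the xcom table with its composite PK."""
    def __init__(self):
        self._rows = {}

    def create(self, dag_run_id, task_id, map_index, key, value):
        pk = (dag_run_id, task_id, map_index, key)
        if pk in self._rows:
            raise DuplicateKeyError(f"Key {pk} already exists")
        self._rows[pk] = value

store = XComStore()
# One phase of the task instance writes return_value once...
store.create(1, "run_job_task", -1, "return_value", "jr_123")
# ...and a later phase of the same task instance creates the same row again.
try:
    store.create(1, "run_job_task", -1, "return_value", "jr_123")
    outcome = "ok"
except DuplicateKeyError:
    outcome = "duplicate"  # outcome == "duplicate"
```

The second `create` fails even though the value is identical, which is the same shape as the `xcom_pkey` violation above.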
This does not look like a custom-operator bug. The custom operator involved was only a thin subclass of `GlueJobOperator` that enabled `deferrable=True`; it did not override `execute()` or `execute_complete()`, and it did not add custom XCom writes.
What you think should happen instead?
A deferrable `GlueJobOperator` should not fail because it tries to create an XCom key that was already written by an earlier phase of the same task instance lifecycle.
More specifically:
- provider-managed link metadata such as `glue_job_run_details` should be idempotent across deferral/resume/retry boundaries, or
- deferred task execution should not preserve conflicting XCom rows that the resumed phase will write again, or
- the resumed/retried path should update or replace the existing XCom instead of failing on create.
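As a sketch of the third option, update-or-replace (upsert) semantics would make repeated writes harmless. This is an illustrative model of the proposed behavior, not the actual Airflow XCom implementation; `UpsertXComStore` and `push` are hypothetical names:

```python
# Illustrative upsert semantics for XCom writes: a second write to the same
# (dag_run_id, task_id, map_index, key) replaces the row instead of raising.
# Hypothetical sketch of the proposed behavior, not Airflow API.

class UpsertXComStore:
    def __init__(self):
        self._rows = {}

    def push(self, dag_run_id, task_id, map_index, key, value):
        # Replace-on-conflict: idempotent across defer/resume/retry.
        self._rows[(dag_run_id, task_id, map_index, key)] = value

store = UpsertXComStore()
# Pre-deferral phase writes the link metadata...
store.push(1, "run_job_task", -1, "glue_job_run_details", {"run_id": "jr_1"})
# ...and the resumed phase can safely write the same key again.
store.push(1, "run_job_task", -1, "glue_job_run_details", {"run_id": "jr_1"})
store.push(1, "run_job_task", -1, "return_value", "jr_1")
# len(store._rows) == 2: one row per key, no duplicate-create failure
```

With replace-on-conflict, every phase of the lifecycle can write its keys without caring whether an earlier phase already did.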
How to reproduce
I do not have a distilled standalone reproducer yet, but the behavior appears to require this combination:
- Use `GlueJobOperator(deferrable=True)`.
- Allow the task to enter `DEFERRED` and later resume via `execute_complete()`.
- Have a retry/resume path where XCom rows from the earlier phase still exist.
- Observe duplicate-write failures on one or both of `glue_job_run_details` and `return_value`.
Even without the full minimal DAG yet, the relevant upstream code paths seem to make the collision window plausible.
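Until I can isolate the real DAG, the suspected mechanics can be sketched as a stdlib-only simulation. The store, the phase functions, and the swallow-duplicates behavior of the link helper are simplified illustrative models of the timeline above, not Airflow internals; the key assumption is that XCom rows written in try 1 survive into try 2:

```python
# Simplified stdlib-only simulation of the suspected lifecycle. All names and
# behaviors here are illustrative models of the reported timeline, not
# Airflow internals.

class Duplicate(Exception):
    pass

rows: set = set()  # surviving XCom rows for one dag_run / task instance

def create(key):
    """Strict create, like the real XCom insert: fails on an existing row."""
    if key in rows:
        raise Duplicate(key)
    rows.add(key)

def persist_link():
    """Link-metadata write; duplicate errors are logged but swallowed."""
    try:
        create("glue_job_run_details")
        return "written"
    except Duplicate:
        return "swallowed"

def attempt():
    events = [("persist", persist_link())]      # pre-deferral phase
    # ... the task defers; rows are NOT cleared before resume ...
    try:
        create("return_value")                  # auto-push after resume
        events.append(("return_value", "written"))
    except Duplicate:
        events.append(("return_value", "duplicate"))  # hard failure
    return events

try1 = attempt()  # first try writes both keys
try2 = attempt()  # retry collides on both: one swallowed, one fatal
```

Under this model, `try2` reproduces the reported try-2 pattern exactly: the `glue_job_run_details` duplicate is logged-and-swallowed, while the `return_value` duplicate fails the task.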
Operating System
Debian-based container image
Versions of Apache Airflow Providers
`apache-airflow-providers-amazon` as bundled with Airflow 3.1.8
Deployment
Other 3rd-party Helm chart / managed runtime
Deployment details
Observed on a managed Airflow 3.1.8 deployment using CeleryExecutor.
Anything else?
What looks relevant in the source:

- `GlueJobOperator.execute()` persists `glue_job_run_details` before deferring, and also returns the Glue run id.
  - `execute()` persists link metadata here: providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L326-L339
  - providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L342-L354
  - providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L367
  - `execute_complete()` returns the run id again here: providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L369-L374
- The provider link helper writes `glue_job_run_details` through `xcom_push()` and suppresses exceptions.
  - `GlueJobRunDetailsLink.key = "glue_job_run_details"`: providers/amazon/src/airflow/providers/amazon/aws/links/glue.py#L22-L28
  - `BaseAwsLink.persist()` writes `context["ti"].xcom_push(key=cls.key, ...)`: providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py#L81-L95
  - `persist()` is wrapped in `@return_on_error(None)`, so a duplicate-key failure there can be swallowed: providers/amazon/src/airflow/providers/amazon/aws/utils/suppress.py#L41-L69
- On resume, Airflow calls `resume_execution()` and then `execute_complete()`, and successful return values are auto-pushed to XCom `return_value`.
  - task-sdk/src/airflow/sdk/bases/operator.py#L1664-L1683
  - `next_method` is set here: task-sdk/src/airflow/sdk/execution_time/task_runner.py#L1638-L1653
  - `return_value` is pushed here: task-sdk/src/airflow/sdk/execution_time/task_runner.py#L1709-L1751
- Airflow explicitly does not clear XComs when resuming from deferral (`# However, do not clear it for deferral`): airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py#L255-L283
- There is also a separate operator-extra-link XCom path. `BaseOperatorLink.xcom_key` defaults to `_link_<ClassName>`, so the Glue operator is effectively involved with both:
  - `glue_job_run_details` via provider `persist()`
  - `_link_GlueJobRunDetailsLink` via framework-managed operator extra links

  Reference: task-sdk/src/airflow/sdk/bases/operatorlink.py#L47-L55

My current read is that the deferrable Glue path is not idempotent with respect to its XCom writes across pre-deferral, resume, and retry boundaries. At minimum, `glue_job_run_details` and `return_value` appear vulnerable to duplicate-create semantics in this lifecycle.
If helpful, I can follow up with a minimal repro DAG once I isolate it.