Skip to content

AIP-72: Handle clearing of XComs when task starts execution#45506

Merged
amoghrajesh merged 2 commits intoapache:mainfrom
astronomer:AIP72-clear-xcom-on-execution
Jan 9, 2025
Merged

AIP-72: Handle clearing of XComs when task starts execution#45506
amoghrajesh merged 2 commits intoapache:mainfrom
astronomer:AIP72-clear-xcom-on-execution

Conversation

@amoghrajesh
Copy link
Contributor

closes: #45419

XComs have to be cleared when a task starts execution. We are handling this in the run endpoint as this endpoint marks "execution" for a task instance.

Few points:

  • Do not clear XComs for cases when "next_method" is set - for deferrable and reschedule tasks
  • Clear Xcoms for other regular tasks
  • Ported unit tests from test_taskinstance.py - only test_xcom_pull_different_logical_date is pending.

Testing

Normal DAG

Steps:

  1. Create a normal DAG like this that doesn't push an xcom
from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator
from datetime import datetime, timedelta

dag = DAG(
    'hello_world_dag',
    schedule=None,
    catchup=False,
)

hello_task = BashOperator(
    task_id='say_hello',
    bash_command='echo "Hello World from Airflow!"',
    dag=dag,
    do_xcom_push=False
)

hello_task
  1. Directly hit the API in fast api to set an xcom for the task instance to stage an older Xcom being present
    image
image
  1. Run the dag again, the xcom will get cleared
image

DAG with a deferred Task

DAG:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import BaseOperator
from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger


class XComPushDeferOperator(BaseOperator):
    def execute(self, context):
        context["ti"].xcom_push("test", "test_value")

        self.defer(
            trigger=TimeDeltaTrigger(delta=timedelta(seconds=10)),
            method_name="next",
        )

    def next(self, context, event=None):
        pass


with DAG(
    "xcom_clear", schedule=None, start_date=datetime(2025, 1, 8),
) as dag:
    XComPushDeferOperator(task_id="xcom_push")

``` (ref from https://github.com/apache/airflow/issues/22931)

Steps:
1. Run the dag:
<img width="1725" alt="image" src="https://github.com/user-attachments/assets/4f6932c0-2df5-412a-a091-9594094684ae" />

2. XComs aren't cleared, still present
<img width="1725" alt="image" src="https://github.com/user-attachments/assets/c4b741a2-6356-4fae-888b-eb65d6dc0a80" />

3. Triggers running
![image](https://github.com/user-attachments/assets/50f3fd7e-2a31-43f0-9571-c6bca5b497d7)




<!-- Please keep an empty line above the dashes. -->
---
**^ Add meaningful description above**
Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)** for more information.
In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).

@amoghrajesh amoghrajesh requested review from ashb and kaxil January 9, 2025 10:17
@amoghrajesh amoghrajesh added the area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK label Jan 9, 2025
Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 comment otherwise lgtm

@amoghrajesh amoghrajesh merged commit b18fccb into apache:main Jan 9, 2025
@amoghrajesh amoghrajesh deleted the AIP72-clear-xcom-on-execution branch January 9, 2025 11:08
karenbraganz pushed a commit to karenbraganz/airflow that referenced this pull request Jan 13, 2025
HariGS-DB pushed a commit to HariGS-DB/airflow that referenced this pull request Jan 16, 2025
got686-yandex pushed a commit to got686-yandex/airflow that referenced this pull request Jan 30, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Handle clearing XCom in run Execution endpoint

2 participants