This issue was moved to a discussion.
XCom not working when xcom_pull from subdag task in main DAG #27785

Closed
1 of 2 tasks
ana-carolina-januario opened this issue Nov 18, 2022 · 8 comments
Labels
area:core kind:bug This is a clearly a bug

Comments

@ana-carolina-januario

Apache Airflow version

2.4.3

What happened

I have a DAG with a SubDagOperator task that contains two PythonOperator tasks. Both of the subdag's tasks return values and publish them to XCom. I confirmed that the values are available in the XCom list (through the UI).
Inside the subdag, one task can read the other task's returned values through XCom.
In the main DAG, the downstream task that follows the SubDagOperator task needs to pull the value returned by one of the subdag's tasks from XCom. This is the part that fails: a task in the main DAG can't read the XComs from the subdag's tasks.

I've tested reading the XCom using both BashOperator and PythonOperator.

I am using Python 3.10.8 and Airflow 2.4.3.

What you think should happen instead

Using the code:

command = 'echo The returned_value xcom is {{ task_instance.xcom_pull(' \
          'task_ids="push_xcom_value", ' \
          f'dag_id="{DAG_NAME}.{SUB_DAG_NAME}"' \
          ')}}'
read_xcom_bash = BashOperator(
    task_id="bash_pull",
    bash_command=command,
    do_xcom_push=False,
    dag=dag,
)

For the reader task, we should be able to get the values from XCom.

How to reproduce

I copied the code from the XCom usage example and the subdag usage example in Airflow's repository. The specific bash command used to reproduce the issue is:

command = 'echo The returned_value xcom is {{ task_instance.xcom_pull(' \
          'task_ids="push_xcom_value", ' \
          f'dag_id="{DAG_NAME}.{SUB_DAG_NAME}"' \
          ')}}'
read_xcom_bash = BashOperator(
    task_id="bash_pull",
    bash_command=command,
    do_xcom_push=False,
    dag=dag,
)
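For clarity, here is the same concatenation reproduced in plain Python, assuming `DAG_NAME = "test_xcom"` and `SUB_DAG_NAME = "subdag_writer"` (the values used in the full code later in this thread). It shows the literal Jinja template that Airflow receives and renders at runtime:

```python
# Illustration only: the command string built by the snippet above,
# using the DAG_NAME / SUB_DAG_NAME values from the reporter's full code.
DAG_NAME = "test_xcom"
SUB_DAG_NAME = "subdag_writer"

command = 'echo The returned_value xcom is {{ task_instance.xcom_pull(' \
          'task_ids="push_xcom_value", ' \
          f'dag_id="{DAG_NAME}.{SUB_DAG_NAME}"' \
          ')}}'

print(command)
# echo The returned_value xcom is {{ task_instance.xcom_pull(task_ids="push_xcom_value", dag_id="test_xcom.subdag_writer")}}
```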

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==6.0.0
apache-airflow-providers-celery==3.0.0
apache-airflow-providers-cncf-kubernetes==4.4.0
apache-airflow-providers-common-sql==1.2.0
apache-airflow-providers-docker==3.2.0
apache-airflow-providers-elasticsearch==4.2.1
apache-airflow-providers-ftp==3.1.0
apache-airflow-providers-google==8.4.0
apache-airflow-providers-grpc==3.0.0
apache-airflow-providers-hashicorp==3.1.0
apache-airflow-providers-http==4.0.0
apache-airflow-providers-imap==3.0.0
apache-airflow-providers-jdbc==3.2.1
apache-airflow-providers-microsoft-azure==4.3.0
apache-airflow-providers-mysql==3.2.1
apache-airflow-providers-odbc==3.1.2
apache-airflow-providers-postgres==5.2.2
apache-airflow-providers-redis==3.0.0
apache-airflow-providers-sendgrid==3.0.0
apache-airflow-providers-sftp==4.1.0
apache-airflow-providers-slack==6.0.0
apache-airflow-providers-sqlite==3.2.1
apache-airflow-providers-ssh==3.2.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

It occurs every time.
My team and I need this urgently.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@ana-carolina-januario ana-carolina-januario added area:core kind:bug This is a clearly a bug labels Nov 18, 2022
@ana-carolina-januario
Author

The complete code is:

from __future__ import annotations

from datetime import datetime

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.subdag import SubDagOperator

DAG_NAME = 'test_xcom'
SUB_DAG_NAME = "subdag_writer"
FULL_SUB_DAG_NAME = f'{DAG_NAME}.{SUB_DAG_NAME}'
SUB_DAG_PUSH_TASK = "push_xcom_value_sub_dag"
SUB_DAG_KEY = "key_test"


def sub_dag(parent_dag_name, child_dag_name, args) -> DAG:
    dag_sub_dag = DAG(
        dag_id=f"{parent_dag_name}.{child_dag_name}",
        default_args={"retries": 2},
        start_date=pendulum.datetime(2022, 11, 17, tz="UTC"),
        catchup=False,
        schedule="@daily"
    )

    def extract(**kwargs):
        ti = kwargs["ti"]
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push(key=SUB_DAG_KEY, value=data_string)
        return "xcom_value_test"

    PythonOperator(
        task_id=SUB_DAG_PUSH_TASK,
        default_args=args,
        python_callable=extract,
        dag=dag_sub_dag,
        do_xcom_push=True
    )

    return dag_sub_dag


DEFAULT_ARGS = {"retries": 2}


with DAG(
        dag_id=DAG_NAME,
        default_args={"retries": 2},
        start_date=datetime(2022, 1, 1),
        schedule="@once"
) as dag:
    def push_xcom_value_callable(**kwargs):
        return "test_value"

    # Renamed the callable so the operator variable no longer shadows it.
    push_xcom_value = PythonOperator(
        task_id='push_xcom_value',
        python_callable=push_xcom_value_callable,
    )

    command_read_xcom_sub_dag = 'echo The returned_value xcom is {{ task_instance.xcom_pull(' \
              f'task_ids="{SUB_DAG_PUSH_TASK}", ' \
              f'dag_id="{DAG_NAME}.{SUB_DAG_NAME}"' \
              ')}}'
    read_xcom_sub_dag = BashOperator(
        task_id="read_xcom_sub_dag",
        bash_command=command_read_xcom_sub_dag,
        do_xcom_push=False,
        dag=dag
    )

    command_read_xcom_bash = 'echo The returned_value xcom is {{ task_instance.xcom_pull(' \
              'task_ids="push_xcom_value", ' \
              f'dag_id="{DAG_NAME}"' \
              ')}}'
    read_xcom_bash = BashOperator(
        task_id="read_xcom_bash",
        bash_command=command_read_xcom_bash,
        do_xcom_push=False,
        dag=dag
    )

    def read_xcom(**kwargs):
        ti = kwargs['ti']
        print('ti= ', ti,'\n')
        total_value_string = ti.xcom_pull(
            task_ids="push_xcom_value",
            key="return_value",
            dag_id=DAG_NAME)
        print(total_value_string)
    read_xcom_python = PythonOperator(
        task_id='read_xcom_python',
        python_callable=read_xcom,
    )

    def read_xcom_python_sub_dag(**kwargs):
        ti = kwargs['ti']
        total_value_string = ti.xcom_pull(
            task_ids=SUB_DAG_PUSH_TASK,
            key="return_value",
            dag_id=FULL_SUB_DAG_NAME)
        print(total_value_string)
        test = ti.xcom_pull(
            key=SUB_DAG_KEY,
            task_ids=SUB_DAG_PUSH_TASK,
            dag_id=FULL_SUB_DAG_NAME)
        print(test)  # was printing total_value_string a second time
    read_xcom_python_sub_dag_task = PythonOperator(
        task_id='read_xcom_python_sub_dag',
        python_callable=read_xcom_python_sub_dag,
    )

    bash_push_sub_dag = SubDagOperator(
        task_id=SUB_DAG_NAME,
        subdag=sub_dag(
            parent_dag_name=DAG_NAME,
            child_dag_name=SUB_DAG_NAME,
            args=DEFAULT_ARGS),
        dag=dag
    )

    command_read_xcom_bash_1 = 'echo The returned_value xcom is {{ task_instance.xcom_pull(' \
              f'task_ids="{SUB_DAG_PUSH_TASK}", ' \
              f'dag_id="{DAG_NAME}"' \
              ')}}'
    read_xcom_bash_1 = BashOperator(
        task_id="read_xcom_bash_1",
        bash_command=command_read_xcom_bash_1,
        do_xcom_push=False,
        dag=dag
    )

    push_xcom_value >> bash_push_sub_dag >> [read_xcom_sub_dag, read_xcom_bash, read_xcom_python, read_xcom_python_sub_dag_task, read_xcom_bash_1]

As you can see, we've tested multiple ways to get to the subdag's pushed value, all without success.

Thanks in advance.

@o-nikolas
Contributor

o-nikolas commented Nov 19, 2022

tl;dr: this actually is working; see my simplified example below. BUT there is an edge case where it doesn't work for manual triggers of the parent dag.

If you load the attached dag below into your airflow environment (simplified from what you provided), it will run every 5 minutes and the XCom value from the task in the subdag will be correctly read back by the task in the parent dag. This works because xcom_pull now adds run_id to the XCom query filter (see this PR: #19825), and when the parent dag is scheduled, both the parent and the subdag have the same dag run id (something like scheduled__<datetime>).
BUT, if you manually trigger the parent dag, its run id is now manual__<datetime> while the subdag's is still scheduled__<datetime>, which means the filtering on run_id excludes all XCom values from the child dag since it has a different dag run id. This is a legitimate regression that I think we should fix (@potiuk, do you agree?).
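The run-id mismatch can be sketched in plain Python. This is an illustration of the filtering behavior described above, not Airflow's actual implementation; the dag, task, and run ids are taken from this thread:

```python
# Toy model of the run_id filter added in PR #19825 (illustration only).
# The subdag's run is created with a "scheduled__" run id even when the
# parent dag run is triggered manually.
subdag_xcoms = [{
    "dag_id": "test_xcom.subdag_writer",
    "run_id": "scheduled__2022-11-19T00:00:00+00:00",
    "task_id": "push_xcom_value_sub_dag",
    "key": "return_value",
    "value": "xcom_value_test",
}]

def toy_xcom_pull(xcoms, dag_id, task_id, run_id):
    """Mimic filtering on dag_id, task_id AND the calling run's run_id."""
    for x in xcoms:
        if (x["dag_id"], x["task_id"], x["run_id"]) == (dag_id, task_id, run_id):
            return x["value"]
    return None  # no row matches -> xcom_pull yields None

# Scheduled parent run: run ids line up, so the value is found.
print(toy_xcom_pull(subdag_xcoms, "test_xcom.subdag_writer",
                    "push_xcom_value_sub_dag",
                    run_id="scheduled__2022-11-19T00:00:00+00:00"))  # xcom_value_test

# Manual parent run: the "manual__" run id excludes the subdag's XCom.
print(toy_xcom_pull(subdag_xcoms, "test_xcom.subdag_writer",
                    "push_xcom_value_sub_dag",
                    run_id="manual__2022-11-19T00:00:00+00:00"))  # None
```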

I think there may have been some bugs in your dag code, but even if there weren't, I assume you were manually triggering the parent dag, so the XCom could not be read successfully for the above reason.

from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.subdag import SubDagOperator

DAG_NAME = "test_xcom"
SUB_DAG_NAME = "subdag_writer"
SUB_DAG_PUSH_TASK = "push_xcom_value_sub_dag"


def sub_dag() -> DAG:
    with DAG(
        dag_id=f"{DAG_NAME}.{SUB_DAG_NAME}",
        start_date=datetime(2022, 1, 1),
        catchup=False,
        schedule="* * * * *",
    ) as subdag:

        def extract(**kwargs):
            print("hello Im running")
            return "xcom_value_test"

        PythonOperator(
            task_id=SUB_DAG_PUSH_TASK,
            python_callable=extract,
            dag=subdag,
            do_xcom_push=True,
        )

        return subdag


with DAG(
    dag_id=DAG_NAME,
    start_date=datetime(2022, 1, 1),
    catchup=False,
    schedule="*/5 * * * *",
) as dag:

    def push_xcom(**kwargs):
        return "test_value"

    def read_xcom(ti, **kwargs):
        return_value = ti.xcom_pull(
            task_ids="push_xcom_value_sub_dag",
            dag_id="test_xcom.subdag_writer", key="return_value",
        )
        print(f"I got this from xcom: {return_value}")

    push_xcom_value = PythonOperator(
        task_id="push_xcom_value",
        python_callable=push_xcom,
    )

    subdag = sub_dag()

    bash_push_sub_dag = SubDagOperator(
        task_id=SUB_DAG_NAME,
        subdag=subdag,
        dag=dag,
    )

    read_xcom_value = PythonOperator(
        task_id="read_xcom_value",
        python_callable=read_xcom,
    )

    push_xcom_value >> bash_push_sub_dag >> read_xcom_value

@AngeloPingoGalp

AngeloPingoGalp commented Nov 19, 2022

Hi Niko,
Yes, we tested it with a manual trigger.
I tested your code and the read_xcom_value task didn't read the XCom value either (I just renamed the dag to "test_xcom_issue"). Did you test it?

test_xcom_issue.subdag_writer log file:
[2022-11-19, 09:01:33 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_xcom_issue.subdag_writer', 'push_xcom_value_sub_dag', 'scheduled__2022-11-19T00:00:00+00:00', '--job-id', '459', '--raw', '--subdir', 'DAGS_FOLDER/tests/issue_response.py', '--cfg-path', '/tmp/tmpg77ykifq']
[2022-11-19, 09:01:33 UTC] {standard_task_runner.py:83} INFO - Job 459: Subtask push_xcom_value_sub_dag
[2022-11-19, 09:01:33 UTC] {task_command.py:376} INFO - Running <TaskInstance: test_xcom_issue.subdag_writer.push_xcom_value_sub_dag scheduled__2022-11-19T00:00:00+00:00 [running]> on host airflow-2-4-3-worker-0.airflow-2-4-3-worker.airflow.svc.cluster.local
[2022-11-19, 09:01:33 UTC] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_xcom_issue.subdag_writer
AIRFLOW_CTX_TASK_ID=push_xcom_value_sub_dag
AIRFLOW_CTX_EXECUTION_DATE=2022-11-19T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-11-19T00:00:00+00:00
[2022-11-19, 09:01:33 UTC] {logging_mixin.py:137} INFO - hello Im running
[2022-11-19, 09:01:33 UTC] {python.py:177} INFO - Done. Returned value was: xcom_value_test
[2022-11-19, 09:01:33 UTC] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=test_xcom_issue.subdag_writer, task_id=push_xcom_value_sub_dag, execution_date=20221119T000000, start_date=20221119T090133, end_date=20221119T090133
[2022-11-19, 09:01:33 UTC] {local_task_job.py:159} INFO - Task exited with return code 0
[2022-11-19, 09:01:33 UTC] {taskinstance.py:2623} INFO - 0 downstream tasks scheduled from follow-on schedule check

read_xcom_value log file:
[2022-11-19, 09:02:33 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_xcom_issue', 'read_xcom_value', 'scheduled__2022-11-19T00:00:00+00:00', '--job-id', '460', '--raw', '--subdir', 'DAGS_FOLDER/tests/issue_response.py', '--cfg-path', '/tmp/tmppzc4h1pm']
[2022-11-19, 09:02:33 UTC] {standard_task_runner.py:83} INFO - Job 460: Subtask read_xcom_value
[2022-11-19, 09:02:33 UTC] {task_command.py:376} INFO - Running <TaskInstance: test_xcom_issue.read_xcom_value scheduled__2022-11-19T00:00:00+00:00 [running]> on host airflow-2-4-3-worker-0.airflow-2-4-3-worker.airflow.svc.cluster.local
[2022-11-19, 09:02:33 UTC] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_xcom_issue
AIRFLOW_CTX_TASK_ID=read_xcom_value
AIRFLOW_CTX_EXECUTION_DATE=2022-11-19T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-11-19T00:00:00+00:00
[2022-11-19, 09:02:33 UTC] {logging_mixin.py:137} INFO - I got this from xcom: None
[2022-11-19, 09:02:33 UTC] {python.py:177} INFO - Done. Returned value was: None
[2022-11-19, 09:02:33 UTC] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=test_xcom_issue, task_id=read_xcom_value, execution_date=20221119T000000, start_date=20221119T090233, end_date=20221119T090233
[2022-11-19, 09:02:33 UTC] {local_task_job.py:159} INFO - Task exited with return code 0
[2022-11-19, 09:02:33 UTC] {taskinstance.py:2623} INFO - 0 downstream tasks scheduled from follow-on schedule check

@AngeloPingoGalp

AngeloPingoGalp commented Nov 19, 2022

Hi Niko,

I changed the code, basically some simplifications, and changed the schedule to every 5 minutes, and it worked. With this test, I assume the @once schedule doesn't work either, right? But your explanation doesn't fit this particular case, because the run id used was scheduled__2022-11-19T00:00:00+00:00.

Code:

from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.subdag import SubDagOperator

DAG_NAME = "test_xcom_issue"
SUB_DAG_NAME = "subdag_writer"
SUB_DAG_FULL_NAME = f"{DAG_NAME}.{SUB_DAG_NAME}"
SUB_DAG_PUSH_TASK = "push_xcom_value_sub_dag"


def sub_dag() -> DAG:
    with DAG(
        dag_id=SUB_DAG_FULL_NAME,
        start_date=datetime(2022, 11, 19),
        catchup=False,
        schedule="* * * * *",
    ) as subdag:

        def extract(**kwargs):
            print("hello Im running")
            return "xcom_value_test"

        PythonOperator(
            task_id=SUB_DAG_PUSH_TASK,
            python_callable=extract,
            dag=subdag,
            do_xcom_push=True,
        )

        return subdag


with DAG(
    dag_id=DAG_NAME,
    start_date=datetime(2022, 11, 19),
    catchup=False,
    schedule="*/5 * * * *"
) as dag:

    def read_xcom(ti, **kwargs):
        return_value = ti.xcom_pull(
            task_ids=SUB_DAG_PUSH_TASK,
            dag_id=SUB_DAG_FULL_NAME
        )
        print(f"I got this from xcom: {return_value}")

    bash_push_sub_dag = SubDagOperator(
        task_id=SUB_DAG_NAME,
        subdag=sub_dag(),
        dag=dag,
    )

    read_xcom_value = PythonOperator(
        task_id="read_xcom_value",
        python_callable=read_xcom,
    )

    bash_push_sub_dag >> read_xcom_value

Thank you for your help and clarification. However, we have some use cases where the schedule is set to None and the pipeline is triggered externally, through the REST API or another mechanism. Is there some workaround to this issue for that use case?

Best regards,
Angelo Pingo.

@Taragolis
Contributor

@ana-carolina-januario @AngeloPingoGalp just want to highlight that SubDAGs are a deprecated feature and will be removed one day; I do not know whether anyone wants to invest time in changing deprecated functionality.

Did you try TaskGroup, which is the replacement for SubDAG?
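For reference, a minimal sketch of the same pipeline rewritten with a TaskGroup (not tested against 2.4.3; the dag id "test_xcom_taskgroup" and group id "writer_group" are illustrative names). Because a TaskGroup lives inside a single dag, the push and the pull happen in the same dag run, so xcom_pull works with its default filters even for manual or API triggers:

```python
# Hedged sketch: SubDagOperator replaced by a TaskGroup, so everything
# shares one dag_id and one run_id and the run_id filter is no issue.
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

with DAG(
    dag_id="test_xcom_taskgroup",
    start_date=datetime(2022, 11, 19),
    catchup=False,
    schedule=None,  # also works when triggered manually or via the REST API
) as dag:

    def extract(**kwargs):
        return "xcom_value_test"

    def read_xcom(ti, **kwargs):
        # Inside a TaskGroup, task ids are prefixed with the group id.
        value = ti.xcom_pull(task_ids="writer_group.push_xcom_value_sub_dag")
        print(f"I got this from xcom: {value}")

    with TaskGroup(group_id="writer_group") as writer_group:
        PythonOperator(
            task_id="push_xcom_value_sub_dag",
            python_callable=extract,
        )

    read_xcom_value = PythonOperator(
        task_id="read_xcom_value",
        python_callable=read_xcom,
    )

    writer_group >> read_xcom_value
```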

@AngeloPingo

AngeloPingo commented Nov 19, 2022 via email

@o-nikolas
Contributor

I changed the code, basically, some simplifications and I changed the schedule to every 5 minutes and worked. With this test, I assume the once annotation doesn't work as well, right? But your explanation doesn't make sense for this particular case, because it used scheduled="scheduled__2022-11-19T00:00:00+00:00".

Yup, this was a typo of sorts. In my code snippet you can see that I had the schedule (also 5 min) there but commented out; it was a slightly outdated version. I'll update the snippet.

@o-nikolas
Contributor

@ana-carolina-januario Can this issue be resolved?

@apache apache locked and limited conversation to collaborators Dec 3, 2022
@eladkal eladkal converted this issue into discussion #28085 Dec 3, 2022
