Xcomms with Taskflow API cause unwanted dependencies in UI #17686

Open
frankbreetz opened this issue Aug 18, 2021 · 17 comments

@frankbreetz
Contributor

Apache Airflow version: 2.1.1

OS: Debian GNU/Linux 10

Apache Airflow Provider versions:

Deployment: Astronomer

What happened:
When using the TaskFlow API to pass XComs, an unwanted dependency can be created in the UI.

XCom DAG using the TaskFlow API
from datetime import datetime, timedelta

from airflow.decorators import dag, task


def get_run_date():
    return '01-01-2021'


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}


@dag(default_args=default_args,
     schedule_interval=None,
     start_date=datetime(2021, 8, 1),
     dagrun_timeout=timedelta(hours=4),
     max_active_runs=1)
def xcomm_taskflow_dag():
    @task()
    def set_date():
        date = "1-2-21"
        return date

    @task()
    def xcomm_task2(date):
        print(f"xcomm_task2:{date}")

    @task()
    def xcomm_task3(date):
        print(f"xcomm_task3:{date}")

    # Passing the output of set_date to both tasks makes each of them
    # depend directly on set_date.
    date = set_date()
    task2 = xcomm_task2(date)
    task3 = xcomm_task3(date)
    # Explicit ordering on top of the dependencies created above.
    date >> task2 >> task3


xcomm_taskflow_dag = xcomm_taskflow_dag()

Because both downstream tasks use the variable created by the first task, each of them gets a direct connection to the first task.
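
A minimal sketch of why the extra edge appears, written as a toy model rather than with Airflow itself. The `Task` class and `edges` helper are hypothetical; the only assumption taken from this report is that passing one task's output as an argument to another registers a dependency, in addition to whatever `>>` declares.

```python
# Toy model, NOT Airflow's implementation: all names here are hypothetical.
class Task:
    def __init__(self, task_id, *inputs):
        self.task_id = task_id
        self.upstream = set()
        # Receiving another task's output as an argument implies a dependency,
        # which is the behavior described in this report.
        for producer in inputs:
            self.upstream.add(producer.task_id)

    def __rshift__(self, other):
        # a >> b: explicit dependency, b runs after a.
        other.upstream.add(self.task_id)
        return other


def edges(*tasks):
    """Return all (upstream, downstream) pairs, sorted for stable output."""
    return sorted((up, t.task_id) for t in tasks for up in t.upstream)


set_date = Task("set_date")
task2 = Task("xcomm_task2", set_date)
task3 = Task("xcomm_task3", set_date)
set_date >> task2 >> task3

graph_edges = edges(set_date, task2, task3)
print(graph_edges)
```

In this model the chain contributes `set_date -> xcomm_task2 -> xcomm_task3`, and the argument passed to `xcomm_task3` contributes the additional `set_date -> xcomm_task3` edge.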

[Screenshot: Graph View of the TaskFlow DAG, 2021-08-18 10:06 AM]

This also duplicates tasks in the Tree View (xcomm_task3).
[Screenshot: Tree View of the TaskFlow DAG, 2021-08-18 10:08 AM]

This is a simple example, but one can imagine it becoming quite unwieldy with complex DAGs.

What you expected to happen:
If we create an XCom without using the TaskFlow API, no unwanted dependency is created, just the one explicitly stated using the bit-shift operators.

XCom DAG without using the TaskFlow API
from datetime import datetime, timedelta

from airflow.decorators import dag
# airflow.operators.python_operator is the deprecated Airflow 1.x import path.
from airflow.operators.python import PythonOperator


def get_run_date(ti):
    date = '01-01-2021'
    ti.xcom_push(key="date", value=date)


def var_fun2(ti):
    date = ti.xcom_pull(key="date", task_ids="get_date")
    print(date)


def var_fun3(ti):
    date = ti.xcom_pull(key="date", task_ids="get_date")
    print(date)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}


@dag(default_args=default_args,
     schedule_interval=None,
     start_date=datetime(2021, 8, 1),
     dagrun_timeout=timedelta(hours=4),
     max_active_runs=1)
def xcomm_dag():
    # provide_context is no longer needed in Airflow 2: the context (including
    # ti) is passed to the callable based on its signature.
    set_date = PythonOperator(
        task_id="get_date",
        python_callable=get_run_date,
    )

    var_task2 = PythonOperator(
        task_id="xcomm_task2",
        python_callable=var_fun2,
    )

    var_task3 = PythonOperator(
        task_id="xcomm_task3",
        python_callable=var_fun3,
    )

    # Only the explicitly declared dependencies exist here.
    set_date >> var_task2 >> var_task3


xcomm_dag = xcomm_dag()

Now there is no direct connection to downstream tasks in the Graph View.

[Screenshot: Graph View of the non-TaskFlow DAG, 2021-08-18 10:14 AM]

There are no duplicated tasks in the Tree View.

[Screenshot: Tree View of the non-TaskFlow DAG, 2021-08-18 10:14 AM]

How to reproduce it:
Upload the included DAGs and view them in the UI.

Anything else we need to know: I understand why this could have been designed this way: there is not only a downstream dependency but also a direct dependency between the first task and the downstream tasks. The old version did not create this dependency, and the new TaskFlow API does. This may be a feature request along the lines of a config setting to disable these dependencies, to not create a dependency based on the TaskFlow API, or at least to not show them in the UI. It would be nice to have clearer code, as in the TaskFlow API example, and also a clearer UI, as in the non-TaskFlow API example.
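
One way to picture the "not have them shown in the UI" option is a display-side filter that hides any edge already implied by a longer path (a transitive reduction of the DAG). The sketch below is hypothetical: `hide_redundant_edges` and `reachable` are made-up names, and nothing here corresponds to an existing Airflow setting or API.

```python
# Hypothetical display-side helper; not an existing Airflow setting or API.
def reachable(adj, start, goal, skip_edge):
    """Return True if goal can be reached from start without using skip_edge."""
    stack, seen = [start], set()
    while stack:
        node = stack.pop()
        for nxt in adj.get(node, ()):
            if (node, nxt) == skip_edge or nxt in seen:
                continue
            if nxt == goal:
                return True
            seen.add(nxt)
            stack.append(nxt)
    return False


def hide_redundant_edges(edges):
    """Drop every edge that is already implied by another path in the DAG."""
    adj = {}
    for up, down in edges:
        adj.setdefault(up, set()).add(down)
    return sorted(e for e in edges if not reachable(adj, e[0], e[1], skip_edge=e))


all_edges = [
    ("set_date", "xcomm_task2"),
    ("set_date", "xcomm_task3"),  # created by passing the XCom to xcomm_task3
    ("xcomm_task2", "xcomm_task3"),
]
shown = hide_redundant_edges(all_edges)
print(shown)
```

The scheduler would still honor every dependency; only the rendered graph would lose the `set_date -> xcomm_task3` edge in this sketch.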

Are you willing to submit a PR?

@frankbreetz frankbreetz added the kind:bug This is a clearly a bug label Aug 18, 2021
@eladkal eladkal added area:core affected_version:2.1 Issues Reported for 2.1 labels Aug 24, 2021
@alex-astronomer
Contributor

I'm going to take a look at this one.

@ephraimbuddy
Contributor

I made a bunch of mistakes trying to make this work until I looked at the non-taskflow example.
This should work for taskflow:

from datetime import datetime, timedelta
from airflow.decorators import dag
from airflow.decorators import task

def get_run_date():
    return '01-01-2021'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
@dag(default_args=default_args,
     schedule_interval=None,
     start_date=datetime(2021, 8, 1),
     dagrun_timeout=timedelta(hours=4),
     max_active_runs=1)
def xcomm_taskflow_dag():
    @task()
    def set_date():
        date = "1-2-21"
        return date

    @task()
    def xcomm_task2(**kwargs):
        date = kwargs['ti'].xcom_pull(task_ids="set_date")
        print(f"xcomm_task2:{date}")

    @task()
    def xcomm_task3(**kwargs):
        date = kwargs['ti'].xcom_pull(task_ids="set_date")
        print(f"xcomm_task3:{date}")

    set_date() >> xcomm_task2() >> xcomm_task3()


xcomm_taskflow_dag = xcomm_taskflow_dag()

@ephraimbuddy
Contributor

Another way that will also work:

@dag(default_args=default_args,
     schedule_interval=None,
     start_date=datetime(2021, 8, 1),
     dagrun_timeout=timedelta(hours=4),
     max_active_runs=1)
def xcomm_taskflow_dag():
    @task()
    def set_date():
        date = "1-2-21"
        return date

    @task()
    def xcomm_task2(date):
        print(f"xcomm_task2:{date}")

    @task()
    def xcomm_task3(**kwargs):
        date = kwargs['ti'].xcom_pull(task_ids="set_date")
        print(f"xcomm_task3:{date}")

    xcomm_task2(set_date()) >> xcomm_task3()


xcomm_taskflow_dag = xcomm_taskflow_dag()

I think the documentation needs to be improved.

@alex-astronomer
Contributor

alex-astronomer commented Oct 1, 2021

I'm still inclined to call this a bug @ephraimbuddy. Here's why. I believe that all of the methods to define this should have the same behavior.

date = set_date()
task2 = xcomm_task2(date)
task3 = xcomm_task3(date)
date >> task2 >> task3

should output the same DAG as

@dag(default_args=default_args,
     schedule_interval=None,
     start_date=datetime(2021, 8, 1),
     dagrun_timeout=timedelta(hours=4),
     max_active_runs=1)
def xcomm_taskflow_dag():
    @task()
    def set_date():
        date = "1-2-21"
        return date

    @task()
    def xcomm_task2(**kwargs):
        date = kwargs['ti'].xcom_pull(task_ids="set_date")
        print(f"xcomm_task2:{date}")

    @task()
    def xcomm_task3(**kwargs):
        date = kwargs['ti'].xcom_pull(task_ids="set_date")
        print(f"xcomm_task3:{date}")

    set_date() >> xcomm_task2() >> xcomm_task3()

which should output the same DAG as

@dag(default_args=default_args,
     schedule_interval=None,
     start_date=datetime(2021, 8, 1),
     dagrun_timeout=timedelta(hours=4),
     max_active_runs=1)
def xcomm_taskflow_dag():
    @task()
    def set_date():
        date = "1-2-21"
        return date

    @task()
    def xcomm_task2(date):
        print(f"xcomm_task2:{date}")

    @task()
    def xcomm_task3(**kwargs):
        date = kwargs['ti'].xcom_pull(task_ids="set_date")
        print(f"xcomm_task3:{date}")

    xcomm_task2(set_date()) >> xcomm_task3()

I'm going to keep digging into this and see what differences there are between the 3 methods.
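
To make the comparison concrete, here is a rough sketch that models each of the three definitions as an edge set. It is a toy model, not Airflow code: `build_edges` is a hypothetical helper, and it assumes only that every value passed as an argument adds a producer-to-consumer edge while `>>` adds an edge between neighbors in the chain.

```python
# Toy model of the three definitions above; build_edges is hypothetical.
def build_edges(passed_args, chain):
    """passed_args maps consumer -> producers passed as arguments;
    chain is the explicit a >> b >> c ordering."""
    edges = {(producer, consumer)
             for consumer, producers in passed_args.items()
             for producer in producers}
    edges |= set(zip(chain, chain[1:]))
    return edges


chain = ["set_date", "xcomm_task2", "xcomm_task3"]

# 1) date passed to both tasks, then date >> task2 >> task3
variant_a = build_edges(
    {"xcomm_task2": ["set_date"], "xcomm_task3": ["set_date"]}, chain)
# 2) nothing passed, XComs pulled via ti; set_date() >> xcomm_task2() >> xcomm_task3()
variant_b = build_edges({}, chain)
# 3) xcomm_task2(set_date()) >> xcomm_task3()
variant_c = build_edges(
    {"xcomm_task2": ["set_date"]}, ["xcomm_task2", "xcomm_task3"])

extra = variant_a - variant_b
print(variant_b == variant_c, sorted(extra))
```

Under these assumptions the second and third definitions agree, and the first differs from them by exactly the `set_date -> xcomm_task3` edge.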

@alex-astronomer
Contributor

alex-astronomer commented Oct 1, 2021

I believe that no matter how we define the DAG, and as long as the code is valid, it should deterministically create the same result.

@alex-astronomer
Contributor

I believe the culprit for this bug is the set_xcomargs_dependencies call in the apply_defaults function of the BaseOperator metaclass. Experimenting with this now.

@alex-astronomer
Contributor

alex-astronomer commented Oct 1, 2021

Sorry about the comment spam, I just keep figuring out more and more about this issue. Here's what I've come up with from the research that I've done today.

By calling xcomm_task2(date) (looking at the previous examples, which define the tasks using the TaskFlow API), an implicit dependency set_date >> xcomm_task2 is created. This was originally introduced as a response to this issue. The reason I don't like it can be traced back to PEP 20, The Zen of Python.

I believe that it would make more sense within this context not to set this dependency automatically, even if doing so seems logical. We know that in order to pull an XCom from a task, the task that pushes the XCom must have run first. So why not set that dependency automatically?

The main diagram of the DAG and the use case in the original post for this ticket give the answer. By implicitly setting the dependencies with set_xcomargs_dependencies, we get strange DAG behavior and more unnecessary dependencies for more complicated DAGs.

If we change to setting the dependencies explicitly, the user would need to understand the relationship between tasks and XComs, but this falls more in line with the Zen of Python guidance on explicit vs. implicit.

Thoughts?

@ephraimbuddy
Contributor

I have thought about this, and I think it's OK as it is; it's just that we currently don't know how to use it.

If we understand that calling a task-decorated function in Airflow produces an XComArg, then it's not implicit that the relationship is set as it is.

From the PythonOperator code, you can see that XComs were passed explicitly, but in the TaskFlow example, because we didn't know we could pass XComs explicitly as we did with the PythonOperator, we created extra tasks while setting up the dependencies.

Any time we call a task-decorated function, we get an XComArg. This needs documentation, especially around passing XComs, as users think they can call the task-decorated function many times and it remains the same task.

Where possible, we should use xcom_task2(xcom_task()), which creates the implicit relationship. However, we can also add **kwargs to a task function to receive the context, from which we can pull XComs.
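
The point about repeated calls can be illustrated with a small stand-in for a task decorator. This is a toy sketch, not Airflow code: `ToyDag` is hypothetical, and the `__1` suffix is an assumption of the sketch about how repeated task ids are made unique.

```python
# Toy stand-in for a task decorator; ToyDag is hypothetical, not Airflow code.
class ToyDag:
    def __init__(self):
        self.task_ids = []

    def task(self, func):
        def call(*args):
            # Every call registers a NEW task; a repeated id gets a numeric
            # suffix (the "__N" form is an assumption of this sketch).
            base = func.__name__
            task_id, n = base, 0
            while task_id in self.task_ids:
                n += 1
                task_id = f"{base}__{n}"
            self.task_ids.append(task_id)
            return task_id  # stands in for the XComArg handle
        return call


toy_dag = ToyDag()


@toy_dag.task
def set_date():
    return "1-2-21"


first = set_date()
second = set_date()  # a second task, not a second reference to the first
print(toy_dag.task_ids)
```

Calling the function once and reusing the returned handle, as in `date = set_date()`, is what keeps a single upstream task in this model.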

@ephraimbuddy ephraimbuddy added kind:documentation and removed kind:bug This is a clearly a bug labels Oct 6, 2021
@ephraimbuddy
Contributor

We need to document the difference between passing Xcoms and creating dependencies between tasks in Taskflow API

@alex-astronomer
Contributor

Can someone assign me to this? TY!

@potiuk
Member

potiuk commented Oct 12, 2021

We need to document the difference between passing Xcoms and creating dependencies between tasks in Taskflow API

Quite agree. There are a number of things in TaskFlow that are "implicit" or not really documented (I just fixed one of those seemingly obvious things about context, #18868, which was not obvious at all for new users, or even seasoned committers :D). So more docs on how TaskFlow works are really needed.

@alex-astronomer
Contributor

In progress right now! Actively writing as we speak @potiuk

@eladkal
Contributor

eladkal commented Jan 17, 2022

@alex-astronomer are you still working on this issue?

@alex-astronomer
Contributor

Yes I am, should have more time this coming week to get in a PR. Thanks for your patience.

@eladkal
Contributor

eladkal commented Nov 5, 2022

Hey @alex-astronomer, is this still happening in the latest Airflow version?

@ricardogaspar2

Hey there! @alex-astronomer, any updates on this? The XCom dependency edges on more complex DAGs are really annoying and make the DAGs (mainly dynamic DAGs) really hard to understand. It makes any developer question whether it's better to use TaskFlow (and the returned outputs/wrapped XComs) to have more readable DAG Python code, or to default to Jinja templating to pass values so that the graph is not rendered with so many edges.
