Dynamic Task Mapping skips tasks before upstream has started #28973

Closed
2 tasks done
inytar opened this issue Jan 16, 2023 · 14 comments · Fixed by #30641
Assignees
Labels
affected_version:2.5 Issues Reported for 2.5 area:core kind:bug This is a clearly a bug

Comments

@inytar
Contributor

inytar commented Jan 16, 2023

Apache Airflow version

2.5.0

What happened

In some cases we are seeing dynamically mapped tasks being skipped before upstream tasks have started and before the dynamic count for the task can be calculated. We see this both locally with the LocalExecutor and on our cluster with the KubernetesExecutor.

To trigger the issue we need multiple dynamic tasks merging into a shared downstream task; see the images below for an example. If there is no merging, the tasks run as expected. The tasks also need to not know the number of dynamic tasks that will be created at DAG start, for example by chaining in another dynamic task's output.

[Screenshots: graph view of the test_skip DAG]

If the DAG, task, or upstream tasks are cleared the skipped task runs as expected.

The issue exists on both Airflow 2.4.x and 2.5.0.

Happy to help debug this further & answer any questions!

What you think should happen instead

The tasks should run after upstream tasks are done.

How to reproduce

The following code is able to reproduce the issue on our side:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.utils.task_group import TaskGroup
from airflow.operators.empty import EmptyOperator

# Only one chained task results in only 1 of the `skipped` tasks skipping.
# Adding extra chained tasks results in both `skipped` tasks skipping, but
# no earlier tasks are ever skipped.
CHAIN_TASKS = 1


@task()
def add(x, y):
    return x, y


with DAG(
    dag_id="test_skip",
    schedule=None,
    start_date=datetime(2023, 1, 13),
) as dag:

    init = EmptyOperator(task_id="init_task")
    final = EmptyOperator(task_id="final")

    for i in range(2):
        with TaskGroup(f"task_group_{i}") as tg:
            chain_task = [i]
            for j in range(CHAIN_TASKS):
                chain_task = add.partial(x=j).expand(y=chain_task)
            skipped_task = (
                add.override(task_id="skipped").partial(x=i).expand(y=chain_task)
            )

        # Task isn't skipped if final (merging task) is removed.
        init >> tg >> final

Operating System

MacOS

Versions of Apache Airflow Providers

This can be reproduced without any extra providers installed.

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@inytar inytar added area:core kind:bug This is a clearly a bug labels Jan 16, 2023
@boring-cyborg

boring-cyborg bot commented Jan 16, 2023

Thanks for opening your first issue here! Be sure to follow the issue template!

@uranusjr
Member

Can you check whether #28592 fixes this?

@ephraimbuddy
Contributor

Working for me in the current main, but the graph is not displaying (cc @bbovenzi).

@bbovenzi
Contributor

> Working for me in the current main, but the graph is not displaying (cc @bbovenzi).

Did it not load at all? Or just incorrectly? I fixed one graph issue yesterday: #28993

@inytar
Contributor Author

inytar commented Jan 18, 2023

I tried this both on main and with #28592 but still see the same issue. Also, like @ephraimbuddy, I don't see the graph, but I guess that's another issue.


@ephraimbuddy
Contributor

@inytar, it works for me in the current main. No skipping. What are your other settings like?

@bbovenzi, it doesn't come up on the graph view in the current main.

[Screenshot: graph view not rendering in current main]

@bbovenzi
Contributor

Oh I see. I have a fix for the UI part here: #29042

@inytar
Contributor Author

inytar commented Jan 19, 2023

> @inytar, it works for me in the current main. No skipping. What are your other settings like?

I've seen the issue on different settings/environments:

  • Production (first noticed the issue here):
    • k8s cluster
    • Postgresql DB
    • k8s executor
    • Airflow 2.5.0
  • Local Docker:
    • local executor
    • Postgresql DB
    • Airflow 2.5.0
  • Local standalone:
    • Main (commit: 23da4da)
    • airflow standalone
    • Fresh db & config

It seems to me like some kind of race condition; could you try upping the number of CHAIN_TASKS? If I set it to 1 I only see one task being skipped; if I set it to 2 I see both tasks being skipped.

I've updated the code a bit to add a workaround. It just adds two empty tasks before the merge; with this in place the tasks aren't skipped anymore (also useful for anyone else who might hit this):

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.utils.task_group import TaskGroup
from airflow.operators.empty import EmptyOperator

# Only one chained task results in only 1 of the `skipped` tasks skipping.
# Adding extra chained tasks results in both `skipped` tasks skipping, but
# no earlier tasks are ever skipped.
CHAIN_TASKS = 2

# Add workaround
WORKAROUND = False


@task()
def add(x, y):
    return x, y


with DAG(
    dag_id="test_skip",
    schedule=None,
    start_date=datetime(2023, 1, 13),
) as dag:

    init = EmptyOperator(task_id="init_task")
    final = EmptyOperator(task_id="final")

    for i in range(2):
        with TaskGroup(f"task_group_{i}") as tg:
            chain_task = [i]
            for j in range(CHAIN_TASKS):
                chain_task = (
                    add.override(task_id=f"add_{j}").partial(x=j).expand(y=chain_task)
                )
            skipped_task = (
                add.override(task_id="skipped").partial(x=i).expand(y=chain_task)
            )

        init >> tg

        # Workaround: Adding an empty normal task before the merge step fixes the issue.
        if WORKAROUND:
            workaround = EmptyOperator(task_id=f"workaround_{i}")
            tg >> workaround
            next = workaround
        else:
            next = tg

        # Task isn't skipped if final (merging task) is removed.
        next >> final

I'm OoO till Monday Jan 30th, so I won't be able to help much till then. When I'm back I'm happy to test, etc.!

@darkfennertrader

darkfennertrader commented Mar 31, 2023

Hello,
just to let you know that the problem still exists with multiple dynamic tasks in cascade. I solved the issue using an empty operator.

Airflow 2.5.2 (using the official docker-compose file)
Celery Executor
OS: Ubuntu 22.04
python version: 3.10.6

[Screenshot: DAG graph view]

import os
from typing import Dict, Any
from datetime import datetime
import logging
import pendulum
from airflow.decorators import dag, task  # type: ignore
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.empty import EmptyOperator
from dags_conf import default_args
from custom_operators.csv_to_postgres import CsvToPostgres  # type: ignore

local_tz = pendulum.timezone("Europe/Rome")  # type: ignore
task_logger = logging.getLogger("airflow.task")
airflow_home: Any = os.getenv("AIRFLOW_HOME")
BASE_DIR: str = airflow_home + "/data_sources/ibkr"
provider_name: Any = os.getenv("ibkr_provider_name")
frequency: Any = os.getenv("ibkr_frequency")

@dag(
    dag_id="ibkr_db_init_v_1_1_0",
    start_date=datetime(2023, 1, 1, tzinfo=local_tz),
    schedule=None,
    catchup=False,
    tags=["db init"],
    default_args=default_args,
)
def populate_db() -> None:
    @task(do_xcom_push=True)  # type: ignore
    def get_assets_years_and_files() -> list[Dict[str, str | list[str]]]:

        # (1) create list of assets
        assets = list(set(os.listdir(BASE_DIR)))

        # (2) create list of assets and relative directories
        assets_years = list({})
        for asset in assets:
            years: list[str] = list(set(os.listdir(BASE_DIR + "/" + asset)))
            years = [
                y for y in years if os.path.isdir(BASE_DIR + "/" + asset + "/" + y)
            ]

            assets_years.append({"symbol": asset, "years": years})

        return assets_years

    @task  # type: ignore
    def deduplication(symbol: str, years: list[str]) -> None:

        import pandas as pd

        overall_file_path = BASE_DIR + "/" + symbol + "/"

        # print(file_list)
        dframe = pd.DataFrame(
            columns=["Datetime", "Open", "High", "Low", "Close", "Volume"]
        )
        for year in years:
            base_path = BASE_DIR + "/" + symbol + "/" + year
            # list all files within a directory
            data_path: list[str] = [
                f
                for f in os.listdir(base_path)
                if os.path.isfile(os.path.join(base_path, f))
            ]

            # remove overall file if present for idempotency
            if os.path.isfile(os.path.join(overall_file_path, "final.csv")):
                os.remove(os.path.join(overall_file_path, "final.csv"))

            print(symbol)

            for file in data_path:
                filepath = BASE_DIR + "/" + symbol + "/" + year + "/" + file
                # print(filepath)
                data = pd.read_csv(filepath, parse_dates=["Datetime"], date_parser=lambda x: pd.to_datetime(x).tz_localize("UTC"))  # type: ignore
                dframe = pd.concat([dframe, data])  # type: ignore

        # renaming columns to make them compatible with db table columns
        dframe.rename(
            columns={
                "Datetime": "time",
                "Open": "open",
                "High": "high",
                "Low": "low",
                "Close": "close",
                "Volume": "volume",
            },
            inplace=True,
        )

        dframe.set_index("time", drop=True, inplace=True)  # type: ignore
        # deduplication
        dframe = dframe[~dframe.index.duplicated(keep="first")]  # type: ignore
        dframe.sort_index(inplace=True)  # type: ignore
        dframe.to_csv(overall_file_path + "/final.csv")
        print(dframe.shape)

    def list_of_dicts(elem: Dict[str, Any]) -> Dict[str, str]:
        return {
            "symbol": elem["symbol"].replace("-", "/"),
            "provider_name": provider_name,
            "frequency": frequency,
        }

    assets_years = get_assets_years_and_files()
    pg_input: list[dict[str, str]] = assets_years.map(list_of_dicts)  # type: ignore
    deduplicate = deduplication.partial().expand_kwargs(assets_years)  # type: ignore

    complementary_info = PostgresOperator.partial(
        task_id="complementary_info",
        postgres_conn_id="postgres_conn",  # created as env variable
        sql="sql/GET_INFO_FROM_SYMBOLS_PROVIDERS.sql",
    ).expand(  # type: ignore
        parameters=pg_input
    )

    def list_of_str_int(elem: list[list[str | int]]) -> list[str | int]:
        return [y for x in elem for y in x]

    task_input = complementary_info.output.map(list_of_str_int)

    # save complementary info in csv files for postgres IBKR table compatibility
    @task(trigger_rule="all_success", depends_on_past=False, wait_for_downstream=False)  # type: ignore
    def enrich_csv(extra_info: list[Any]) -> None:
        import pandas as pd

        symbol, symbol_id, provider_id, asset_class_id, frequency_id = (
            extra_info[0],
            extra_info[1],
            extra_info[2],
            extra_info[3],
            extra_info[4],
        )
        print(symbol)
        filepath = BASE_DIR + "/" + symbol.replace("/", "-") + "/final.csv"
        dframe = pd.read_csv(filepath, parse_dates=["time"], index_col="time")  # type: ignore
        print(f"before: {dframe.shape}")
        dframe["provider_id"] = provider_id
        dframe["asset_class_id"] = asset_class_id
        dframe["frequency_id"] = frequency_id
        dframe["symbol_id"] = symbol_id
        print(f"after: {dframe.shape}")

        dframe.to_csv(filepath, header=True)

        print(extra_info)

    enrich = enrich_csv.partial().expand(extra_info=task_input)  # type: ignore

    @task  # type: ignore
    def prepare_input() -> list[str]:
        assets = list(set(os.listdir(BASE_DIR)))
        filepaths: list[str] = []
        for elem in assets:
            filepath = "data_sources/ibkr/" + elem + "/final.csv"
            filepaths.append(filepath)

        return filepaths

    csvpg_input = prepare_input()

    solve_bug = EmptyOperator(task_id="solve_bug")

    # save csv to Postgres database
    kwargs: dict[str, Any] = {
        "task_id": "save_data_to_db",
    }

    # filepath = "data_sources/ibkr/AUD-CAD/final.csv"
    # save_to_db = CsvToPostgres(
    #     filepath=filepath, sql="sql/COPY_INTO_IBKR_DATA.sql", **kwargs
    # )

    save_to_db = CsvToPostgres.partial(  # type: ignore
        sql="sql/COPY_INTO_IBKR_DATA.sql", **kwargs
    ).expand(  # type: ignore
        filepath=csvpg_input
    )
    [deduplicate, complementary_info] >> enrich >> solve_bug >> save_to_db  # type: ignore


populate_db()

@jose-workpath
Contributor

jose-workpath commented Apr 4, 2023

I also have this error on Airflow 2.5.3.

I found out that it happens when two dynamically mapped tasks have a dependency on each other and the child task of the last mapped task has another parent task, i.e. in a DAG like this:

graph LR
    A["A (mapped task)"] --> B["B (mapped task)"] --> D
    C --> D

The mapped task B will always get skipped, and so will any of its child tasks.

In my case, I cannot do what @darkfennertrader suggested because I use the result of the first mapped task as a parameter of the next mapped task.

I created a simple DAG to reproduce the error I'm getting:

"""
Some bug causes a DAG like this to always get skipped
when there is more than one expanded task and the downstream
of the last expanded task has another upstream task, i.e.:

`A (mapped) -> B (mapped) -> D`

and:

`C -> D`

Will immediately cause: `B` and `D` and any downstream of `D` to be Skipped.

In this DAG example you will see how the tasks `make_it_double` (B), `wrap_up` (D) and
any downstream task immediately return a 'skipped' status as soon as the DAG is triggered
and are not scheduled for execution.
"""

import datetime
from typing import List

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.decorators import task


@task
def generate_list_of_numbers() -> List[List[int]]:
    """
    Retrieves a list of length 2, each sublist containing lists of integers of range [1, 3]
    """
    return [list(range(3)) for _ in range(2)]


@task
def a_add_them_together(numbers: List[int]):
    """
    Add the numbers together
    """
    return sum(numbers)


@task
def b_make_it_double(summed: int):
    """
    Multiplies by 2
    """
    return summed * 2


@task
def d_wrap_up(results) -> int:
    """
    Prints the results
    """
    print(results)


@task
def c_some_upstream():
    print("Can be any kind of upstream python task.")


with DAG(
    dag_id="reproduce_skipped_expansions_error",
    schedule_interval=None,
    catchup=False,
    start_date=datetime.datetime(2023, 1, 13),
    dagrun_timeout=datetime.timedelta(minutes=60),
    doc_md=__doc__,
) as dag:

    numbers = generate_list_of_numbers()
    # Expected result: [[1,2,3], [1,2,3]]
    a_added_numbers = a_add_them_together.expand(numbers=numbers)
    # Expected result: [6, 6]
    b_multiplied = b_make_it_double.expand(summed=a_added_numbers)
    # Expected result: [12, 12]
    c_dep = c_some_upstream()
    # Just prints 'multiplied':
    d_wrap = d_wrap_up(results=b_multiplied)
    # Define order of tasks:
    c_dep >> d_wrap
    d_wrap >> EmptyOperator(task_id="any_downstream")

if __name__ == "__main__":
    dag.cli()

@uranusjr
Member

uranusjr commented Apr 4, 2023

Thanks for the investigation and the reproduction, that's a great help. I'll find some time to check if I can find the issue later this week.

@uranusjr uranusjr self-assigned this Apr 4, 2023
@eladkal eladkal added the affected_version:2.5 Issues Reported for 2.5 label Apr 5, 2023
@uranusjr
Member

uranusjr commented Apr 10, 2023

First finding: d_wrap is skipped because b_make_it_double is not expanded, and the unexpanded task is marked as skipped. Next I need to find out why b_make_it_double is marked as skipped. This happens surprisingly early in scheduling, even before a_add_them_together has actually run!? And weirder still, I can't yet find what causes b_make_it_double to be skipped (no relevant logs).
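
(A small diagnostic sketch of what "marked as skipped" looks like in the metadata database, assuming the reproduction DAG above has been triggered once; the DAG id and the use of create_session here are illustrative assumptions, not part of the original comment.)

# Diagnostic sketch: list the task instances of the repro DAG with their
# map_index and state. An unexpanded mapped task is stored as a single
# placeholder instance with map_index == -1, and in this bug that placeholder
# is the instance that ends up in the "skipped" state before upstream has run.
from airflow.models import TaskInstance
from airflow.utils.session import create_session

with create_session() as session:
    tis = (
        session.query(TaskInstance)
        .filter(TaskInstance.dag_id == "reproduce_skipped_expansions_error")
        .all()
    )
    for ti in sorted(tis, key=lambda t: (t.task_id, t.map_index)):
        print(ti.task_id, ti.map_index, ti.state)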

@uranusjr
Member

Update: My attempt to induce the issue with an extremely simplified scheduler (calling task_instance_scheduling_decisions repeatedly) does not work; things all run as expected. This leads me to guess the issue may be in the mini scheduler, not the main scheduler loop.
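
(A rough reconstruction of the kind of minimal loop described above, for illustration only; it is not the actual script used, and it assumes the reproduction DAG is loadable from the local DAGs folder and already has a run.)

# Sketch: ask a DagRun directly which task instances are schedulable,
# bypassing the executor and the "mini scheduler" entirely.
from airflow.models import DagBag, DagRun
from airflow.utils.session import create_session

dag_id = "reproduce_skipped_expansions_error"
dag = DagBag().get_dag(dag_id)

with create_session() as session:
    dag_run = session.query(DagRun).filter(DagRun.dag_id == dag_id).first()
    dag_run.dag = dag  # scheduling decisions need the DAG structure attached
    decisions = dag_run.task_instance_scheduling_decisions(session=session)
    print("schedulable:", [ti.task_id for ti in decisions.schedulable_tis])
    print("all states:", [(ti.task_id, ti.map_index, ti.state) for ti in decisions.tis])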

@uranusjr
Member

Yup, issue’s gone if I set schedule_after_task_execution = false. This should greatly narrow down the possible causes.
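
(For anyone hitting this before a fix lands: the setting mentioned above lives in the [scheduler] section of airflow.cfg. A sketch of how to flip it, as a temporary mitigation that disables the mini-scheduler optimisation rather than a proper fix:)

# airflow.cfg
[scheduler]
schedule_after_task_execution = False

# or, equivalently, via environment variable:
# AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=False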
