
Reduce task queueing latency when using Cosmos #990

Closed
1 task
tatiana opened this issue May 21, 2024 · 11 comments · Fixed by #1014
Assignees
Labels
area:performance Related to performance, like memory usage, CPU usage, speed, etc area:rendering Related to rendering, like Jinja, Airflow tasks, etc customer request An Astronomer customer requested this execution:virtualenv Related to Virtualenv execution environment priority:high High priority issues are blocking or critical issues without a workaround and large impact profile:bigquery Related to BigQuery ProfileConfig
Milestone

Comments


tatiana commented May 21, 2024

Context

This issue happened before the last release (1.4) and can also be reproduced with Cosmos 1.4.1.

Users have observed long task queueing times for Cosmos tasks:
Screenshot 2024-05-21 at 11 33 12

This is not observed when using, for instance, BashOperator task instances:
Screenshot 2024-05-21 at 11 33 33

The task queueing time for the Cosmos DAG is consistently 5s, while it is close to 0s for the BashOperator one.

How to reproduce

Example Cosmos DAG:

from datetime import datetime

from cosmos import DbtDag, ProjectConfig, ProfileConfig, RenderConfig
from cosmos.constants import TestBehavior

from include.constants import jaffle_shop_path, venv_execution_config

dbt_profile_example = DbtDag(
    # dbt/cosmos-specific parameters
    project_config=ProjectConfig(jaffle_shop_path),
    profile_config=ProfileConfig(
        # these map to dbt/jaffle_shop/profiles.yml
        profile_name="airflow_db",
        target_name="bq",
        profiles_yml_filepath=jaffle_shop_path / "profiles.yml",
    ),
    render_config=RenderConfig(test_behavior=TestBehavior.NONE),
    execution_config=venv_execution_config,
    # normal dag parameters
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="bq_profile_example",
    tags=["profiles"],
)

Example BashOperator DAG:

from pendulum import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator  # the bash_operator module path is deprecated

from include.constants import jaffle_shop_path as DBT_PROJECT_DIR
from include.constants import dbt_executable

DBT_BIN_PATH = dbt_executable

BASE_DBT_CMD = f"{DBT_BIN_PATH} %s --profiles-dir {DBT_PROJECT_DIR} --project-dir {DBT_PROJECT_DIR} --profile airflow_db --target bq"

DBT_DEPS = BASE_DBT_CMD % "deps"
DBT_LS = BASE_DBT_CMD % "ls"
BASE_DBT_CMD = DBT_DEPS + " && " + DBT_LS + " && " + BASE_DBT_CMD


with DAG(
    "bash_bq_example",
    start_date=datetime(2020, 12, 23),
    description="A sample Airflow DAG to invoke dbt runs using a BashOperator",
    schedule_interval=None,
    catchup=False,
) as dag:
    raw_payments_seed = BashOperator(
        task_id="raw_payments_seed",
        bash_command=BASE_DBT_CMD % "seed --models raw_payments",
    )
    raw_orders_seed = BashOperator(
        task_id="raw_orders_seed",
        bash_command=BASE_DBT_CMD % "seed --models raw_orders",
    )
    raw_customers_seed = BashOperator(
        task_id="raw_customers_seed",
        bash_command=BASE_DBT_CMD % "seed --models raw_customers",
    )

    run_stg_payments = BashOperator(
        task_id="run_stg_payments",
        bash_command=BASE_DBT_CMD % "run --models stg_payments"
    )

    run_stg_orders = BashOperator(
        task_id="run_stg_orders",
        bash_command=BASE_DBT_CMD % "run --models stg_orders"
    )

    run_stg_customers = BashOperator(
        task_id="run_stg_customers",
        bash_command=BASE_DBT_CMD % "run --models stg_customers"
    )

    run_orders = BashOperator(
        task_id="run_orders",
        bash_command=BASE_DBT_CMD % "run --models orders"
    )

    run_customers = BashOperator(
        task_id="run_customers",
        bash_command=BASE_DBT_CMD % "run --models customers"
    )

    raw_payments_seed >> run_stg_payments
    raw_orders_seed >> run_stg_orders
    raw_customers_seed >> run_stg_customers

    [run_stg_payments, run_stg_orders, run_stg_customers] >> run_customers
    [run_stg_payments, run_stg_orders] >> run_orders

It is expected that Cosmos introduces some delay, but it is currently too long even for relatively small dbt projects.

Possible solution

Acceptance criteria

  • Reduce the task queueing time when using Cosmos
@tatiana tatiana added customer request An Astronomer customer requested this priority:high High priority issues are blocking or critical issues without a workaround and large impact area:performance Related to performance, like memory usage, CPU usage, speed, etc area:rendering Related to rendering, like Jinja, Airflow tasks, etc labels May 21, 2024
@tatiana tatiana added this to the Cosmos 1.5.0 milestone May 21, 2024
@tatiana tatiana self-assigned this May 21, 2024
@dosubot dosubot bot added execution:virtualenv Related to Virtualenv execution environment profile:bigquery Related to BigQuery ProfileConfig labels May 21, 2024

tatiana commented May 21, 2024

Progress can be seen in draft PR: #992


tatiana commented May 23, 2024


tatiana commented May 29, 2024

When using the current approach in a distributed environment, there are two challenges:

  1. Concurrent tasks (on the same node) try to create the cache at the same time
  2. Tasks running on different nodes (that don't yet have the cache) each have to generate it

We'll look into improving this.

Examples of the behaviour in a distributed Airflow environment:

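The first challenge above (concurrent tasks racing to create the cache on the same node) can be mitigated by writing the cache file atomically, so a reader never observes a half-written file. A minimal sketch, assuming a JSON cache file; `write_cache_atomically` is a hypothetical helper, not Cosmos's actual implementation:

```python
import json
import os
import tempfile
from pathlib import Path


def write_cache_atomically(cache_path: Path, payload: dict) -> None:
    """Write the cache via a temp file in the same directory, then
    os.replace it into place (atomic on POSIX filesystems)."""
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=cache_path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as handle:
            json.dump(payload, handle)
        os.replace(tmp_name, cache_path)
    except BaseException:
        os.unlink(tmp_name)  # clean up the temp file on failure
        raise
```

With this pattern, the worst case for two concurrent tasks is duplicated work, not a corrupted cache: the last `os.replace` wins.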


tatiana commented Jun 3, 2024

Storing the dbt ls output as an Airflow Variable seems more promising in a distributed Airflow environment, with the caveats described in the ticket description:
#1014


tatiana commented Jun 5, 2024

Waiting for feedback from end-users on #1014.

Analysed and confirmed the feasibility of this approach for larger dbt projects:

  • Another real dbt project with 9,285 models led to a dbt ls output of 8.4 MB, uncompressed.
  • Maximum cell size in Postgres: 20MB
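As a sanity check for this approach, the compressed payload can be measured against the database budget mentioned above. A minimal sketch; `encode_for_variable` and `fits_in_variable` are hypothetical helpers, and the 20 MB figure is the budget cited in this comment:

```python
import base64
import zlib

MAX_VARIABLE_BYTES = 20 * 1024 * 1024  # budget cited above for a Postgres cell


def encode_for_variable(dbt_ls_output: str) -> str:
    """Compress and base64-encode dbt ls output so it can be stored
    as a plain string in an Airflow Variable."""
    compressed = zlib.compress(dbt_ls_output.encode("utf-8"))
    return base64.b64encode(compressed).decode("utf-8")


def fits_in_variable(dbt_ls_output: str) -> bool:
    """Check the encoded payload against the storage budget."""
    return len(encode_for_variable(dbt_ls_output)) <= MAX_VARIABLE_BYTES
```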


tatiana commented Jun 11, 2024

Here are some updates on this task: We received feedback from end-users on #1014, and they were happy with the performance improvements and the overall approach. We monitored the load in their deployment's database, and it was fine, without any significant increases.

The next agreed-upon steps were to rename the cache variable so it is prefixed with a Cosmos identifier and to work on the purging strategy.

We initially implemented CachePurgeConfig in d1a30b6, but we realized it would add unnecessary overhead for end-users. Initially, there was also the intention to use a hash of the DAG file itself as part of the caching version definition, but users explained they wanted to be able to change parts of their DAG unrelated to the Cosmos dbt ls invocation.

We released astronomer-cosmos==1.5.0a6

https://pypi.org/project/astronomer-cosmos/1.5.0a6/
https://github.com/astronomer/astronomer-cosmos/pull/1014/commits

We addressed the feedback you gave during our Monday session on purging:

The Airflow vars used to cache the dbt ls output are now prefixed with cosmos_cache
If the following arguments are changed, we'll automatically purge the cache:

  • RenderConfig.env_vars
  • RenderConfig.exclude
  • RenderConfig.select
  • RenderConfig.selector
  • ProjectConfig.dbt_vars
  • ProjectConfig.env_vars
  • ProjectConfig.partial_parse

The following argument was introduced in case you'd like to define Airflow variables that can be used to purge the cache (it expects a list of Airflow variable names):

  • RenderConfig.airflow_vars_to_purge_cache

What is missing on purging:

  • Take into account "packages.yml" or "dependencies.yml"
  • Change how we detect whether the dbt project files changed (previously we relied on modified timestamps, but these do not play well with --dags deployments, since file timestamps change on every deploy)
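A content-based hash along these lines could replace the timestamp check, so redeployments that only touch mtimes keep the cache valid. A sketch of the idea; `project_files_hash` is a hypothetical helper, not the actual Cosmos code:

```python
import hashlib
from pathlib import Path


def project_files_hash(project_dir: Path) -> str:
    """SHA256 over the *contents* of every file in the project,
    visited in sorted order so the digest is deterministic."""
    digest = hashlib.sha256()
    for path in sorted(project_dir.rglob("*")):
        if path.is_file():
            # Include the relative path so renames change the hash too
            digest.update(str(path.relative_to(project_dir)).encode("utf-8"))
            digest.update(path.read_bytes())
    return digest.hexdigest()
```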


tatiana commented Jun 11, 2024

An example that can be tested using 1.5.0a6:

DbtDag(
    # dbt/cosmos-specific parameters
    project_config=ProjectConfig(
        jaffle_shop_path,
        dbt_vars={"some-var": 2}
    ),
    profile_config=ProfileConfig(
        # these map to dbt/jaffle_shop/profiles.yml
        profile_name="airflow_db",
        target_name="bq",
        profiles_yml_filepath=jaffle_shop_path / "profiles.yml",
    ),
    render_config=RenderConfig(
        test_behavior=TestBehavior.NONE,
        env_vars={"PURGE": os.getenv("PURGE", "0")},
        airflow_vars_to_purge_cache=["purge"],
    ),
    execution_config=venv_execution_config,
    # normal dag parameters
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="bq_efficient",
    tags=["profiles"],
)

If the value of dbt_vars changes, the cache for this DAG will be purged.
If the value of the environment variable PURGE is changed, the cache for this DAG will be purged.
If the value for the Airflow var purge is changed, the cache for this DAG will also be purged.


tatiana commented Jun 11, 2024

Today, I've made a change to the dbt project hash. It's now created using the SHA256 of the dbt project files rather than their modified timestamps (0e0f663). This change allows the solution to work with --dags-only deployments.

I'll also include the SHA256 of "packages.yml" or "dependencies.yml" if they exist, and cut a new alpha release once these changes are implemented.


tatiana commented Jun 11, 2024

With the latest changes, the overall DAG duration for a Jaffle Shop DAG dropped from 00:01:03 (already using the partial parsing cache) to 00:00:39, with consistently small task queueing times even after --dags deployments.

Before:
Screenshot 2024-06-11 at 23 22 40

After:
Screenshot 2024-06-11 at 23 22 48


tatiana commented Jun 17, 2024

Some of the last improvements on this workstream:


tatiana commented Jun 21, 2024

Fixed issues (deterministic cache hash on different VMs) and created tests that cover 100% of this feature.

arojasb3 pushed a commit to arojasb3/astronomer-cosmos that referenced this issue Jul 14, 2024
…le (astronomer#1014)

Significantly improve the `LoadMode.DBT_LS` performance. The example
DAGs tested reduced the task queueing time significantly (from ~30s to
~0.5s) and the total DAG run time for Jaffle Shop from 1 min 25s to 40s
(by more than 50%). Some users [reported improvements of
84%](astronomer#1014 (comment))
in the DAG run time when trying out these changes. This difference can
be even more significant on larger dbt projects.

The improvement was accomplished by caching the dbt ls output as an
Airflow Variable. This is an alternative to astronomer#992, in which we cached
the pickled DAG/TaskGroup into a local file on the Airflow node. Unlike
astronomer#992, this approach works well for distributed deployments of Airflow.

As with any caching solution, this strategy does not guarantee optimal
performance on every run: whenever the cache is regenerated, the
scheduler or DAG processor will experience a delay. It was also observed
that the key value could change across platforms (e.g., `Darwin` and
`Linux`). Therefore, on deployments with heterogeneous operating systems,
the key may be regenerated often.

Closes: astronomer#990
Closes: astronomer#1061

**Enabling/disabling this feature**

This feature is enabled by default.
Users can disable it by setting the environment variable
`AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS=0`.
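The toggle could be read along these lines. A sketch, assuming Airflow-style falsy values; `dbt_ls_cache_enabled` is a hypothetical helper, not the actual Cosmos code:

```python
import os

# Values Airflow-style boolean config typically treats as "off"
_FALSY = {"0", "false", "f", "no", "n", ""}


def dbt_ls_cache_enabled() -> bool:
    """Read the env var that toggles the dbt ls cache; enabled by default."""
    raw = os.getenv("AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS", "1")
    return raw.strip().lower() not in _FALSY
```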

**How the cache is refreshed**

Users can purge or delete the cache via the Airflow UI by identifying
and deleting the cache key.

The cache will be automatically refreshed if any files of the dbt
project change. Changes are detected using the SHA256 of all the files
in the directory. Initially, this feature was implemented using the
files' modified timestamps, but this did not work well for some Airflow
deployments (e.g., `astro --dags`, since the timestamps change during
deployments).

Additionally, if any of the following DAG configurations are changed,
we'll automatically purge the cache of the DAGs that use that specific
configuration:
* `ProjectConfig.dbt_vars`
* `ProjectConfig.env_vars`
* `ProjectConfig.partial_parse`
* `RenderConfig.env_vars`
* `RenderConfig.exclude`
* `RenderConfig.select`
* `RenderConfig.selector`

The following argument was introduced in case users would like to define
Airflow variables that can be used to refresh the cache (it expects a
list of Airflow variable names):
* `RenderConfig.airflow_vars_to_purge_cache`

Example:
```
RenderConfig(
    airflow_vars_to_purge_cache=["refresh_cache"]
)
```

**Cache key**

The Airflow variables that represent the dbt ls cache are prefixed by
`cosmos_cache`. When using `DbtDag`, the key is derived from the DAG id.
When using `DbtTaskGroup`, it combines the DAG id with the TaskGroup id
and any parent task group ids.

Examples:
1. The `DbtDag` "basic_cosmos_dag" will have its cache stored under the
key `"cosmos_cache__basic_cosmos_dag"`.
2. The `DbtTaskGroup` "customers" declared inside the DAG
"basic_cosmos_task_group" will have the cache key
`"cosmos_cache__basic_cosmos_task_group__customers"`.
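The key construction described above can be sketched as follows; `dbt_ls_cache_key` is a hypothetical helper name, not the actual Cosmos function:

```python
from typing import List, Optional


def dbt_ls_cache_key(dag_id: str, task_group_path: Optional[List[str]] = None) -> str:
    """Build the Variable key: 'cosmos_cache__' + DAG id, plus any
    (parent) task group ids when used with DbtTaskGroup."""
    parts = [dag_id, *(task_group_path or [])]
    return "cosmos_cache__" + "__".join(parts)
```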

**Cache value**

The cache values contain a few properties:
- `last_modified`: a timestamp in ISO 8601 format.
- `version`: a hash representing the version of the dbt project and the
arguments used to run dbt ls at the time the cache was created.
- `dbt_ls_compressed`: the dbt ls output, compressed using zlib and
encoded with base64 so it can be stored as a string in the Airflow
metadata database.

Steps used to compress:
```
compressed_data = zlib.compress(dbt_ls_output.encode("utf-8"))
encoded_data = base64.b64encode(compressed_data)
dbt_ls_compressed = encoded_data.decode("utf-8")
```

We compress this value because it can be large for bigger dbt projects,
depending on the selectors used, and we wanted this approach to be safe
and not clutter the Airflow metadata database.
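Reading the cache back reverses the steps above: base64-decode, then zlib-decompress. A minimal sketch; `decode_dbt_ls` is a hypothetical helper:

```python
import base64
import zlib


def decode_dbt_ls(dbt_ls_compressed: str) -> str:
    """Recover the original dbt ls output from the stored string."""
    return zlib.decompress(base64.b64decode(dbt_ls_compressed)).decode("utf-8")
```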

Some numbers on the compression:
* A dbt project with 100 models can lead to a dbt ls output of 257k
characters when using JSON. Zlib could compress it by 20x.
* Another [real-life dbt
project](https://gitlab.com/gitlab-data/analytics/-/tree/master/transform/snowflake-dbt?ref_type=heads)
with 9,285 models led to a dbt ls output of 8.4 MB, uncompressed. It
reduces to 489 KB after being compressed using `zlib` and encoded using
`base64` - to 6% of the original size.
* Maximum cell size in Postgres: 20MB

The compression latency is on the order of milliseconds, so it does not
interfere with the performance of this solution.

**Future work**

* Assess how this will affect the Airflow metadata database in the long term
* Compare the performance of this approach with `ObjectStorage`

**Example of results before and after this change**

Task queue times in Astro before the change:
<img width="1488" alt="Screenshot 2024-06-03 at 11 15 26"
src="https://github.com/astronomer/astronomer-cosmos/assets/272048/20f6ae8f-02e0-4974-b445-740925ab1b3c">

Task queue times in Astro after the change on the second run of the DAG:
<img width="1624" alt="Screenshot 2024-06-03 at 11 15 44"
src="https://github.com/astronomer/astronomer-cosmos/assets/272048/c7b8a821-8751-4d2c-8feb-1d0c9bbba97e">

This feature will be available in `astronomer-cosmos==1.5.0a8`.