
Speed up LoadMode.DBT_LS by caching dbt ls output in Airflow Variable #1014

Merged · 84 commits · Jun 25, 2024
0ea8c19
Add more logs to confirm bottleneck
tatiana May 31, 2024
71dca3c
Support retrieving dbt ls cache from Airflow Variable
tatiana Jun 3, 2024
c46e8b1
Release 1.5.0a3
tatiana Jun 3, 2024
240b03c
Fix log message for TaskGroups
tatiana Jun 3, 2024
ed8b6fe
Fix log for Cosmos TaskGroup
tatiana Jun 3, 2024
5dc96af
Update changelog after last rebase
tatiana Jun 10, 2024
4e0b71d
Introduce CachePurgeConfig
tatiana Jun 10, 2024
5602e9b
Update release notes after rebase
tatiana Jun 10, 2024
fb3c99c
Update release notes after rebase
tatiana Jun 10, 2024
4768ea7
Fix alpha release version
tatiana Jun 10, 2024
b5b0aff
Improve var caching to be prefixed with cosmos_cache
tatiana Jun 10, 2024
9d36171
Temporarily remove fallback to custom load method in automatic, to tr…
tatiana Jun 10, 2024
94f82a3
Remove CachePurgeConfig and purge based on commands passed to dbt ls
tatiana Jun 11, 2024
e4ece12
Add RenderConfig.airflow_vars_to_purge_cache
tatiana Jun 11, 2024
625d980
Release 1.5.0a6
tatiana Jun 11, 2024
f29efc5
Change identifying dbt project changes based on md5
tatiana Jun 11, 2024
955c7b5
Use cached_property
tatiana Jun 11, 2024
c0efbd3
Use sha256 to hash the dir
tatiana Jun 11, 2024
d93381f
Add logs
tatiana Jun 11, 2024
0fe9de7
Fix logs
tatiana Jun 11, 2024
b298a27
Release 1.5.0a7
tatiana Jun 12, 2024
913e810
Add docstrings and rename args
tatiana Jun 12, 2024
1446987
Remove stale comment
tatiana Jun 14, 2024
1f6f62b
Fix behaviour when user passes ProjectConfig(dbt_vars) in dbt ls
tatiana Jun 14, 2024
c19b80b
Add more tests
tatiana Jun 14, 2024
c811d23
Add more tests
tatiana Jun 14, 2024
3e67d3a
Add more tests
tatiana Jun 14, 2024
72b39b3
Fix unittests that were trying to fetch Airflow vars
tatiana Jun 14, 2024
125d180
Try to fix test so it passes in linux and macos
tatiana Jun 17, 2024
840a85c
Fix dbt graph test/--vars
tatiana Jun 17, 2024
bb9eca1
Fix test passing vars
tatiana Jun 17, 2024
d73a8d2
Fix test graph
tatiana Jun 17, 2024
a830135
Fix CI integration tests issue
tatiana Jun 17, 2024
f9571a1
Fix issue uncovered by sqlite test
tatiana Jun 17, 2024
7624076
Change tests that should not be using cache
tatiana Jun 18, 2024
20b636b
Change from sha256 to md5 to see if it is more deterministic
tatiana Jun 18, 2024
5e8daf5
Try to get deterministic value on hash for dbt project dir
tatiana Jun 18, 2024
624d9e8
Fix hash value in linux
tatiana Jun 18, 2024
9b27522
Try to fix CI unittests
tatiana Jun 18, 2024
d860a6c
fix unittest in the CI
tatiana Jun 18, 2024
763036f
fix unittest in the CI
tatiana Jun 18, 2024
0168516
Split test by linux/darwin
tatiana Jun 18, 2024
04a4ab7
Fix test_load_via_dbt_ls_caching_partial_parsing
tatiana Jun 18, 2024
64b9ea4
Fix pyproject to fix integration tests
tatiana Jun 18, 2024
cf81a86
troubleshoot
tatiana Jun 18, 2024
50f2c10
Troubleshoot CI
tatiana Jun 18, 2024
2bd7e50
Troubleshoot CI
tatiana Jun 18, 2024
23c1bc8
Attempts to get tests to pass
tatiana Jun 18, 2024
e24576f
Fix ci yml
tatiana Jun 18, 2024
98232f7
Disable 2.9 tests
tatiana Jun 18, 2024
5e53a4a
Reduce test matrix to help troubleshooting
tatiana Jun 18, 2024
37264d8
Rename connection used in example dags
tatiana Jun 18, 2024
26207ae
Try to reset Airflow db
tatiana Jun 19, 2024
a3b0656
Force airflow db to be reset
tatiana Jun 19, 2024
935cf51
Run a subset of integration tests
tatiana Jun 19, 2024
890ed11
Remove CI cache on int tests
tatiana Jun 19, 2024
afd3dbd
Try to re-enable all integration tests
tatiana Jun 19, 2024
aaa3a74
Try to fix integration tests
tatiana Jun 19, 2024
36e38a3
Remove incorrectly committed dev/dags/dbt/perf/models/
tatiana Jun 20, 2024
a69542c
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Jun 20, 2024
6c9bc63
Fix version of Airflow to supported version
tatiana Jun 21, 2024
b3c3716
Improve and tidy up tests
tatiana Jun 21, 2024
254d276
Revert changes to workflows test
tatiana Jun 21, 2024
9767c95
Introduce method to delete stale Cosmos cache Airflow Variables
tatiana Jun 22, 2024
9bf1898
Release 1.5.0a8
tatiana Jun 22, 2024
859a257
Change process_time to perf_counter
tatiana Jun 24, 2024
b9297c0
Address PR feedback
tatiana Jun 24, 2024
16d288c
Address PR feedback
tatiana Jun 24, 2024
65a9f07
Address PR feedback
tatiana Jun 24, 2024
b362afe
Address PR feedback
tatiana Jun 24, 2024
56eeaba
Add an example DAG to cleanup Cosmos cache originated from running db…
tatiana Jun 24, 2024
d0c323d
Make calculate_dbt_ls_cache_current_version private
tatiana Jun 25, 2024
ea452f7
Add debug statement for dbt_ls_cache_key_args
tatiana Jun 25, 2024
7ba3b63
Use timezone.utc as opposed to pytz
tatiana Jun 25, 2024
97dddba
Change logger.warn to logger.info
tatiana Jun 25, 2024
3cb6bed
Remove debug file
tatiana Jun 25, 2024
dae75fd
Add tests to cover delete_unused_dbt_ls_cache
tatiana Jun 25, 2024
8ccf6ef
Rename int test to int
tatiana Jun 25, 2024
908a48e
Add docs
tatiana Jun 25, 2024
00eff69
Add docs
tatiana Jun 25, 2024
b6a9077
Improve docs, spelling, grammar
tatiana Jun 25, 2024
cedb178
Improve docs
tatiana Jun 25, 2024
46a9b78
Release 1.5.0a9
tatiana Jun 25, 2024
4dd9549
Update changelog
tatiana Jun 25, 2024
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
@@ -116,6 +116,7 @@ jobs:
- uses: actions/checkout@v3
with:
ref: ${{ github.event.pull_request.head.sha || github.ref }}

- uses: actions/cache@v3
with:
path: |
@@ -139,6 +140,7 @@
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-setup
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration
env:
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
DATABRICKS_HOST: mock
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -82,6 +82,7 @@ repos:
types-PyYAML,
types-attrs,
attrs,
types-pytz,
Contributor review comment (suggested change: remove `types-pytz,`): Would we still need it now?

types-requests,
types-python-dateutil,
apache-airflow,
77 changes: 76 additions & 1 deletion CHANGELOG.rst
@@ -1,8 +1,83 @@
Changelog
=========

1.5.0a9 (2024-06-25)
--------------------

New Features

* Speed up ``LoadMode.DBT_LS`` by caching dbt ls output in Airflow Variable by @tatiana in #1014
* Support for running dbt tasks in AWS EKS in #944 by @VolkerSchiewe
* Add Clickhouse profile mapping by @roadan and @pankajastro in #353 and #1016
* Add node config to TaskInstance Context by @linchun3 in #1044

Bug fixes

* Fix disk permission error in restricted env by @pankajastro in #1051
* Add CSP header to iframe contents by @dwreeves in #1055
* Stop attaching log adaptors to root logger to reduce logging costs by @glebkrapivin in #1047

Enhancements

* Support ``static_index.html`` docs by @dwreeves in #999
* Support deep linking dbt docs via Airflow UI by @dwreeves in #1038
* Add ability to specify host/port for Snowflake connection by @whummer in #1063

Others

* Update documentation for DbtDocs generator by @arjunanan6 in #1043
* Use uv in CI by @dwreeves in #1013
* Cache hatch folder in the CI by @tatiana in #1056
* Change example DAGs to use ``example_conn`` as opposed to ``airflow_db`` by @tatiana in #1054
* Mark plugin integration tests as integration by @tatiana in #1057
* Ensure compliance with linting rule D300 by using triple quotes for docstrings by @pankajastro in #1049
* Pre-commit hook updates in #1039, #1050, #1064


1.4.3 (2024-06-07)
-----------------
------------------

Bug fixes

* Bring back ``dataset`` as a required field for BigQuery profile by @pankajkoti in #1033

Enhancements

* Only run ``dbt deps`` when there are dependencies by @tatiana and @AlgirdasDubickas in #1030

Docs

* Fix docs so it does not reference non-existing ``get_dbt_dataset`` by @tatiana in #1034


v1.4.2 (2024-06-06)
-------------------

Bug fixes

* Fix the invocation mode for ``ExecutionMode.VIRTUALENV`` by @marco9663 in #1023
* Fix Cosmos ``enable_cache`` setting by @tatiana in #1025
* Make ``GoogleCloudServiceAccountDictProfileMapping`` dataset profile arg optional by @oliverrmaa and @pankajastro in #839 and #1017
* Athena profile mapping set ``aws_session_token`` in profile only if it exists by @pankajastro in #1022

Others

* Update dbt and Airflow conflicts matrix by @tatiana in #1026
* Enable Python 3.12 unittest by @pankajastro in #1018
* Improve error logging in ``DbtLocalBaseOperator`` by @davidsteinar in #1004
* Add GitHub issue templates for bug reports and feature request by @pankajkoti in #1009
* Add more fields in bug template to reduce turnaround in issue triaging by @pankajkoti in #1027
* Fix ``dev/Dockerfile`` + Add ``uv pip install`` for faster build time by @dwreeves in #997
* Drop support for Airflow 2.3 by @pankajkoti in #994
* Update Astro Runtime image by @RNHTTR in #988 and #989
* Enable ruff F linting by @pankajastro in #985
* Move Cosmos Airflow configuration to settings.py by @pankajastro in #975
* Fix CI Issues by @tatiana in #1005
* Pre-commit hook updates in #1000, #1019


1.4.1 (2024-05-17)
------------------

Bug fixes

2 changes: 1 addition & 1 deletion cosmos/__init__.py
@@ -5,7 +5,7 @@

Contains dags, task groups, and operators.
"""
__version__ = "1.4.3"
__version__ = "1.5.0a9"


from cosmos.airflow.dag import DbtDag
193 changes: 184 additions & 9 deletions cosmos/cache.py
@@ -1,18 +1,47 @@
from __future__ import annotations

import functools
import hashlib
import json
import os
import shutil
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path

import msgpack
from airflow.models import DagRun, Variable
from airflow.models.dag import DAG
from airflow.utils.session import provide_session
from airflow.utils.task_group import TaskGroup
from sqlalchemy import select
from sqlalchemy.orm import Session

from cosmos import settings
from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME
from cosmos.dbt.project import get_partial_parse_path
from cosmos.log import get_logger

logger = get_logger(__name__)
VAR_KEY_CACHE_PREFIX = "cosmos_cache__"


def _get_airflow_metadata(dag: DAG, task_group: TaskGroup | None) -> dict[str, str | None]:
dag_id = None
task_group_id = None
cosmos_type = "DbtDag"

if task_group:
if task_group.dag_id is not None:
dag_id = task_group.dag_id
if task_group.group_id is not None:
task_group_id = task_group.group_id
cosmos_type = "DbtTaskGroup"
else:
dag_id = dag.dag_id

return {"cosmos_type": cosmos_type, "dag_id": dag_id, "task_group_id": task_group_id}


# It was considered to create a cache identifier based on the dbt project path, as opposed
@@ -28,16 +57,21 @@ def _create_cache_identifier(dag: DAG, task_group: TaskGroup | None) -> str:
:param task_group_name: (optional) Name of the Cosmos DbtTaskGroup being cached
:return: Unique identifier representing the cache
"""
if task_group:
if task_group.dag_id is not None:
cache_identifiers_list = [task_group.dag_id]
if task_group.group_id is not None:
cache_identifiers_list.extend([task_group.group_id.replace(".", "__")])
cache_identifier = "__".join(cache_identifiers_list)
else:
cache_identifier = dag.dag_id
metadata = _get_airflow_metadata(dag, task_group)
cache_identifiers_list = []
dag_id = metadata.get("dag_id")
task_group_id = metadata.get("task_group_id")

if dag_id:
cache_identifiers_list.append(dag_id)
if task_group_id:
cache_identifiers_list.append(task_group_id.replace(".", "__"))

return cache_identifier
return "__".join(cache_identifiers_list)


def create_cache_key(cache_identifier: str) -> str:
return f"{VAR_KEY_CACHE_PREFIX}{cache_identifier}"
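The identifier and key helpers above reduce to simple string assembly. A standalone sketch of the naming scheme (hypothetical function names; the real helpers take Airflow ``DAG``/``TaskGroup`` objects and extract the ids from them):

```python
# Hypothetical standalone sketch of the cache naming scheme above;
# the real helpers extract dag_id/group_id from Airflow objects.
VAR_KEY_CACHE_PREFIX = "cosmos_cache__"


def build_cache_identifier(dag_id, task_group_id=None):
    parts = [dag_id]
    if task_group_id:
        # Nested group ids like "parent.child" become "parent__child"
        parts.append(task_group_id.replace(".", "__"))
    return "__".join(parts)


def build_cache_key(cache_identifier):
    return f"{VAR_KEY_CACHE_PREFIX}{cache_identifier}"


print(build_cache_key(build_cache_identifier("basic_cosmos_dag")))
# cosmos_cache__basic_cosmos_dag
print(build_cache_key(build_cache_identifier("basic_cosmos_task_group", "orders")))
# cosmos_cache__basic_cosmos_task_group__orders
```

The second key matches the Variable names used in the ``delete_unused_dbt_ls_cache`` docstring further down this diff.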


def _obtain_cache_dir_path(cache_identifier: str, base_dir: Path = settings.cache_dir) -> Path:
@@ -171,3 +205,144 @@ def _copy_partial_parse_to_project(partial_parse_filepath: Path, project_path: P

if source_manifest_filepath.exists():
shutil.copy(str(source_manifest_filepath), str(target_manifest_filepath))


def _create_folder_version_hash(dir_path: Path) -> str:
"""
Given a directory, iterate through its contents and create a hash that changes whenever
the contents of the directory change. The value should not change if the contents stay
the same, even if the command is run from different Airflow instances.

This method's output must be concise, and it currently varies by operating system.
"""
# This approach is less efficient than using modified time
# sum([path.stat().st_mtime for path in dir_path.glob("**/*")])
# unfortunately, the modified time approach does not work well for dag-only deployments
# where DAGs are constantly synced to the deployed Airflow
# for 5k files, this seems to take 0.14s
hasher = hashlib.md5()
filepaths = []

for root_dir, dirs, files in os.walk(dir_path):
paths = [os.path.join(root_dir, filepath) for filepath in files]
filepaths.extend(paths)

for filepath in sorted(filepaths):
with open(str(filepath), "rb") as fp:
buf = fp.read()
hasher.update(buf)

return hasher.hexdigest()
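The hashing strategy above can be exercised in isolation. A minimal sketch (assumption: hash every file's bytes in sorted-path order, exactly as the function above does) shows the result is deterministic for unchanged contents and sensitive to any edit:

```python
import hashlib
import os
import tempfile
from pathlib import Path


def folder_version_hash(dir_path: Path) -> str:
    # Hash every file's bytes in sorted-path order: deterministic across
    # machines, unlike mtime-based schemes that break on DAG-only deployments.
    hasher = hashlib.md5()
    filepaths = []
    for root_dir, _dirs, files in os.walk(dir_path):
        filepaths.extend(os.path.join(root_dir, name) for name in files)
    for filepath in sorted(filepaths):
        hasher.update(Path(filepath).read_bytes())
    return hasher.hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    project = Path(tmp)
    (project / "models").mkdir()
    (project / "models" / "orders.sql").write_text("select 1")
    first = folder_version_hash(project)
    second = folder_version_hash(project)   # unchanged contents
    (project / "models" / "orders.sql").write_text("select 2")
    third = folder_version_hash(project)    # modified contents
    print(first == second, first == third)  # True False
```

Because only file contents are hashed, renaming a directory without touching file bytes still changes the hash (the paths differ), which is the desired behavior for cache invalidation.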


def _calculate_dbt_ls_cache_current_version(cache_identifier: str, project_dir: Path, cmd_args: list[str]) -> str:
"""
Taking into account the project directory contents and the command arguments, calculate the
hash that represents the "dbt ls" command version - to be used to decide if the cache should be refreshed or not.

:param cache_identifier: Unique identifier of the cache (may include DbtDag or DbtTaskGroup information)
:param project_dir: Path to the target dbt project directory
:param cmd_args: List containing the arguments passed to the dbt ls command that would affect its output
"""
start_time = time.perf_counter()

# Combined value for when the dbt project directory files were last modified
# This is fast (e.g. 0.01s for jaffle shop, 0.135s for a 5k models dbt folder)
dbt_project_hash = _create_folder_version_hash(project_dir)

# The performance for the following will depend on the user's configuration
hash_args = hashlib.md5("".join(cmd_args).encode()).hexdigest()

elapsed_time = time.perf_counter() - start_time
logger.info(
f"Cosmos performance: time to calculate cache identifier {cache_identifier} for current version: {elapsed_time}"
)
return f"{dbt_project_hash},{hash_args}"


@functools.lru_cache
def was_project_modified(previous_version: str, current_version: str) -> bool:
"""
Given the cache version of a project and the latest version of the project,
decides if the project was modified or not.
"""
return previous_version != current_version
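Putting the two hashes together, the cache version is just a comma-joined pair, and staleness is a string comparison; a small sketch under those assumptions:

```python
import hashlib


def dbt_ls_version(dbt_project_hash, cmd_args):
    # Mirror of the version string above: project-dir hash plus a hash of
    # the dbt ls arguments that can change its output.
    hash_args = hashlib.md5("".join(cmd_args).encode()).hexdigest()
    return f"{dbt_project_hash},{hash_args}"


def was_project_modified(previous_version, current_version):
    return previous_version != current_version


v1 = dbt_ls_version("abc123", ["--select", "tag:finance"])
v2 = dbt_ls_version("abc123", ["--select", "tag:finance"])
v3 = dbt_ls_version("abc123", ["--select", "tag:marketing"])
print(was_project_modified(v1, v2), was_project_modified(v1, v3))  # False True
```

Note that joining the arguments without a separator means distinct argument lists can collide (e.g. ``["ab", "c"]`` vs ``["a", "bc"]``), so the version string is a practical heuristic rather than a strict fingerprint.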


@provide_session
def delete_unused_dbt_ls_cache(
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
) -> int:
"""
Delete Cosmos cache stored in Airflow Variables based on the last execution of their associated DAGs.

Example usage:

There are three Cosmos cache Airflow Variables:
1. ``cosmos_cache__basic_cosmos_dag``
2. ``cosmos_cache__basic_cosmos_task_group__orders``
3. ``cosmos_cache__basic_cosmos_task_group__customers``

The first relates to the ``DbtDag`` ``basic_cosmos_dag``, and the last two relate to the DAG
``basic_cosmos_task_group``, which has two ``DbtTaskGroups``: ``orders`` and ``customers``.

Let's assume the last DAG run of ``basic_cosmos_dag`` was a week ago and the last DAG run of
``basic_cosmos_task_group`` was an hour ago.

To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 5 days ago:

.. code:: python
>>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=5))
INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_dag

To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 10 minutes ago:

.. code:: python
>>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(minutes=10))
INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_dag
INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_task_group__orders
INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_task_group__customers

To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 10 days ago:

.. code:: python
>>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=10))

In this last example, nothing is deleted.
"""
if session is None:
return 0

logger.info(f"Delete the Cosmos cache stored in Airflow Variables that hasn't been used for {max_age_last_usage}")
cosmos_dags_ids = defaultdict(list)
all_variables = session.scalars(select(Variable)).all()
total_cosmos_variables = 0
deleted_cosmos_variables = 0

# Identify Cosmos-related cache in Airflow variables
for var in all_variables:
if var.key.startswith(VAR_KEY_CACHE_PREFIX):
var_value = json.loads(var.val)
cosmos_dags_ids[var_value["dag_id"]].append(var.key)
total_cosmos_variables += 1

# Delete DAGs that have not been run in the last X time
for dag_id, vars_keys in cosmos_dags_ids.items():
last_dag_run = (
session.query(DagRun)
.filter(
DagRun.dag_id == dag_id,
)
.order_by(DagRun.execution_date.desc())
.first()
)
if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage):
for var_key in vars_keys:
logger.info(f"Removing the dbt ls cache {var_key}")
Variable.delete(var_key)
deleted_cosmos_variables += 1

logger.info(
f"Deleted {deleted_cosmos_variables}/{total_cosmos_variables} Airflow Variables used to store Cosmos cache. "
)
return deleted_cosmos_variables
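The selection logic inside ``delete_unused_dbt_ls_cache`` (group Variable keys by ``dag_id``, then keep only DAGs whose latest run predates the cutoff) can be sketched in memory, without the Airflow session or ``Variable`` model (hypothetical helper; the dict arguments stand in for the database queries):

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def select_stale_cache_keys(cache_vars, last_run_per_dag, max_age_last_usage):
    # cache_vars: Variable key -> dag_id stored in its JSON value
    # last_run_per_dag: dag_id -> most recent DagRun execution date
    keys_per_dag = defaultdict(list)
    for var_key, dag_id in cache_vars.items():
        keys_per_dag[dag_id].append(var_key)
    cutoff = datetime.now(timezone.utc) - max_age_last_usage
    stale = []
    for dag_id, keys in keys_per_dag.items():
        last_run = last_run_per_dag.get(dag_id)
        if last_run and last_run < cutoff:
            stale.extend(keys)
    return sorted(stale)


now = datetime.now(timezone.utc)
cache_vars = {
    "cosmos_cache__basic_cosmos_dag": "basic_cosmos_dag",
    "cosmos_cache__basic_cosmos_task_group__orders": "basic_cosmos_task_group",
    "cosmos_cache__basic_cosmos_task_group__customers": "basic_cosmos_task_group",
}
last_runs = {
    "basic_cosmos_dag": now - timedelta(days=7),          # stale for a 5-day cutoff
    "basic_cosmos_task_group": now - timedelta(hours=1),  # recent, kept
}
print(select_stale_cache_keys(cache_vars, last_runs, timedelta(days=5)))
# ['cosmos_cache__basic_cosmos_dag']
```

This mirrors the scenario in the docstring above: with a 5-day cutoff only the ``basic_cosmos_dag`` cache is deleted, while a 10-minute cutoff would delete all three keys.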
1 change: 1 addition & 0 deletions cosmos/config.py
@@ -71,6 +71,7 @@ class RenderConfig:
dbt_ls_path: Path | None = None
project_path: Path | None = field(init=False)
enable_mock_profile: bool = True
airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
if self.env_vars:
1 change: 1 addition & 0 deletions cosmos/constants.py
@@ -36,6 +36,7 @@ class LoadMode(Enum):
CUSTOM = "custom"
DBT_LS = "dbt_ls"
DBT_LS_FILE = "dbt_ls_file"
DBT_LS_CACHE = "dbt_ls_cache"
DBT_MANIFEST = "dbt_manifest"

