Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Python Virtualenv Operator Caching #33355

Merged
merged 22 commits into from Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
dbd7701
Extended PythonVirtualEnvOperator for extra index URL
jscheffl Aug 8, 2023
9d54c5b
Extended PythonVirtualEnvOperator for venv caching
jscheffl Aug 8, 2023
12958a6
Extended PythonVirtualEnvOperator for venv caching
jscheffl Aug 12, 2023
fd7509a
Revert BranchPythonVirtualenvOperator
jscheffl Aug 12, 2023
3e59eb0
Merge remote-tracking branch 'origin/main' into feature/add-venv-oper…
jscheffl Aug 15, 2023
9bb4371
Add exception and cleanup if venv setup fails
jscheffl Aug 15, 2023
b51f360
Add marker to cached venv to detect partial installs
jscheffl Aug 15, 2023
0ade6ec
Merge branch 'main' into feature/add-venv-operator-caching
jscheffl Aug 15, 2023
6de9c75
Merge remote-tracking branch 'origin/main' into feature/add-venv-oper…
jscheffl Sep 8, 2023
f802db1
Add usage notes to cache documentation as discussed in review
jscheffl Sep 8, 2023
f799ff1
Add a more intense dependency check of cache key calculation and add …
jscheffl Sep 8, 2023
9a78ff8
Merge remote-tracking branch 'origin/main' into feature/add-venv-oper…
jscheffl Sep 11, 2023
01d62a5
Apply review feedback, rename to virtual environment and cache parame…
jscheffl Sep 12, 2023
e8d58c8
Apply review feedback, rename to virtual environment
jscheffl Sep 12, 2023
4c83527
Merge remote-tracking branch 'origin/main' into feature/add-venv-oper…
jscheffl Sep 25, 2023
91d3967
Apply review feedback, marker hash content is validated
jscheffl Sep 25, 2023
9ac4ed2
Merge remote-tracking branch 'origin/main' into feature/add-venv-oper…
jscheffl Sep 26, 2023
0ece29e
Code review findings
jscheffl Sep 26, 2023
6a7120d
Merge branch 'main' into feature/add-venv-operator-caching
jscheffl Sep 26, 2023
a23056f
Merge remote-tracking branch 'origin/main' into feature/add-venv-oper…
jscheffl Sep 28, 2023
06e6f87
Merge branch 'main' into feature/add-venv-operator-caching
jscheffl Sep 29, 2023
dd40c7e
Merge branch 'main' into feature/add-venv-operator-caching
jscheffl Oct 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions airflow/decorators/__init__.pyi
Expand Up @@ -110,6 +110,7 @@ class TaskDecoratorCollection:
pip_install_options: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
venv_cache_path: None | str = None,
show_return_value_in_logs: bool = True,
**kwargs,
) -> TaskDecorator:
Expand All @@ -134,6 +135,9 @@ class TaskDecoratorCollection:
exit code will be treated as a failure.
:param index_urls: an optional list of index urls to load Python packages from.
If not provided the system pip conf will be used to source packages from.
:param venv_cache_path: Optional path to the venv parent folder in which the venv will be cached,
creates a sub-folder venv-{hash} whereas hash will be replaced with a checksum of requirements.
If not provided the venv will be created and deleted in a temp folder for every execution.
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
Expand Down
61 changes: 61 additions & 0 deletions airflow/operators/python.py
Expand Up @@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

import fcntl
import importlib
import inspect
import logging
Expand Down Expand Up @@ -46,6 +47,8 @@
from airflow.models.baseoperator import BaseOperator
from airflow.models.skipmixin import SkipMixin
from airflow.models.taskinstance import _CURRENT_CONTEXT
from airflow.models.variable import Variable
from airflow.utils import hashlib_wrapper
from airflow.utils.context import context_copy_partial, context_merge
from airflow.utils.operator_helpers import KeywordParameters
from airflow.utils.process_utils import execute_in_subprocess
Expand Down Expand Up @@ -523,6 +526,9 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
exit code will be treated as a failure.
:param index_urls: an optional list of index urls to load Python packages from.
If not provided the system pip conf will be used to source packages from.
:param venv_cache_path: Optional path to the venv parent folder in which the venv will be cached,
creates a sub-folder venv-{hash} whereas hash will be replaced with a checksum of requirements.
If not provided the venv will be created and deleted in a temp folder for every execution.
"""

template_fields: Sequence[str] = tuple({"requirements"} | set(PythonOperator.template_fields))
Expand All @@ -545,6 +551,7 @@ def __init__(
expect_airflow: bool = True,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
venv_cache_path: None | str = None,
jscheffl marked this conversation as resolved.
Show resolved Hide resolved
**kwargs,
):
if (
Expand Down Expand Up @@ -574,6 +581,7 @@ def __init__(
self.index_urls = list(index_urls)
else:
self.index_urls = None
self.venv_cache_path = Path(venv_cache_path) if venv_cache_path else None
super().__init__(
python_callable=python_callable,
use_dill=use_dill,
Expand Down Expand Up @@ -608,7 +616,60 @@ def _prepare_venv(self, venv_path: Path) -> None:
index_urls=self.index_urls,
)

def _calculate_cache_hash(self) -> str:
jscheffl marked this conversation as resolved.
Show resolved Hide resolved
"""Helper to generate the hash of the cache folder to use.

The following factors are used as input for the hash:
- (sorted) list of requirements
- pip install options
- flag of system site packages
- python version
- Variable to override the hash with a cache key
- Index URLs
"""
requirements_list = ",".join(self._requirements_list())
pip_options = ",".join(self.pip_install_options) if self.pip_install_options else ""
index_urls = ",".join(self.index_urls) if self.index_urls else ""
cache_key = str(Variable.get("PythonVirtualenvOperator.cache_key", ""))
hash_text = (
f"{self.python_version};{requirements_list};{cache_key};{self.system_site_packages};{pip_options};"
f"{index_urls}"
)
hash_object = hashlib_wrapper.md5(hash_text.encode())
requirements_hash = hash_object.hexdigest()
return requirements_hash[0:8]

def _ensure_venv_cache_exists(self, venv_cache_path: Path) -> Path:
"""Helper to ensure a valid venv is set up and will create inplace."""
venv_path = venv_cache_path / f"venv-{self._calculate_cache_hash()}"
self.log.info("Python Virtualenv will be cached in %s", venv_path)
venv_path.parent.mkdir(parents=True, exist_ok=True)
with open(f"{venv_path}.lock", "w") as f:
# Ensure that cache is not build by parallel workers
fcntl.flock(f, fcntl.LOCK_EX)

if venv_path.exists() and (venv_path / "install_complete_marker").exists():
self.log.info("Re-using cached Python Virtualenv in %s", venv_path)
potiuk marked this conversation as resolved.
Show resolved Hide resolved
else:
try:
if venv_path.exists():
self.log.warning("Found a previous partial venv in %s", venv_path)
shutil.rmtree(venv_path)
venv_path.mkdir(parents=True)
self._prepare_venv(venv_path)
(venv_path / "install_complete_marker").touch()
except Exception as e:
shutil.rmtree(venv_path)
raise AirflowException(f"Unable to create new virtualenv in {venv_path}") from e
self.log.info("New Python Virtualenv created in %s", venv_path)
return venv_path

def execute_callable(self):
if self.venv_cache_path:
venv_path = self._ensure_venv_cache_exists(self.venv_cache_path)
python_path = venv_path / "bin" / "python"
return self._execute_python_callable_in_subprocess(python_path, venv_path)

with TemporaryDirectory(prefix="venv") as tmp_dir:
tmp_path = Path(tmp_dir)
self._prepare_venv(tmp_path)
Expand Down
21 changes: 21 additions & 0 deletions docs/apache-airflow/howto/operator/python.rst
Expand Up @@ -126,6 +126,27 @@ To prevent adding secrets to the private repository in your DAG code you can use
In the special case you want to prevent remote calls for setup of a virtualenv, pass the ``index_urls`` as empty list as ``index_urls=[]`` which
forced pip installer to use the ``--no-index`` option.

Caching and re-use
^^^^^^^^^^^^^^^^^^

Setup of virtualenvs is made per task execution in a temporary directory. After execution the virtualenv is deleted again. Ensure that the ``$tmp`` folder
on your workers have sufficient disk space. Usually (if not configured differently) the local pip cache will be used preventing a re-download of packages
for each execution.

But still setting up the virtualenv for every execution needs some time. For repeated execution you can set the option ``venv_cache_path`` to a file system
folder on your worker. In this case the virtualenv will be set up once and be re-used. If venv caching is used, per unique requirements set different
virtualenv subfolders are created in the cache path. So depending on your variations in the DAGs in your system setup sufficient disk space is needed.

Note that no automated cleanup is made and in case of cached mode. All worker slots share the same virtualenv but if tasks are scheduled over and over on
different workers, it might happen that venvs are created on multiple workers individually. Also if the worker is started in a Kubernetes POD, a restart
of the worker will drop the cache (assuming ``venv_cache_path`` is not on a persistent volume).

In case you have problems during runtime with broken cached virtualenvs, you can influence the cache directory hash by setting the Airflow variable
``PythonVirtualenvOperator.cache_key`` to any text. The content of this variable is uses in the vector to calculate the cache directory key.

Note that any modification of a cached venv (like temp files in binary path, post-installing further requirements) might pollute a cached venv and the
operator is not maintaining or cleaning the cache path.


.. _howto/operator:ExternalPythonOperator:

Expand Down
11 changes: 11 additions & 0 deletions tests/operators/test_python.py
Expand Up @@ -26,6 +26,7 @@
from collections import namedtuple
from datetime import date, datetime, timedelta
from subprocess import CalledProcessError
from tempfile import TemporaryDirectory
from typing import Generator
from unittest import mock
from unittest.mock import MagicMock
Expand Down Expand Up @@ -984,6 +985,16 @@ def f(a):

self.run_as_task(f, index_urls=["https://abc.def.de", "http://xyz.abc.de"], op_args=[4])

def test_caching(self):
def f(a):
import sys

assert "pytest_venv_1234" in sys.executable
return a

with TemporaryDirectory(prefix="pytest_venv_1234") as tmp_dir:
self.run_as_task(f, venv_cache_path=tmp_dir, op_args=[4])

# This tests might take longer than default 60 seconds as it is serializing a lot of
# context using dill (which is slow apparently).
@pytest.mark.execution_timeout(120)
Expand Down