Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Python Virtualenv Operator Caching #33355

Merged
merged 22 commits into from Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
dbd7701
Extended PythonVirtualEnvOperator for extra index URL
jscheffl Aug 8, 2023
9d54c5b
Extended PythonVirtualEnvOperator for venv caching
jscheffl Aug 8, 2023
12958a6
Extended PythonVirtualEnvOperator for venv caching
jscheffl Aug 12, 2023
fd7509a
Revert BranchPythonVirtualenvOperator
jscheffl Aug 12, 2023
3e59eb0
Merge remote-tracking branch 'origin/main' into feature/add-venv-oper…
jscheffl Aug 15, 2023
9bb4371
Add exception and cleanup if venv setup fails
jscheffl Aug 15, 2023
b51f360
Add marker to cached venv to detect partial installs
jscheffl Aug 15, 2023
0ade6ec
Merge branch 'main' into feature/add-venv-operator-caching
jscheffl Aug 15, 2023
6de9c75
Merge remote-tracking branch 'origin/main' into feature/add-venv-oper…
jscheffl Sep 8, 2023
f802db1
Add usage notes to cache documentation as discussed in review
jscheffl Sep 8, 2023
f799ff1
Add a more intense dependency check of cache key calculation and add …
jscheffl Sep 8, 2023
9a78ff8
Merge remote-tracking branch 'origin/main' into feature/add-venv-oper…
jscheffl Sep 11, 2023
01d62a5
Apply review feedback, rename to virtual environment and cache parame…
jscheffl Sep 12, 2023
e8d58c8
Apply review feedback, rename to virtual environment
jscheffl Sep 12, 2023
4c83527
Merge remote-tracking branch 'origin/main' into feature/add-venv-oper…
jscheffl Sep 25, 2023
91d3967
Apply review feedback, marker hash content is validated
jscheffl Sep 25, 2023
9ac4ed2
Merge remote-tracking branch 'origin/main' into feature/add-venv-oper…
jscheffl Sep 26, 2023
0ece29e
Code review findings
jscheffl Sep 26, 2023
6a7120d
Merge branch 'main' into feature/add-venv-operator-caching
jscheffl Sep 26, 2023
a23056f
Merge remote-tracking branch 'origin/main' into feature/add-venv-oper…
jscheffl Sep 28, 2023
06e6f87
Merge branch 'main' into feature/add-venv-operator-caching
jscheffl Sep 29, 2023
dd40c7e
Merge branch 'main' into feature/add-venv-operator-caching
jscheffl Oct 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 8 additions & 3 deletions airflow/decorators/__init__.pyi
Expand Up @@ -110,6 +110,7 @@ class TaskDecoratorCollection:
pip_install_options: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
venv_cache_path: None | str = None,
show_return_value_in_logs: bool = True,
**kwargs,
) -> TaskDecorator:
Expand All @@ -119,13 +120,13 @@ class TaskDecoratorCollection:
Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
:param requirements: Either a list of requirement strings, or a (templated)
"requirements file" as specified by pip.
:param python_version: The Python version to run the virtualenv with. Note that
:param python_version: The Python version to run the virtual environment with. Note that
both 2 and 2.7 are acceptable forms.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
:param system_site_packages: Whether to include
system_site_packages in your virtualenv.
system_site_packages in your virtual environment.
See virtualenv documentation for more information.
:param pip_install_options: a list of pip install options when installing requirements
See 'pip install -h' for available options
Expand All @@ -134,6 +135,10 @@ class TaskDecoratorCollection:
exit code will be treated as a failure.
:param index_urls: an optional list of index urls to load Python packages from.
If not provided the system pip conf will be used to source packages from.
:param venv_cache_path: Optional path to the virtual environment parent folder in which the
virtual environment will be cached, creates a sub-folder venv-{hash} whereas hash will be
replaced with a checksum of requirements. If not provided the virtual environment will be
created and deleted in a temp folder for every execution.
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
Expand All @@ -160,7 +165,7 @@ class TaskDecoratorCollection:
"""Create a decorator to convert the decorated callable to a virtual environment task.

:param python: Full path string (file-system specific) that points to a Python binary inside
a virtualenv that should be used (in ``VENV/bin`` folder). Should be absolute path
a virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path
(so usually start with "/" or "X:/" depending on the filesystem/os used).
:param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
Expand Down
131 changes: 110 additions & 21 deletions airflow/operators/python.py
Expand Up @@ -17,8 +17,10 @@
# under the License.
from __future__ import annotations

import fcntl
import importlib
import inspect
import json
import logging
import os
import pickle
Expand Down Expand Up @@ -46,7 +48,9 @@
from airflow.models.baseoperator import BaseOperator
from airflow.models.skipmixin import SkipMixin
from airflow.models.taskinstance import _CURRENT_CONTEXT
from airflow.models.variable import Variable
from airflow.operators.branch import BranchMixIn
from airflow.utils import hashlib_wrapper
from airflow.utils.context import context_copy_partial, context_merge
from airflow.utils.operator_helpers import KeywordParameters
from airflow.utils.process_utils import execute_in_subprocess
Expand Down Expand Up @@ -489,16 +493,16 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
:ref:`howto/operator:PythonVirtualenvOperator`

:param python_callable: A python function with no references to outside variables,
defined with def, which will be run in a virtualenv
defined with def, which will be run in a virtual environment.
:param requirements: Either a list of requirement strings, or a (templated)
"requirements file" as specified by pip.
:param python_version: The Python version to run the virtualenv with. Note that
:param python_version: The Python version to run the virtual environment with. Note that
both 2 and 2.7 are acceptable forms.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
:param system_site_packages: Whether to include
system_site_packages in your virtualenv.
system_site_packages in your virtual environment.
See virtualenv documentation for more information.
:param pip_install_options: a list of pip install options when installing requirements
See 'pip install -h' for available options
Expand All @@ -521,9 +525,15 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
exit code will be treated as a failure.
:param index_urls: an optional list of index urls to load Python packages from.
If not provided the system pip conf will be used to source packages from.
:param venv_cache_path: Optional path to the virtual environment parent folder in which the
virtual environment will be cached, creates a sub-folder venv-{hash} whereas hash will be replaced
with a checksum of requirements. If not provided the virtual environment will be created and deleted
in a temp folder for every execution.
"""

template_fields: Sequence[str] = tuple({"requirements"} | set(PythonOperator.template_fields))
template_fields: Sequence[str] = tuple(
{"requirements", "index_urls", "venv_cache_path"}.union(PythonOperator.template_fields)
)
template_ext: Sequence[str] = (".txt",)

def __init__(
Expand All @@ -543,6 +553,7 @@ def __init__(
expect_airflow: bool = True,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
venv_cache_path: None | os.PathLike[str] = None,
**kwargs,
):
if (
Expand All @@ -553,7 +564,7 @@ def __init__(
raise AirflowException(
"Passing op_args or op_kwargs is not supported across different Python "
"major versions for PythonVirtualenvOperator. Please use string_args."
f"Sys version: {sys.version_info}. Venv version: {python_version}"
f"Sys version: {sys.version_info}. Virtual environment version: {python_version}"
)
if python_version is not None and not isinstance(python_version, str):
warnings.warn(
Expand All @@ -579,6 +590,7 @@ def __init__(
self.index_urls = list(index_urls)
else:
self.index_urls = None
self.venv_cache_path = venv_cache_path
super().__init__(
python_callable=python_callable,
use_dill=use_dill,
Expand All @@ -593,15 +605,15 @@ def __init__(
)

def _requirements_list(self) -> list[str]:
"""Prepare a list of requirements that need to be installed for the venv."""
"""Prepare a list of requirements that need to be installed for the virtual environment."""
requirements = [str(dependency) for dependency in self.requirements]
if not self.system_site_packages and self.use_dill and "dill" not in requirements:
requirements.append("dill")
requirements.sort() # Ensure a hash is stable
return requirements

def _prepare_venv(self, venv_path: Path) -> None:
"""Prepare the requirements and installs the venv."""
"""Prepare the requirements and installs the virtual environment."""
requirements_file = venv_path / "requirements.txt"
requirements_file.write_text("\n".join(self._requirements_list()))
prepare_virtualenv(
Expand All @@ -613,7 +625,83 @@ def _prepare_venv(self, venv_path: Path) -> None:
index_urls=self.index_urls,
)

def _calculate_cache_hash(self) -> tuple[str, str]:
"""Helper to generate the hash of the cache folder to use.

The following factors are used as input for the hash:
- (sorted) list of requirements
- pip install options
- flag of system site packages
- python version
- Variable to override the hash with a cache key
- Index URLs

Returns a hash and the data dict which is the base for the hash as text.
"""
hash_dict = {
"requirements_list": self._requirements_list(),
"pip_install_options": self.pip_install_options,
"index_urls": self.index_urls,
"cache_key": str(Variable.get("PythonVirtualenvOperator.cache_key", "")),
"python_version": self.python_version,
"system_site_packages": self.system_site_packages,
}
hash_text = json.dumps(hash_dict, sort_keys=True)
hash_object = hashlib_wrapper.md5(hash_text.encode())
requirements_hash = hash_object.hexdigest()
return requirements_hash[:8], hash_text

def _ensure_venv_cache_exists(self, venv_cache_path: Path) -> Path:
"""Helper to ensure a valid virtual environment is set up and will create inplace."""
cache_hash, hash_data = self._calculate_cache_hash()
venv_path = venv_cache_path / f"venv-{cache_hash}"
self.log.info("Python virtual environment will be cached in %s", venv_path)
venv_path.parent.mkdir(parents=True, exist_ok=True)
with open(f"{venv_path}.lock", "w") as f:
# Ensure that cache is not build by parallel workers
fcntl.flock(f, fcntl.LOCK_EX)

hash_marker = venv_path / "install_complete_marker.json"
try:
if venv_path.exists():
if hash_marker.exists():
previous_hash_data = hash_marker.read_text(encoding="utf8")
if previous_hash_data == hash_data:
self.log.info("Re-using cached Python virtual environment in %s", venv_path)
return venv_path

self.log.error(
"Unicorn alert: Found a previous virtual environment in %s "
"with the same hash but different parameters. Previous setup: '%s' / "
"Requested venv setup: '%s'. Please report a bug to airflow!",
venv_path,
previous_hash_data,
hash_data,
)
else:
self.log.warning(
"Found a previous (probably partial installed) virtual environment in %s, "
"deleting and re-creating.",
venv_path,
)

shutil.rmtree(venv_path)

venv_path.mkdir(parents=True)
self._prepare_venv(venv_path)
jscheffl marked this conversation as resolved.
Show resolved Hide resolved
hash_marker.write_text(hash_data, encoding="utf8")
except Exception as e:
shutil.rmtree(venv_path)
raise AirflowException(f"Unable to create new virtual environment in {venv_path}") from e
self.log.info("New Python virtual environment created in %s", venv_path)
return venv_path

def execute_callable(self):
if self.venv_cache_path:
venv_path = self._ensure_venv_cache_exists(Path(self.venv_cache_path))
python_path = venv_path / "bin" / "python"
return self._execute_python_callable_in_subprocess(python_path, venv_path)

with TemporaryDirectory(prefix="venv") as tmp_dir:
tmp_path = Path(tmp_dir)
self._prepare_venv(tmp_path)
Expand All @@ -632,7 +720,7 @@ def _iter_serializable_context_keys(self):

class BranchPythonVirtualenvOperator(PythonVirtualenvOperator, BranchMixIn):
"""
A workflow can "branch" or follow a path after the execution of this task in a virtualenv.
A workflow can "branch" or follow a path after the execution of this task in a virtual environment.

It derives the PythonVirtualenvOperator and expects a Python function that returns
a single task_id or list of task_ids to follow. The task_id(s) returned
Expand All @@ -651,15 +739,15 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
"""
Run a function in a virtualenv that is not re-created.

Reused as is without the overhead of creating the virtualenv (with certain caveats).
Reused as is without the overhead of creating the virtual environment (with certain caveats).

The function must be defined using def, and not be
part of a class. All imports must happen inside the function
and no variables outside the scope may be referenced. A global scope
variable named virtualenv_string_args will be available (populated by
string_args). In addition, one can pass stuff through op_args and op_kwargs, and one
can use a return value.
Note that if your virtualenv runs in a different Python major version than Airflow,
Note that if your virtual environment runs in a different Python major version than Airflow,
you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to
Airflow through plugins. You can use string_args though.

Expand All @@ -671,13 +759,13 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
:ref:`howto/operator:ExternalPythonOperator`

:param python: Full path string (file-system specific) that points to a Python binary inside
a virtualenv that should be used (in ``VENV/bin`` folder). Should be absolute path
a virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path
(so usually start with "/" or "X:/" depending on the filesystem/os used).
:param python_callable: A python function with no references to outside variables,
defined with def, which will be run in a virtualenv
defined with def, which will be run in a virtual environment
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but if dill is not preinstalled in your venv, the task will fail with use_dill enabled.
but if dill is not preinstalled in your virtual environment, the task will fail with use_dill enabled.
:param op_args: A list of positional arguments to pass to python_callable.
:param op_kwargs: A dict of keyword arguments to pass to python_callable.
:param string_args: Strings that are present in the global var virtualenv_string_args,
Expand All @@ -697,7 +785,7 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
exit code will be treated as a failure.
"""

template_fields: Sequence[str] = tuple({"python"} | set(PythonOperator.template_fields))
template_fields: Sequence[str] = tuple({"python"}.union(PythonOperator.template_fields))

def __init__(
self,
Expand Down Expand Up @@ -749,7 +837,8 @@ def execute_callable(self):
raise AirflowException(
"Passing op_args or op_kwargs is not supported across different Python "
"major versions for ExternalPythonOperator. Please use string_args."
f"Sys version: {sys.version_info}. Venv version: {python_version_as_list_of_strings}"
f"Sys version: {sys.version_info}. "
f"Virtual environment version: {python_version_as_list_of_strings}"
)
with TemporaryDirectory(prefix="tmd") as tmp_dir:
tmp_path = Path(tmp_dir)
Expand All @@ -776,11 +865,11 @@ def _is_pendulum_installed_in_target_env(self) -> bool:
return True
except Exception as e:
if self.expect_pendulum:
self.log.warning("When checking for Pendulum installed in venv got %s", e)
self.log.warning("When checking for Pendulum installed in virtual environment got %s", e)
self.log.warning(
"Pendulum is not properly installed in the virtualenv "
"Pendulum is not properly installed in the virtual environment "
"Pendulum context keys will not be available. "
"Please Install Pendulum or Airflow in your venv to access them."
"Please Install Pendulum or Airflow in your virtual environment to access them."
)
return False

Expand All @@ -805,7 +894,7 @@ def _get_airflow_version_from_target_env(self) -> str | None:
return target_airflow_version
except Exception as e:
if self.expect_airflow:
self.log.warning("When checking for Airflow installed in venv got %s", e)
self.log.warning("When checking for Airflow installed in virtual environment got %s", e)
self.log.warning(
f"This means that Airflow is not properly installed by "
f"{self.python}. Airflow context keys will not be available. "
Expand All @@ -819,8 +908,8 @@ class BranchExternalPythonOperator(ExternalPythonOperator, BranchMixIn):
A workflow can "branch" or follow a path after the execution of this task.

Extends ExternalPythonOperator, so expects to get Python:
virtualenv that should be used (in ``VENV/bin`` folder). Should be absolute path,
so it can run on separate virtualenv similarly to ExternalPythonOperator.
virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path,
so it can run on separate virtual environment similarly to ExternalPythonOperator.
"""

def execute(self, context: Context) -> Any:
Expand Down