Skip to content

Commit

Permalink
Add Python Virtualenv Operator Caching (#33355)
Browse files Browse the repository at this point in the history
  • Loading branch information
jscheffl committed Oct 18, 2023
1 parent a74ec40 commit a206012
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 31 deletions.
11 changes: 8 additions & 3 deletions airflow/decorators/__init__.pyi
Expand Up @@ -110,6 +110,7 @@ class TaskDecoratorCollection:
pip_install_options: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
venv_cache_path: None | str = None,
show_return_value_in_logs: bool = True,
**kwargs,
) -> TaskDecorator:
Expand All @@ -119,13 +120,13 @@ class TaskDecoratorCollection:
Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
:param requirements: Either a list of requirement strings, or a (templated)
"requirements file" as specified by pip.
:param python_version: The Python version to run the virtualenv with. Note that
:param python_version: The Python version to run the virtual environment with. Note that
both 2 and 2.7 are acceptable forms.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
:param system_site_packages: Whether to include
system_site_packages in your virtualenv.
system_site_packages in your virtual environment.
See virtualenv documentation for more information.
:param pip_install_options: a list of pip install options when installing requirements
See 'pip install -h' for available options
Expand All @@ -134,6 +135,10 @@ class TaskDecoratorCollection:
exit code will be treated as a failure.
:param index_urls: an optional list of index urls to load Python packages from.
If not provided the system pip conf will be used to source packages from.
:param venv_cache_path: Optional path to the virtual environment parent folder in which the
virtual environment will be cached. A sub-folder venv-{hash} is created there, where {hash} is
a checksum of the requirements and the other install settings. If not provided, the virtual
environment will be created and deleted in a temp folder for every execution.
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
Expand All @@ -160,7 +165,7 @@ class TaskDecoratorCollection:
"""Create a decorator to convert the decorated callable to a virtual environment task.
:param python: Full path string (file-system specific) that points to a Python binary inside
a virtualenv that should be used (in ``VENV/bin`` folder). Should be absolute path
a virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path
(so usually start with "/" or "X:/" depending on the filesystem/os used).
:param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
Expand Down
131 changes: 110 additions & 21 deletions airflow/operators/python.py
Expand Up @@ -17,8 +17,10 @@
# under the License.
from __future__ import annotations

import fcntl
import importlib
import inspect
import json
import logging
import os
import pickle
Expand Down Expand Up @@ -46,7 +48,9 @@
from airflow.models.baseoperator import BaseOperator
from airflow.models.skipmixin import SkipMixin
from airflow.models.taskinstance import _CURRENT_CONTEXT
from airflow.models.variable import Variable
from airflow.operators.branch import BranchMixIn
from airflow.utils import hashlib_wrapper
from airflow.utils.context import context_copy_partial, context_merge
from airflow.utils.operator_helpers import KeywordParameters
from airflow.utils.process_utils import execute_in_subprocess
Expand Down Expand Up @@ -489,16 +493,16 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
:ref:`howto/operator:PythonVirtualenvOperator`
:param python_callable: A python function with no references to outside variables,
defined with def, which will be run in a virtualenv
defined with def, which will be run in a virtual environment.
:param requirements: Either a list of requirement strings, or a (templated)
"requirements file" as specified by pip.
:param python_version: The Python version to run the virtualenv with. Note that
:param python_version: The Python version to run the virtual environment with. Note that
both 2 and 2.7 are acceptable forms.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
:param system_site_packages: Whether to include
system_site_packages in your virtualenv.
system_site_packages in your virtual environment.
See virtualenv documentation for more information.
:param pip_install_options: a list of pip install options when installing requirements
See 'pip install -h' for available options
Expand All @@ -521,9 +525,15 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
exit code will be treated as a failure.
:param index_urls: an optional list of index urls to load Python packages from.
If not provided the system pip conf will be used to source packages from.
:param venv_cache_path: Optional path to the virtual environment parent folder in which the
virtual environment will be cached. A sub-folder venv-{hash} is created there, where {hash} is
a checksum of the requirements and the other install settings. If not provided, the virtual
environment will be created and deleted in a temp folder for every execution.
"""

template_fields: Sequence[str] = tuple({"requirements"} | set(PythonOperator.template_fields))
template_fields: Sequence[str] = tuple(
{"requirements", "index_urls", "venv_cache_path"}.union(PythonOperator.template_fields)
)
template_ext: Sequence[str] = (".txt",)

def __init__(
Expand All @@ -543,6 +553,7 @@ def __init__(
expect_airflow: bool = True,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
venv_cache_path: None | os.PathLike[str] = None,
**kwargs,
):
if (
Expand All @@ -553,7 +564,7 @@ def __init__(
raise AirflowException(
"Passing op_args or op_kwargs is not supported across different Python "
"major versions for PythonVirtualenvOperator. Please use string_args."
f"Sys version: {sys.version_info}. Venv version: {python_version}"
f"Sys version: {sys.version_info}. Virtual environment version: {python_version}"
)
if python_version is not None and not isinstance(python_version, str):
warnings.warn(
Expand All @@ -579,6 +590,7 @@ def __init__(
self.index_urls = list(index_urls)
else:
self.index_urls = None
self.venv_cache_path = venv_cache_path
super().__init__(
python_callable=python_callable,
use_dill=use_dill,
Expand All @@ -593,15 +605,15 @@ def __init__(
)

def _requirements_list(self) -> list[str]:
"""Prepare a list of requirements that need to be installed for the venv."""
"""Prepare a list of requirements that need to be installed for the virtual environment."""
requirements = [str(dependency) for dependency in self.requirements]
if not self.system_site_packages and self.use_dill and "dill" not in requirements:
requirements.append("dill")
requirements.sort() # Ensure a hash is stable
return requirements

def _prepare_venv(self, venv_path: Path) -> None:
"""Prepare the requirements and installs the venv."""
"""Prepare the requirements and installs the virtual environment."""
requirements_file = venv_path / "requirements.txt"
requirements_file.write_text("\n".join(self._requirements_list()))
prepare_virtualenv(
Expand All @@ -613,7 +625,83 @@ def _prepare_venv(self, venv_path: Path) -> None:
index_urls=self.index_urls,
)

def _calculate_cache_hash(self) -> tuple[str, str]:
    """Compute the short hash that names the cached virtual environment folder.

    The hash is derived from:

    - the (sorted) list of requirements
    - the pip install options
    - the system site packages flag
    - the Python version
    - the Airflow Variable ``PythonVirtualenvOperator.cache_key`` (empty string if unset),
      which allows the hash to be overridden with a cache key
    - the index URLs

    :return: A tuple of the first 8 hex characters of the MD5 digest and the
        JSON text (keys sorted) that was hashed.
    """
    fingerprint = {
        "requirements_list": self._requirements_list(),
        "pip_install_options": self.pip_install_options,
        "index_urls": self.index_urls,
        "cache_key": str(Variable.get("PythonVirtualenvOperator.cache_key", "")),
        "python_version": self.python_version,
        "system_site_packages": self.system_site_packages,
    }
    # sort_keys keeps the serialized text independent of dict insertion order.
    serialized = json.dumps(fingerprint, sort_keys=True)
    digest = hashlib_wrapper.md5(serialized.encode()).hexdigest()
    return digest[:8], serialized

def _ensure_venv_cache_exists(self, venv_cache_path: Path) -> Path:
"""Helper to ensure a valid virtual environment is set up and will create inplace."""
cache_hash, hash_data = self._calculate_cache_hash()
venv_path = venv_cache_path / f"venv-{cache_hash}"
self.log.info("Python virtual environment will be cached in %s", venv_path)
venv_path.parent.mkdir(parents=True, exist_ok=True)
with open(f"{venv_path}.lock", "w") as f:
# Ensure that cache is not build by parallel workers
fcntl.flock(f, fcntl.LOCK_EX)

hash_marker = venv_path / "install_complete_marker.json"
try:
if venv_path.exists():
if hash_marker.exists():
previous_hash_data = hash_marker.read_text(encoding="utf8")
if previous_hash_data == hash_data:
self.log.info("Re-using cached Python virtual environment in %s", venv_path)
return venv_path

self.log.error(
"Unicorn alert: Found a previous virtual environment in %s "
"with the same hash but different parameters. Previous setup: '%s' / "
"Requested venv setup: '%s'. Please report a bug to airflow!",
venv_path,
previous_hash_data,
hash_data,
)
else:
self.log.warning(
"Found a previous (probably partial installed) virtual environment in %s, "
"deleting and re-creating.",
venv_path,
)

shutil.rmtree(venv_path)

venv_path.mkdir(parents=True)
self._prepare_venv(venv_path)
hash_marker.write_text(hash_data, encoding="utf8")
except Exception as e:
shutil.rmtree(venv_path)
raise AirflowException(f"Unable to create new virtual environment in {venv_path}") from e
self.log.info("New Python virtual environment created in %s", venv_path)
return venv_path

def execute_callable(self):
if self.venv_cache_path:
venv_path = self._ensure_venv_cache_exists(Path(self.venv_cache_path))
python_path = venv_path / "bin" / "python"
return self._execute_python_callable_in_subprocess(python_path, venv_path)

with TemporaryDirectory(prefix="venv") as tmp_dir:
tmp_path = Path(tmp_dir)
self._prepare_venv(tmp_path)
Expand All @@ -632,7 +720,7 @@ def _iter_serializable_context_keys(self):

class BranchPythonVirtualenvOperator(PythonVirtualenvOperator, BranchMixIn):
"""
A workflow can "branch" or follow a path after the execution of this task in a virtualenv.
A workflow can "branch" or follow a path after the execution of this task in a virtual environment.
It derives the PythonVirtualenvOperator and expects a Python function that returns
a single task_id or list of task_ids to follow. The task_id(s) returned
Expand All @@ -651,15 +739,15 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
"""
Run a function in a virtualenv that is not re-created.
Reused as is without the overhead of creating the virtualenv (with certain caveats).
Reused as is without the overhead of creating the virtual environment (with certain caveats).
The function must be defined using def, and not be
part of a class. All imports must happen inside the function
and no variables outside the scope may be referenced. A global scope
variable named virtualenv_string_args will be available (populated by
string_args). In addition, one can pass stuff through op_args and op_kwargs, and one
can use a return value.
Note that if your virtualenv runs in a different Python major version than Airflow,
Note that if your virtual environment runs in a different Python major version than Airflow,
you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to
Airflow through plugins. You can use string_args though.
Expand All @@ -671,13 +759,13 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
:ref:`howto/operator:ExternalPythonOperator`
:param python: Full path string (file-system specific) that points to a Python binary inside
a virtualenv that should be used (in ``VENV/bin`` folder). Should be absolute path
a virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path
(so usually start with "/" or "X:/" depending on the filesystem/os used).
:param python_callable: A python function with no references to outside variables,
defined with def, which will be run in a virtualenv
defined with def, which will be run in a virtual environment
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but if dill is not preinstalled in your venv, the task will fail with use_dill enabled.
but if dill is not preinstalled in your virtual environment, the task will fail with use_dill enabled.
:param op_args: A list of positional arguments to pass to python_callable.
:param op_kwargs: A dict of keyword arguments to pass to python_callable.
:param string_args: Strings that are present in the global var virtualenv_string_args,
Expand All @@ -697,7 +785,7 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
exit code will be treated as a failure.
"""

template_fields: Sequence[str] = tuple({"python"} | set(PythonOperator.template_fields))
template_fields: Sequence[str] = tuple({"python"}.union(PythonOperator.template_fields))

def __init__(
self,
Expand Down Expand Up @@ -749,7 +837,8 @@ def execute_callable(self):
raise AirflowException(
"Passing op_args or op_kwargs is not supported across different Python "
"major versions for ExternalPythonOperator. Please use string_args."
f"Sys version: {sys.version_info}. Venv version: {python_version_as_list_of_strings}"
f"Sys version: {sys.version_info}. "
f"Virtual environment version: {python_version_as_list_of_strings}"
)
with TemporaryDirectory(prefix="tmd") as tmp_dir:
tmp_path = Path(tmp_dir)
Expand All @@ -776,11 +865,11 @@ def _is_pendulum_installed_in_target_env(self) -> bool:
return True
except Exception as e:
if self.expect_pendulum:
self.log.warning("When checking for Pendulum installed in venv got %s", e)
self.log.warning("When checking for Pendulum installed in virtual environment got %s", e)
self.log.warning(
"Pendulum is not properly installed in the virtualenv "
"Pendulum is not properly installed in the virtual environment "
"Pendulum context keys will not be available. "
"Please Install Pendulum or Airflow in your venv to access them."
"Please Install Pendulum or Airflow in your virtual environment to access them."
)
return False

Expand All @@ -805,7 +894,7 @@ def _get_airflow_version_from_target_env(self) -> str | None:
return target_airflow_version
except Exception as e:
if self.expect_airflow:
self.log.warning("When checking for Airflow installed in venv got %s", e)
self.log.warning("When checking for Airflow installed in virtual environment got %s", e)
self.log.warning(
f"This means that Airflow is not properly installed by "
f"{self.python}. Airflow context keys will not be available. "
Expand All @@ -819,8 +908,8 @@ class BranchExternalPythonOperator(ExternalPythonOperator, BranchMixIn):
A workflow can "branch" or follow a path after the execution of this task.
Extends ExternalPythonOperator, so expects to get Python:
virtualenv that should be used (in ``VENV/bin`` folder). Should be absolute path,
so it can run on separate virtualenv similarly to ExternalPythonOperator.
virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path,
so it can run on separate virtual environment similarly to ExternalPythonOperator.
"""

def execute(self, context: Context) -> Any:
Expand Down

0 comments on commit a206012

Please sign in to comment.