Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow PythonVenvOperator using other index url #33017

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 11 additions & 1 deletion airflow/decorators/__init__.pyi
Expand Up @@ -21,7 +21,7 @@
from __future__ import annotations

from datetime import timedelta
from typing import Any, Callable, Iterable, Mapping, overload
from typing import Any, Callable, Collection, Container, Iterable, Mapping, overload

from kubernetes.client import models as k8s

Expand Down Expand Up @@ -107,6 +107,9 @@ class TaskDecoratorCollection:
use_dill: bool = False,
system_site_packages: bool = True,
templates_dict: Mapping[str, Any] | None = None,
pip_install_options: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
show_return_value_in_logs: bool = True,
**kwargs,
) -> TaskDecorator:
Expand All @@ -124,6 +127,13 @@ class TaskDecoratorCollection:
:param system_site_packages: Whether to include
system_site_packages in your virtualenv.
See virtualenv documentation for more information.
:param pip_install_options: a list of pip install options when installing requirements
See 'pip install -h' for available options
:param skip_on_exit_code: If python_callable exits with this exit code, leave the task
in ``skipped`` state (default: None). If set to ``None``, any non-zero
exit code will be treated as a failure.
:param index_urls: an optional list of index urls to load Python packages from.
If not provided the system pip conf will be used to source packages from.
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
Expand Down
94 changes: 94 additions & 0 deletions airflow/hooks/package_index.py
@@ -0,0 +1,94 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Hook for additional Package Indexes (Python)."""
from __future__ import annotations

import subprocess
from typing import Any
from urllib.parse import quote, urlparse

from airflow.hooks.base import BaseHook


class PackageIndexHook(BaseHook):
"""Specify package indexes/Python package sources using Airflow connections."""

conn_name_attr = "pi_conn_id"
default_conn_name = "package_index_default"
conn_type = "package_index"
hook_name = "Package Index (Python)"

def __init__(self, pi_conn_id: str = default_conn_name) -> None:
super().__init__()
self.pi_conn_id = pi_conn_id
self.conn = None

@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
"""Returns custom field behaviour."""
return {
"hidden_fields": ["schema", "port", "extra"],
"relabeling": {"host": "Package Index URL"},
"placeholders": {
"host": "Example: https://my-package-mirror.net/pypi/repo-name/simple",
"login": "Username for package index",
"password": "Password for package index (will be masked)",
},
}

@staticmethod
def _get_basic_auth_conn_url(index_url: str, user: str | None, password: str | None) -> str:
"""Returns a connection URL with basic auth credentials based on connection config."""
url = urlparse(index_url)
host = url.netloc.split("@")[-1]
if user:
if password:
host = f"{quote(user)}:{quote(password)}@{host}"
else:
host = f"{quote(user)}@{host}"
return url._replace(netloc=host).geturl()

def get_conn(self) -> Any:
"""Returns connection for the hook."""
return self.get_connection_url()

def get_connection_url(self) -> Any:
"""Returns a connection URL with embedded credentials."""
conn = self.get_connection(self.pi_conn_id)
index_url = conn.host
if not index_url:
raise Exception("Please provide an index URL.")
return self._get_basic_auth_conn_url(index_url, conn.login, conn.password)

def test_connection(self) -> tuple[bool, str]:
"""Test connection to package index url."""
conn_url = self.get_connection_url()
proc = subprocess.run(
["pip", "search", "not-existing-test-package", "--no-input", "--index", conn_url],
check=False,
capture_output=True,
)
conn = self.get_connection(self.pi_conn_id)
if proc.returncode not in [
0, # executed successfully, found package
23, # executed successfully, didn't find any packages
# (but we do not expect it to find 'not-existing-test-package')
]:
return False, f"Connection test to {conn.host} failed. Error: {str(proc.stderr)}"

return True, f"Connection to {conn.host} tested successfully!"
54 changes: 33 additions & 21 deletions airflow/operators/python.py
Expand Up @@ -507,6 +507,8 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
:param skip_on_exit_code: If python_callable exits with this exit code, leave the task
in ``skipped`` state (default: None). If set to ``None``, any non-zero
exit code will be treated as a failure.
:param index_urls: an optional list of index urls to load Python packages from.
If not provided the system pip conf will be used to source packages from.
"""

template_fields: Sequence[str] = tuple({"requirements"} | set(PythonOperator.template_fields))
Expand All @@ -528,6 +530,7 @@ def __init__(
templates_exts: list[str] | None = None,
expect_airflow: bool = True,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
**kwargs,
):
if (
Expand All @@ -543,14 +546,20 @@ def __init__(
if importlib.util.find_spec("virtualenv") is None:
raise AirflowException("PythonVirtualenvOperator requires virtualenv, please install it.")
if not requirements:
self.requirements: list[str] | str = []
self.requirements: list[str] = []
elif isinstance(requirements, str):
self.requirements = requirements
self.requirements = [requirements]
else:
self.requirements = list(requirements)
self.python_version = python_version
self.system_site_packages = system_site_packages
self.pip_install_options = pip_install_options
if isinstance(index_urls, str):
self.index_urls: list[str] | None = [index_urls]
elif isinstance(index_urls, Collection):
self.index_urls = list(index_urls)
else:
self.index_urls = None
super().__init__(
python_callable=python_callable,
use_dill=use_dill,
Expand All @@ -564,28 +573,31 @@ def __init__(
**kwargs,
)

def _requirements_list(self) -> list[str]:
"""Prepares a list of requirements that need to be installed for the venv."""
requirements = [str(dependency) for dependency in self.requirements]
if not self.system_site_packages and self.use_dill and "dill" not in requirements:
requirements.append("dill")
requirements.sort() # Ensure a hash is stable
return requirements

def _prepare_venv(self, venv_path: Path) -> None:
"""Prepares the requirements and installs the venv."""
requirements_file = venv_path / "requirements.txt"
requirements_file.write_text("\n".join(self._requirements_list()))
prepare_virtualenv(
venv_directory=str(venv_path),
python_bin=f"python{self.python_version}" if self.python_version else "python",
system_site_packages=self.system_site_packages,
requirements_file_path=str(requirements_file),
pip_install_options=self.pip_install_options,
index_urls=self.index_urls,
)

def execute_callable(self):
with TemporaryDirectory(prefix="venv") as tmp_dir:
tmp_path = Path(tmp_dir)
requirements_file_name = f"{tmp_dir}/requirements.txt"

if not isinstance(self.requirements, str):
requirements_file_contents = "\n".join(str(dependency) for dependency in self.requirements)
else:
requirements_file_contents = self.requirements

if not self.system_site_packages and self.use_dill:
requirements_file_contents += "\ndill"

with open(requirements_file_name, "w") as file:
file.write(requirements_file_contents)
prepare_virtualenv(
venv_directory=tmp_dir,
python_bin=f"python{self.python_version}" if self.python_version else None,
system_site_packages=self.system_site_packages,
requirements_file_path=requirements_file_name,
pip_install_options=self.pip_install_options,
)
self._prepare_venv(tmp_path)
python_path = tmp_path / "bin" / "python"
result = self._execute_python_callable_in_subprocess(python_path, tmp_path)
return result
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers_manager.py
Expand Up @@ -37,6 +37,7 @@

from airflow.exceptions import AirflowOptionalProviderFeatureException
from airflow.hooks.filesystem import FSHook
from airflow.hooks.package_index import PackageIndexHook
from airflow.typing_compat import Literal
from airflow.utils import yaml
from airflow.utils.entry_points import entry_points_with_dist
Expand Down Expand Up @@ -458,7 +459,7 @@ def _init_airflow_core_hooks(self):
connection_type=None,
connection_testable=False,
)
for cls in [FSHook]:
for cls in [FSHook, PackageIndexHook]:
package_name = cls.__module__
hook_class_name = f"{cls.__module__}.{cls.__name__}"
hook_info = self._import_hook(
Expand Down
19 changes: 19 additions & 0 deletions airflow/utils/python_virtualenv.py
Expand Up @@ -21,6 +21,7 @@
import os
import sys
import warnings
from pathlib import Path

import jinja2

Expand Down Expand Up @@ -51,6 +52,16 @@ def _generate_pip_install_cmd_from_list(
return cmd + requirements


def _generate_pip_conf(conf_file: Path, index_urls: list[str]) -> None:
if len(index_urls) == 0:
pip_conf_options = "no-index = true"
else:
pip_conf_options = f"index-url = {index_urls[0]}"
if len(index_urls) > 1:
pip_conf_options += f"\nextra-index-url = {' '.join(x for x in index_urls[1:])}"
conf_file.write_text(f"[global]\n{pip_conf_options}")


def remove_task_decorator(python_source: str, task_decorator_name: str) -> str:
warnings.warn(
"Import remove_task_decorator from airflow.utils.decorators instead",
Expand All @@ -67,6 +78,7 @@ def prepare_virtualenv(
requirements: list[str] | None = None,
requirements_file_path: str | None = None,
pip_install_options: list[str] | None = None,
index_urls: list[str] | None = None,
) -> str:
"""Creates a virtual environment and installs the additional python packages.

Expand All @@ -76,11 +88,18 @@ def prepare_virtualenv(
See virtualenv documentation for more information.
:param requirements: List of additional python packages.
:param requirements_file_path: Path to the ``requirements.txt`` file.
:param pip_install_options: a list of pip install options when installing requirements
See 'pip install -h' for available options
:param index_urls: an optional list of index urls to load Python packages from.
If not provided the system pip conf will be used to source packages from.
:return: Path to a binary file with Python in a virtual environment.
"""
if pip_install_options is None:
pip_install_options = []

if index_urls is not None:
_generate_pip_conf(Path(venv_directory) / "pip.conf", index_urls)
jscheffl marked this conversation as resolved.
Show resolved Hide resolved

virtualenv_cmd = _generate_virtualenv_cmd(venv_directory, python_bin, system_site_packages)
execute_in_subprocess(virtualenv_cmd)

Expand Down
14 changes: 14 additions & 0 deletions docs/apache-airflow/howto/operator/python.rst
Expand Up @@ -112,6 +112,20 @@ If additional parameters for package installation are needed pass them in ``requ

All supported options are listed in the `requirements file format <https://pip.pypa.io/en/stable/reference/requirements-file-format/#supported-options>`_.

Virtualenv setup options
^^^^^^^^^^^^^^^^^^^^^^^^

The virtualenv is created based on the global python pip configuration on your worker. Using additional ENVs in your environment or adjustments in the general
pip configuration as described in `pip config <https://pip.pypa.io/en/stable/topics/configuration/>`_.

If you want to use additional task specific private python repositories to setup the virtualenv, you can pass the ``index_urls`` parameter which will adjust the
pip install configurations. Passed index urls replace the standard system configured index url settings.
To prevent adding secrets to the private repository in your DAG code you can use the Airflow
:doc:`../../authoring-and-scheduling/connections`. For this purpose the connection type ``Package Index (Python)`` can be used.

In the special case you want to prevent remote calls for setup of a virtualenv, pass the ``index_urls`` as empty list as ``index_urls=[]`` which
forced pip installer to use the ``--no-index`` option.


.. _howto/operator:ExternalPythonOperator:

Expand Down
3 changes: 3 additions & 0 deletions docs/spelling_wordlist.txt
Expand Up @@ -1617,6 +1617,7 @@ Url
url
urlencoded
urlparse
urls
useHCatalog
useLegacySQL
useQueryCache
Expand All @@ -1637,6 +1638,7 @@ vCPU
ve
vendored
Vendorize
venv
venvs
versionable
Vertica
Expand All @@ -1646,6 +1648,7 @@ Vevo
videointelligence
VideoIntelligenceServiceClient
virtualenv
virtualenvs
vm
VolumeMount
volumeMounts
Expand Down