Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add decorators for external and venv python branching operators #35043

Merged
merged 6 commits into from Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions airflow/decorators/__init__.py
Expand Up @@ -19,7 +19,9 @@
from typing import Any, Callable

from airflow.decorators.base import TaskDecorator
from airflow.decorators.branch_external_python import branch_external_python_task
from airflow.decorators.branch_python import branch_task
from airflow.decorators.branch_virtualenv import branch_virtualenv_task
from airflow.decorators.external_python import external_python_task
from airflow.decorators.python import python_task
from airflow.decorators.python_virtualenv import virtualenv_task
Expand All @@ -41,6 +43,8 @@
"virtualenv_task",
"external_python_task",
"branch_task",
"branch_virtualenv_task",
"branch_external_python_task",
"short_circuit_task",
"sensor_task",
"setup",
Expand All @@ -55,6 +59,8 @@ class TaskDecoratorCollection:
virtualenv = staticmethod(virtualenv_task)
external_python = staticmethod(external_python_task)
branch = staticmethod(branch_task)
branch_virtualenv = staticmethod(branch_virtualenv_task)
branch_external_python = staticmethod(branch_external_python_task)
short_circuit = staticmethod(short_circuit_task)
sensor = staticmethod(sensor_task)

Expand Down
97 changes: 97 additions & 0 deletions airflow/decorators/__init__.pyi
Expand Up @@ -26,7 +26,9 @@ from typing import Any, Callable, Collection, Container, Iterable, Mapping, over
from kubernetes.client import models as k8s

from airflow.decorators.base import FParams, FReturn, Task, TaskDecorator
from airflow.decorators.branch_external_python import branch_external_python_task
from airflow.decorators.branch_python import branch_task
from airflow.decorators.branch_virtualenv import branch_virtualenv_task
from airflow.decorators.external_python import external_python_task
from airflow.decorators.python import python_task
from airflow.decorators.python_virtualenv import virtualenv_task
Expand All @@ -47,6 +49,8 @@ __all__ = [
"virtualenv_task",
"external_python_task",
"branch_task",
"branch_virtualenv_task",
"branch_external_python_task",
"short_circuit_task",
"sensor_task",
"setup",
Expand Down Expand Up @@ -194,6 +198,99 @@ class TaskDecoratorCollection:
@overload
def branch(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
@overload
def branch_virtualenv(
self,
*,
multiple_outputs: bool | None = None,
# 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
# _PythonVirtualenvDecoratedOperator.
requirements: None | Iterable[str] | str = None,
python_version: None | str | int | float = None,
use_dill: bool = False,
system_site_packages: bool = True,
templates_dict: Mapping[str, Any] | None = None,
pip_install_options: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
venv_cache_path: None | str = None,
show_return_value_in_logs: bool = True,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to wrap the decorated callable into a BranchPythonVirtualenvOperator.

For more information on how to use this decorator, see :ref:`concepts:branching`.
Accepts arbitrary for operator kwarg. Can be reused in a single DAG.

:param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
:param requirements: Either a list of requirement strings, or a (templated)
"requirements file" as specified by pip.
:param python_version: The Python version to run the virtual environment with. Note that
both 2 and 2.7 are acceptable forms.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
:param system_site_packages: Whether to include
system_site_packages in your virtual environment.
See virtualenv documentation for more information.
:param pip_install_options: a list of pip install options when installing requirements
See 'pip install -h' for available options
:param skip_on_exit_code: If python_callable exits with this exit code, leave the task
in ``skipped`` state (default: None). If set to ``None``, any non-zero
exit code will be treated as a failure.
:param index_urls: an optional list of index urls to load Python packages from.
If not provided the system pip conf will be used to source packages from.
:param venv_cache_path: Optional path to the virtual environment parent folder in which the
virtual environment will be cached, creates a sub-folder venv-{hash} whereas hash will be replaced
with a checksum of requirements. If not provided the virtual environment will be created and
deleted in a temp folder for every execution.
:param show_return_value_in_logs: a bool value whether to show return_value
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
such as transmission a large amount of XCom to TaskAPI.
"""
@overload
def branch_virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
@overload
def branch_external_python(
self,
*,
python: str,
multiple_outputs: bool | None = None,
# 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
# _PythonVirtualenvDecoratedOperator.
use_dill: bool = False,
templates_dict: Mapping[str, Any] | None = None,
show_return_value_in_logs: bool = True,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to wrap the decorated callable into a BranchExternalPythonOperator.

For more information on how to use this decorator, see :ref:`concepts:branching`.
Accepts arbitrary for operator kwarg. Can be reused in a single DAG.

:param python: Full path string (file-system specific) that points to a Python binary inside
a virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path
(so usually start with "/" or "X:/" depending on the filesystem/os used).
:param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
in your callable's context after the template has been applied.
:param show_return_value_in_logs: a bool value whether to show return_value
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
such as transmission a large amount of XCom to TaskAPI.
"""
@overload
def branch_external_python(
self, python_callable: Callable[FParams, FReturn]
) -> Task[FParams, FReturn]: ...
@overload
def short_circuit(
self,
*,
Expand Down
56 changes: 56 additions & 0 deletions airflow/decorators/branch_external_python.py
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Callable

from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.operators.python import BranchExternalPythonOperator

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator


class _BranchExternalPythonDecoratedOperator(_PythonDecoratedOperator, BranchExternalPythonOperator):
"""Wraps a Python callable and captures args/kwargs when called for execution."""

custom_operator_name: str = "@task.branch_external_python"


def branch_external_python_task(
python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs
) -> TaskDecorator:
"""
Wrap a python function into a BranchExternalPythonOperator.

For more information on how to use this operator, take a look at the guide:
:ref:`concepts:branching`

Accepts kwargs for operator kwarg. Can be reused in a single DAG.

:param python_callable: Function to decorate
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
"""
return task_decorator_factory(
python_callable=python_callable,
multiple_outputs=multiple_outputs,
decorated_operator_class=_BranchExternalPythonDecoratedOperator,
**kwargs,
)
56 changes: 56 additions & 0 deletions airflow/decorators/branch_virtualenv.py
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Callable

from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.operators.python import BranchPythonVirtualenvOperator

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator


class _BranchPythonVirtualenvDecoratedOperator(_PythonDecoratedOperator, BranchPythonVirtualenvOperator):
"""Wraps a Python callable and captures args/kwargs when called for execution."""

custom_operator_name: str = "@task.branch_virtualenv"


def branch_virtualenv_task(
python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs
) -> TaskDecorator:
"""
Wrap a python function into a BranchPythonVirtualenvOperator.

For more information on how to use this operator, take a look at the guide:
:ref:`concepts:branching`

Accepts kwargs for operator kwarg. Can be reused in a single DAG.

:param python_callable: Function to decorate
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
"""
return task_decorator_factory(
python_callable=python_callable,
multiple_outputs=multiple_outputs,
decorated_operator_class=_BranchPythonVirtualenvDecoratedOperator,
**kwargs,
)