Skip to content

Commit

Permalink
Introduce @task.bash TaskFlow decorator
Browse files Browse the repository at this point in the history
Adding a @task.bash TaskFlow decorator to the collection of existing core TaskFlow decorators. This particular decorator will use the return value of the decorated callable as the Bash command to execute using the existing BashOperator functionality.
  • Loading branch information
josh-fell committed Dec 1, 2023
1 parent cf052dc commit cb3f973
Show file tree
Hide file tree
Showing 7 changed files with 641 additions and 14 deletions.
3 changes: 3 additions & 0 deletions airflow/decorators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from typing import Any, Callable

from airflow.decorators.base import TaskDecorator
from airflow.decorators.bash import bash_task
from airflow.decorators.branch_external_python import branch_external_python_task
from airflow.decorators.branch_python import branch_task
from airflow.decorators.branch_virtualenv import branch_virtualenv_task
Expand Down Expand Up @@ -47,6 +48,7 @@
"branch_external_python_task",
"short_circuit_task",
"sensor_task",
"bash_task",
"setup",
"teardown",
]
Expand All @@ -63,6 +65,7 @@ class TaskDecoratorCollection:
branch_external_python = staticmethod(branch_external_python_task)
short_circuit = staticmethod(short_circuit_task)
sensor = staticmethod(sensor_task)
bash = staticmethod(bash_task)

__call__: Any = python # Alias '@task' to '@task.python'.

Expand Down
32 changes: 32 additions & 0 deletions airflow/decorators/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ from typing import Any, Callable, Collection, Container, Iterable, Mapping, over
from kubernetes.client import models as k8s

from airflow.decorators.base import FParams, FReturn, Task, TaskDecorator
from airflow.decorators.bash import bash_task
from airflow.decorators.branch_external_python import branch_external_python_task
from airflow.decorators.branch_python import branch_task
from airflow.decorators.branch_virtualenv import branch_virtualenv_task
Expand Down Expand Up @@ -54,6 +55,7 @@ __all__ = [
"branch_external_python_task",
"short_circuit_task",
"sensor_task",
"bash_task",
"setup",
"teardown",
]
Expand Down Expand Up @@ -638,6 +640,36 @@ class TaskDecoratorCollection:
def pyspark(
self, python_callable: Callable[FParams, FReturn] | None = None
) -> Task[FParams, FReturn]: ...
@overload
def bash(
self,
*,
env: dict[str, str] | None = None,
append_env: bool = False,
output_encoding: str = "utf-8",
skip_exit_code: int = 99,
cwd: str | None = None,
**kwargs,
) -> TaskDecorator:
"""Decorator to wrap a callable into a BashOperator task.
:param bash_command: The command, set of commands or reference to a bash script (must be '.sh') to be
executed. (templated)
:param env: If env is not None, it must be a dict that defines the environment variables for the new
process; these are used instead of inheriting the current process environment, which is the
default behavior. (templated)
:param append_env: If False(default) uses the environment variables passed in env params and does not
inherit the current process environment. If True, inherits the environment variables from current
passes and then environment variable passed by the user will either update the existing inherited
environment variables or the new variables gets appended to it
:param output_encoding: Output encoding of bash command
:param skip_exit_code: If task exits with this exit code, leave the task in ``skipped`` state
(default: 99). If set to ``None``, any non-zero exit code will be treated as a failure.
:param cwd: Working directory to execute the command in. If None (default), the command is run in a
temporary directory.
"""
@overload
def bash(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...

task: TaskDecoratorCollection
setup: Callable
Expand Down
99 changes: 99 additions & 0 deletions airflow/decorators/bash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import warnings
from typing import Any, Callable, Collection, Mapping, Sequence

from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory
from airflow.operators.bash import BashOperator
from airflow.utils.context import Context, context_merge
from airflow.utils.operator_helpers import determine_kwargs
from airflow.utils.types import NOTSET


class _BashDecoratedOperator(DecoratedOperator, BashOperator):
"""
Wraps a Python callable and uses the callable return value as the Bash command to be executed.
:param python_callable: A reference to an object that is callable.
:param op_kwargs: A dictionary of keyword arguments that will get unpacked
in your function (templated).
:param op_args: A list of positional arguments that will get unpacked when
calling your callable (templated).
"""

template_fields: Sequence[str] = (*DecoratedOperator.template_fields, "env")
template_fields_renderers: dict[str, str] = {**DecoratedOperator.template_fields_renderers, "env": "json"}

custom_operator_name: str = "@task.bash"

def __init__(
self,
*,
python_callable: Callable,
op_args: Collection[Any] | None = None,
op_kwargs: Mapping[str, Any] | None = None,
**kwargs,
) -> None:
if kwargs.get("multiple_outputs") is not None:
warnings.warn(
f"`multiple_outputs` is not supported in {self.custom_operator_name} tasks. Ignoring.",
stacklevel=1,
)
kwargs.pop("multiple_outputs")

super().__init__(
python_callable=python_callable,
op_args=op_args,
op_kwargs=op_kwargs,
bash_command=NOTSET,
**kwargs,
)

def execute(self, context: Context) -> Any:
context_merge(context, self.op_kwargs)
kwargs = determine_kwargs(self.python_callable, self.op_args, context)

self.bash_command = self.python_callable(*self.op_args, **kwargs)

if not isinstance(self.bash_command, str) or self.bash_command == "":
raise TypeError("The returned value from the TaskFlow callable must be a non-empty string.")

return super().execute(context)


def bash_task(
python_callable: Callable | None = None,
**kwargs,
) -> TaskDecorator:
"""
Wraps a function into a BashOperator.
Accepts kwargs for operator kwargs. Can be reused in a single DAG. This function is only used only used
during type checking or auto-completion.
:param python_callable: Function to decorate.
:meta private:
"""
return task_decorator_factory(
python_callable=python_callable,
decorated_operator_class=_BashDecoratedOperator,
**kwargs,
)
88 changes: 88 additions & 0 deletions airflow/example_dags/example_bash_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of the BashOperator."""
from __future__ import annotations

import pendulum

from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.weekday import WeekDay


@dag(schedule=None, start_date=pendulum.datetime(2023, 1, 1, tz="UTC"), catchup=False)
def example_bash_decorator():
@task.bash
def run_me(sleep_seconds: int, task_instance_key_str: str) -> str:
return f"echo {task_instance_key_str} && sleep {sleep_seconds}"

run_me_loop = [run_me.override(task_id=f"runme_{i}")(sleep_seconds=i) for i in range(3)]

# [START howto_decorator_bash]
@task.bash
def run_after_loop() -> str:
return "echo 1"

run_this = run_after_loop()
# [END howto_decorator_bash]

# [START howto_decorator_bash_template]
@task.bash
def also_run_this(task_instance_key_str) -> str:
return f'echo "ti_key={task_instance_key_str}"'

also_this = also_run_this()
# [END howto_decorator_bash_template]

# [START howto_decorator_bash_skip]
@task.bash
def this_will_skip() -> str:
return 'echo "hello world"; exit 99;'

this_skips = this_will_skip()
# [END howto_decorator_bash_skip]

run_this_last = EmptyOperator(task_id="run_this_last", trigger_rule=TriggerRule.ALL_DONE)

# [START howto_decorator_bash_conditional]
@task.bash
def sleep_in(day: str) -> str:
if day in (WeekDay.SATURDAY, WeekDay.SUNDAY):
return f"sleep {60 * 60}"
else:
raise AirflowSkipException("No sleeping in today!")

sleep_in(day="{{ dag_run.logical_date.strftime('%A').lower() }}")
# [END howto_decorator_bash_conditional]

# [START howto_decorator_bash_parametrize]
@task.bash(env={"BASE_DIR": "{{ dag_run.logical_date.strftime('%Y/%m/%d') }}"}, append_env=True)
def make_dynamic_dirs(new_dirs: str) -> str:
return f"mkdir -p $AIRFLOW_HOME/$BASE_DIR/{new_dirs}"

make_dynamic_dirs(new_dirs="foo/bar/baz")
# [END howto_decorator_bash_parametrize]

chain(run_me_loop, run_this)
chain([also_this, this_skips, run_this], run_this_last)


example_bash_decorator()

0 comments on commit cb3f973

Please sign in to comment.