Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ env:
USE_SUDO: "true"
INCLUDE_SUCCESS_OUTPUTS: "true"
AIRFLOW_ENABLE_AIP_44: "true"
AIRFLOW_ENABLE_AIP_52: "true"

concurrency:
group: ci-${{ github.event.pull_request.number || github.ref }}
Expand Down Expand Up @@ -1066,7 +1065,6 @@ jobs:
POSTGRES_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
BACKEND_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
AIRFLOW_ENABLE_AIP_44: "false"
AIRFLOW_ENABLE_AIP_52: "false"
JOB_ID: >
postgres-in-progress-disabled-${{needs.build-info.outputs.default-python-version}}-
${{needs.build-info.outputs.default-postgres-version}}
Expand Down
7 changes: 0 additions & 7 deletions airflow/decorators/setup_teardown.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,9 @@
from airflow import AirflowException
from airflow.decorators import python_task
from airflow.decorators.task_group import _TaskGroupFactory
from airflow.settings import _ENABLE_AIP_52


def setup_task(func: Callable) -> Callable:
if not _ENABLE_AIP_52:
raise AirflowException("AIP-52 Setup tasks are disabled.")

# Using FunctionType here since _TaskDecorator is also a callable
if isinstance(func, types.FunctionType):
func = python_task(func)
Expand All @@ -39,9 +35,6 @@ def setup_task(func: Callable) -> Callable:


def teardown_task(_func=None, *, on_failure_fail_dagrun: bool = False) -> Callable:
if not _ENABLE_AIP_52:
raise AirflowException("AIP-52 Teardown tasks are disabled.")

def teardown(func: Callable) -> Callable:
# Using FunctionType here since _TaskDecorator is also a callable
if isinstance(func, types.FunctionType):
Expand Down
34 changes: 16 additions & 18 deletions airflow/example_dags/example_setup_teardown.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,23 @@

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.settings import _ENABLE_AIP_52
from airflow.utils.task_group import TaskGroup

if _ENABLE_AIP_52:
with DAG(
dag_id="example_setup_teardown",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
BashOperator.as_setup(task_id="root_setup", bash_command="echo 'Hello from root_setup'")
normal = BashOperator(task_id="normal", bash_command="echo 'I am just a normal task'")
BashOperator.as_teardown(task_id="root_teardown", bash_command="echo 'Goodbye from root_teardown'")
with DAG(
dag_id="example_setup_teardown",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
BashOperator.as_setup(task_id="root_setup", bash_command="echo 'Hello from root_setup'")
normal = BashOperator(task_id="normal", bash_command="echo 'I am just a normal task'")
BashOperator.as_teardown(task_id="root_teardown", bash_command="echo 'Goodbye from root_teardown'")

with TaskGroup("section_1") as section_1:
BashOperator.as_setup(task_id="taskgroup_setup", bash_command="echo 'Hello from taskgroup_setup'")
BashOperator(task_id="normal", bash_command="echo 'I am just a normal task'")
BashOperator.as_setup(
task_id="taskgroup_teardown", bash_command="echo 'Hello from taskgroup_teardown'"
)
with TaskGroup("section_1") as section_1:
BashOperator.as_setup(task_id="taskgroup_setup", bash_command="echo 'Hello from taskgroup_setup'")
BashOperator(task_id="normal", bash_command="echo 'I am just a normal task'")
BashOperator.as_setup(
task_id="taskgroup_teardown", bash_command="echo 'Hello from taskgroup_teardown'"
)

normal >> section_1
normal >> section_1
74 changes: 36 additions & 38 deletions airflow/example_dags/example_setup_teardown_taskflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,51 +22,49 @@

from airflow.decorators import setup, task, task_group, teardown
from airflow.models.dag import DAG
from airflow.settings import _ENABLE_AIP_52

if _ENABLE_AIP_52:
with DAG(
dag_id="example_setup_teardown_taskflow",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
# You can use the setup and teardown decorators to add setup and teardown tasks at the DAG level
with DAG(
dag_id="example_setup_teardown_taskflow",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
# You can use the setup and teardown decorators to add setup and teardown tasks at the DAG level
@setup
@task
def root_setup():
print("Hello from root_setup")

@teardown
@task
def root_teardown():
print("Goodbye from root_teardown")

@task
def normal():
print("I am just a normal task")

@task_group
def section_1():
# You can also have setup and teardown tasks at the task group level
@setup
@task
def root_setup():
print("Hello from root_setup")
def my_setup():
print("I set up")

@teardown
@task
def root_teardown():
print("Goodbye from root_teardown")
def my_teardown():
print("I tear down")

@task
def normal():
print("I am just a normal task")

@task_group
def section_1():
# You can also have setup and teardown tasks at the task group level
@setup
@task
def my_setup():
print("I set up")

@teardown
@task
def my_teardown():
print("I tear down")

@task
def hello():
print("I say hello")
def hello():
print("I say hello")

my_setup()
hello()
my_teardown()
my_setup()
hello()
my_teardown()

root_setup()
normal() >> section_1()
root_teardown()
root_setup()
normal() >> section_1()
root_teardown()
10 changes: 0 additions & 10 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,22 +962,12 @@ def __init__(

@classmethod
def as_setup(cls, *args, **kwargs):
from airflow.settings import _ENABLE_AIP_52

if not _ENABLE_AIP_52:
raise AirflowException("AIP-52 Setup tasks are disabled.")

op = cls(*args, **kwargs)
op._is_setup = True
return op

@classmethod
def as_teardown(cls, *args, **kwargs):
from airflow.settings import _ENABLE_AIP_52

if not _ENABLE_AIP_52:
raise AirflowException("AIP-52 Teardown tasks are disabled.")

on_failure_fail_dagrun = kwargs.pop("on_failure_fail_dagrun", False)
if "trigger_rule" in kwargs:
raise ValueError("Cannot set trigger rule for teardown tasks.")
Expand Down
4 changes: 0 additions & 4 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,10 +607,6 @@ def initialize():
DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")


# AIP-52: setup/teardown (experimental)
# This feature is not complete yet, so we disable it by default.
_ENABLE_AIP_52 = os.environ.get("AIRFLOW_ENABLE_AIP_52", "false").lower() in {"true", "t", "yes", "y", "1"}

# AIP-44: internal_api (experimental)
# This feature is not complete yet, so we disable it by default.
_ENABLE_AIP_44 = os.environ.get("AIRFLOW_ENABLE_AIP_44", "false").lower() in {"true", "t", "yes", "y", "1"}
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,6 @@ def update_expected_environment_variables(env: dict[str, str]) -> None:
set_value_to_default_if_not_set(env, "AIRFLOW_CONSTRAINTS_REFERENCE", "constraints-source-providers")
set_value_to_default_if_not_set(env, "AIRFLOW_EXTRAS", "")
set_value_to_default_if_not_set(env, "AIRFLOW_ENABLE_AIP_44", "true")
set_value_to_default_if_not_set(env, "AIRFLOW_ENABLE_AIP_52", "true")
set_value_to_default_if_not_set(env, "ANSWER", answer if answer is not None else "")
set_value_to_default_if_not_set(env, "BASE_BRANCH", "main")
set_value_to_default_if_not_set(env, "BREEZE", "true")
Expand Down
1 change: 0 additions & 1 deletion scripts/ci/docker-compose/_docker.env
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
AIRFLOW_CI_IMAGE
AIRFLOW_EXTRAS
AIRFLOW_ENABLE_AIP_44
AIRFLOW_ENABLE_AIP_52
AIRFLOW_CONSTRAINTS_REFERENCE
ANSWER
BACKEND
Expand Down
1 change: 0 additions & 1 deletion scripts/ci/docker-compose/base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ services:
- AIRFLOW_CI_IMAGE=${AIRFLOW_CI_IMAGE}
- AIRFLOW_EXTRAS=${AIRFLOW_EXTRAS}
- AIRFLOW_ENABLE_AIP_44=${AIRFLOW_ENABLE_AIP_44}
- AIRFLOW_ENABLE_AIP_52=${AIRFLOW_ENABLE_AIP_52}
- AIRFLOW_CONSTRAINTS_REFERENCE=${AIRFLOW_CONSTRAINTS_REFERENCE}
- ANSWER=${ANSWER}
- BACKEND=${BACKEND}
Expand Down
1 change: 0 additions & 1 deletion scripts/ci/docker-compose/devcontainer.env
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ HOME=
AIRFLOW_CI_IMAGE="ghcr.io/apache/airflow/main/ci/python3.7:latest"
ANSWER=
AIRFLOW_ENABLE_AIP_44="true"
AIRFLOW_ENABLE_AIP_52="true"
PYTHON_MAJOR_MINOR_VERSION="3.7"
AIRFLOW_EXTRAS=
BASE_BRANCH="main"
Expand Down
4 changes: 0 additions & 4 deletions tests/always/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import pytest

from airflow.models import DagBag
from airflow.settings import _ENABLE_AIP_52
from airflow.utils import yaml
from tests.test_utils.asserts import assert_queries_count

Expand Down Expand Up @@ -62,9 +61,6 @@ def example_not_suspended_dags():
for candidate in candidates:
if any(candidate.startswith(s) for s in suspended_providers_folders):
continue
# we will also suspend AIP-52 DAGs unless it is enabled
if not _ENABLE_AIP_52 and "example_setup_teardown" in candidate:
continue
yield candidate


Expand Down
1 change: 0 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
os.environ["AWS_DEFAULT_REGION"] = os.environ.get("AWS_DEFAULT_REGION") or "us-east-1"
os.environ["CREDENTIALS_DIR"] = os.environ.get("CREDENTIALS_DIR") or "/files/airflow-breeze-config/keys"
os.environ["AIRFLOW_ENABLE_AIP_44"] = os.environ.get("AIRFLOW_ENABLE_AIP_44") or "true"
os.environ["AIRFLOW_ENABLE_AIP_52"] = os.environ.get("AIRFLOW_ENABLE_AIP_52") or "true"

from airflow import settings # noqa: E402
from airflow.models.tasklog import LogTemplate # noqa: E402
Expand Down
4 changes: 0 additions & 4 deletions tests/decorators/test_external_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import pytest

from airflow.decorators import setup, task, teardown
from airflow.settings import _ENABLE_AIP_52
from airflow.utils import timezone

DEFAULT_DATE = timezone.datetime(2016, 1, 1)
Expand Down Expand Up @@ -127,7 +126,6 @@ def f(_):

ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
def test_marking_external_python_task_as_setup(self, dag_maker, venv_python):
@setup
@task.external_python(python=venv_python)
Expand All @@ -142,7 +140,6 @@ def f():
assert setup_task._is_setup
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
def test_marking_external_python_task_as_teardown(self, dag_maker, venv_python):
@teardown
@task.external_python(python=venv_python)
Expand All @@ -157,7 +154,6 @@ def f():
assert teardown_task._is_teardown
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
@pytest.mark.parametrize("on_failure_fail_dagrun", [True, False])
def test_marking_external_python_task_as_teardown_with_on_failure_fail(
self, dag_maker, on_failure_fail_dagrun, venv_python
Expand Down
3 changes: 0 additions & 3 deletions tests/decorators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskmap import TaskMap
from airflow.models.xcom_arg import PlainXComArg, XComArg
from airflow.settings import _ENABLE_AIP_52
from airflow.utils import timezone
from airflow.utils.state import State
from airflow.utils.task_group import TaskGroup
Expand Down Expand Up @@ -858,7 +857,6 @@ def down(a: str):
assert result == uri


@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
def test_teardown_trigger_rule_selective_application(dag_maker, session):
with dag_maker(session=session) as dag:

Expand All @@ -884,7 +882,6 @@ def my_teardown():
assert teardown_task.operator.trigger_rule == TriggerRule.ALL_DONE_SETUP_SUCCESS


@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
def test_teardown_trigger_rule_override_behavior(dag_maker, session):
with dag_maker(session=session) as dag:

Expand Down
4 changes: 0 additions & 4 deletions tests/decorators/test_python_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import pytest

from airflow.decorators import setup, task, teardown
from airflow.settings import _ENABLE_AIP_52
from airflow.utils import timezone

DEFAULT_DATE = timezone.datetime(2016, 1, 1)
Expand Down Expand Up @@ -178,7 +177,6 @@ def f(_):

ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
def test_marking_virtualenv_python_task_as_setup(self, dag_maker):
@setup
@task.virtualenv
Expand All @@ -193,7 +191,6 @@ def f():
assert setup_task._is_setup
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
def test_marking_virtualenv_python_task_as_teardown(self, dag_maker):
@teardown
@task.virtualenv
Expand All @@ -208,7 +205,6 @@ def f():
assert teardown_task._is_teardown
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
@pytest.mark.parametrize("on_failure_fail_dagrun", [True, False])
def test_marking_virtualenv_python_task_as_teardown_with_on_failure_fail(
self, dag_maker, on_failure_fail_dagrun
Expand Down
2 changes: 0 additions & 2 deletions tests/decorators/test_setup_teardown.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
from airflow import AirflowException
from airflow.decorators import setup, task, task_group, teardown
from airflow.operators.bash import BashOperator
from airflow.settings import _ENABLE_AIP_52


@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
class TestSetupTearDownTask:
def test_marking_functions_as_setup_task(self, dag_maker):
@setup
Expand Down
3 changes: 1 addition & 2 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
from airflow.sensors.base import BaseSensorOperator
from airflow.sensors.python import PythonSensor
from airflow.serialization.serialized_objects import SerializedBaseOperator
from airflow.settings import _ENABLE_AIP_52, TIMEZONE
from airflow.settings import TIMEZONE
from airflow.stats import Stats
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS
Expand Down Expand Up @@ -1103,7 +1103,6 @@ def test_depends_on_past(self, dag_maker):
# of the trigger_rule under various circumstances
# Numeric fields are in order:
# successes, skipped, failed, upstream_failed, removed, done
@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
@pytest.mark.parametrize(
"trigger_rule, upstream_setups, upstream_states, flag_upstream_failed, expect_state, expect_passed",
[
Expand Down
Loading