Skip to content

Commit

Permalink
Remove the leading underscore in some attrs in AIP-52 (#31383)
Browse files Browse the repository at this point in the history
* Remove the leading underscore in some attrs in AIP-52

These attrs are used in many places but they are private.
Since we can use :meta private: to hide attributes from documentation
we can safely document that these attrs are private and remove the leading underscores

* fixup! Remove the leading underscore in some attrs in AIP-52
  • Loading branch information
ephraimbuddy committed May 18, 2023
1 parent 33709f0 commit 955e949
Show file tree
Hide file tree
Showing 14 changed files with 92 additions and 79 deletions.
20 changes: 10 additions & 10 deletions airflow/decorators/base.py
Expand Up @@ -302,9 +302,9 @@ class _TaskDecorator(ExpandableFactory, Generic[FParams, FReturn, OperatorSubcla
decorator_name: str = attr.ib(repr=False, default="task")

_airflow_is_task_decorator: ClassVar[bool] = True
_is_setup: ClassVar[bool] = False
_is_teardown: ClassVar[bool] = False
_on_failure_fail_dagrun: ClassVar[bool] = False
is_setup: ClassVar[bool] = False
is_teardown: ClassVar[bool] = False
on_failure_fail_dagrun: ClassVar[bool] = False

@multiple_outputs.default
def _infer_multiple_outputs(self):
Expand Down Expand Up @@ -338,7 +338,7 @@ def __attrs_post_init__(self):
self.kwargs.setdefault("task_id", self.function.__name__)

def __call__(self, *args: FParams.args, **kwargs: FParams.kwargs) -> XComArg:
if self._is_teardown:
if self.is_teardown:
if "trigger_rule" in self.kwargs:
raise ValueError("Trigger rule not configurable for teardown tasks.")
self.kwargs.update(trigger_rule=TriggerRule.ALL_DONE_SETUP_SUCCESS)
Expand All @@ -349,9 +349,9 @@ def __call__(self, *args: FParams.args, **kwargs: FParams.kwargs) -> XComArg:
multiple_outputs=self.multiple_outputs,
**self.kwargs,
)
op._is_setup = self._is_setup
op._is_teardown = self._is_teardown
op._on_failure_fail_dagrun = self._on_failure_fail_dagrun
op.is_setup = self.is_setup
op.is_teardown = self.is_teardown
op.on_failure_fail_dagrun = self.on_failure_fail_dagrun
op_doc_attrs = [op.doc, op.doc_json, op.doc_md, op.doc_rst, op.doc_yaml]
# Set the task's doc_md to the function's docstring if it exists and no other doc* args are set.
if self.function.__doc__ and not any(op_doc_attrs):
Expand Down Expand Up @@ -485,9 +485,9 @@ def partial(self, **kwargs: Any) -> _TaskDecorator[FParams, FReturn, OperatorSub

def override(self, **kwargs: Any) -> _TaskDecorator[FParams, FReturn, OperatorSubclass]:
result = attr.evolve(self, kwargs={**self.kwargs, **kwargs})
setattr(result, "_is_setup", self._is_setup)
setattr(result, "_is_teardown", self._is_teardown)
setattr(result, "_on_failure_fail_dagrun", self._on_failure_fail_dagrun)
setattr(result, "is_setup", self.is_setup)
setattr(result, "is_teardown", self.is_teardown)
setattr(result, "on_failure_fail_dagrun", self.on_failure_fail_dagrun)
return result


Expand Down
6 changes: 3 additions & 3 deletions airflow/decorators/setup_teardown.py
Expand Up @@ -30,7 +30,7 @@ def setup_task(func: Callable) -> Callable:
func = python_task(func)
if isinstance(func, _TaskGroupFactory):
raise AirflowException("Task groups cannot be marked as setup or teardown.")
func._is_setup = True # type: ignore[attr-defined]
func.is_setup = True # type: ignore[attr-defined]
return func


Expand All @@ -41,8 +41,8 @@ def teardown(func: Callable) -> Callable:
func = python_task(func)
if isinstance(func, _TaskGroupFactory):
raise AirflowException("Task groups cannot be marked as setup or teardown.")
func._is_teardown = True # type: ignore[attr-defined]
func._on_failure_fail_dagrun = on_failure_fail_dagrun # type: ignore[attr-defined]
func.is_teardown = True # type: ignore[attr-defined]
func.on_failure_fail_dagrun = on_failure_fail_dagrun # type: ignore[attr-defined]
return func

if _func is None:
Expand Down
35 changes: 25 additions & 10 deletions airflow/models/baseoperator.py
Expand Up @@ -720,9 +720,24 @@ class derived from this one results in the creation of a task object,
# Set to True for an operator instantiated by a mapped operator.
__from_mapped = False

_is_setup = False
_is_teardown = False
_on_failure_fail_dagrun = False
is_setup = False
"""
Whether the operator is a setup task
:meta private:
"""
is_teardown = False
"""
Whether the operator is a teardown task
:meta private:
"""
on_failure_fail_dagrun = False
"""
Whether the operator should fail the dagrun on failure
:meta private:
"""

def __init__(
self,
Expand Down Expand Up @@ -963,7 +978,7 @@ def __init__(
@classmethod
def as_setup(cls, *args, **kwargs):
op = cls(*args, **kwargs)
op._is_setup = True
op.is_setup = True
return op

@classmethod
Expand All @@ -972,12 +987,12 @@ def as_teardown(cls, *args, **kwargs):
if "trigger_rule" in kwargs:
raise ValueError("Cannot set trigger rule for teardown tasks.")
op = cls(*args, **kwargs, trigger_rule=TriggerRule.ALL_DONE_SETUP_SUCCESS)
op._is_teardown = True
op._on_failure_fail_dagrun = on_failure_fail_dagrun
op.is_teardown = True
op.on_failure_fail_dagrun = on_failure_fail_dagrun
return op

def __enter__(self):
if not self._is_setup and not self._is_teardown:
if not self.is_setup and not self.is_teardown:
raise AirflowException("Only setup/teardown tasks can be used as context managers.")
SetupTeardownContext.push_setup_teardown_task(self)
return self
Expand Down Expand Up @@ -1534,9 +1549,9 @@ def get_serialized_fields(cls):
"template_fields",
"template_fields_renderers",
"params",
"_is_setup",
"_is_teardown",
"_on_failure_fail_dagrun",
"is_setup",
"is_teardown",
"on_failure_fail_dagrun",
}
)
DagContext.pop_context_managed_dag()
Expand Down
6 changes: 2 additions & 4 deletions airflow/models/dag.py
Expand Up @@ -1218,14 +1218,12 @@ def task_ids(self) -> list[str]:

@property
def teardowns(self) -> list[Operator]:
return [task for task in self.tasks if getattr(task, "_is_teardown", None)]
return [task for task in self.tasks if getattr(task, "is_teardown", None)]

@property
def tasks_upstream_of_teardowns(self) -> list[Operator]:
upstream_tasks = [t.upstream_list for t in self.teardowns]
return [
val for sublist in upstream_tasks for val in sublist if not getattr(val, "_is_teardown", None)
]
return [val for sublist in upstream_tasks for val in sublist if not getattr(val, "is_teardown", None)]

@property
def task_group(self) -> TaskGroup:
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dagrun.py
Expand Up @@ -603,7 +603,7 @@ def recalculate(self) -> _UnfinishedStates:
teardown_task_ids = [t.task_id for t in dag.teardowns]
upstream_of_teardowns = [t.task_id for t in dag.tasks_upstream_of_teardowns]
teardown_tis = [ti for ti in tis if ti.task_id in teardown_task_ids]
on_failure_fail_tis = [ti for ti in teardown_tis if getattr(ti.task, "_on_failure_fail_dagrun")]
on_failure_fail_tis = [ti for ti in teardown_tis if getattr(ti.task, "on_failure_fail_dagrun")]
tis_upstream_of_teardowns = [ti for ti in tis if ti.task_id in upstream_of_teardowns]
leaf_tis = list(set(leaf_tis) - set(teardown_tis))
leaf_tis.extend(on_failure_fail_tis)
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/xcom_arg.py
Expand Up @@ -206,7 +206,7 @@ def resolve(self, context: Context, session: Session = NEW_SESSION) -> Any:
raise NotImplementedError()

def __enter__(self):
if not self.operator._is_setup and not self.operator._is_teardown:
if not self.operator.is_setup and not self.operator.is_teardown:
raise AirflowException("Only setup/teardown tasks can be used as context managers.")
SetupTeardownContext.push_setup_teardown_task(self.operator)
return self
Expand Down
4 changes: 2 additions & 2 deletions airflow/ti_deps/deps/trigger_rule_dep.py
Expand Up @@ -69,7 +69,7 @@ def calculate(cls, finished_upstreams: Iterator[TaskInstance]) -> _UpstreamTISta
curr_state = {ti.state: 1}
counter.update(curr_state)
# setup task cannot be mapped
if not isinstance(ti.task, MappedOperator) and ti.task._is_setup:
if not isinstance(ti.task, MappedOperator) and ti.task.is_setup:
setup_counter.update(curr_state)
return _UpstreamTIStates(
success=counter.get(TaskInstanceState.SUCCESS, 0),
Expand Down Expand Up @@ -231,7 +231,7 @@ def _iter_upstream_conditions() -> Iterator[ColumnOperators]:
if not any(needs_expansion(t) for t in upstream_tasks.values()):
upstream = len(upstream_tasks)
upstream_setup = len(
[x for x in upstream_tasks.values() if not isinstance(x, MappedOperator) and x._is_setup]
[x for x in upstream_tasks.values() if not isinstance(x, MappedOperator) and x.is_setup]
)
else:
upstream = (
Expand Down
8 changes: 4 additions & 4 deletions airflow/utils/setup_teardown.py
Expand Up @@ -93,14 +93,14 @@ def get_context_managed_teardown_task(cls) -> Operator | None:

@classmethod
def push_setup_teardown_task(cls, operator):
if operator._is_teardown:
if operator.is_teardown:
SetupTeardownContext.push_context_managed_teardown_task(operator)
upstream_setup = [task for task in operator.upstream_list if task._is_setup]
upstream_setup = [task for task in operator.upstream_list if task.is_setup]
if upstream_setup:
SetupTeardownContext.push_context_managed_setup_task(upstream_setup[-1])
elif operator._is_setup:
elif operator.is_setup:
SetupTeardownContext.push_context_managed_setup_task(operator)
downstream_teardown = [task for task in operator.downstream_list if task._is_teardown]
downstream_teardown = [task for task in operator.downstream_list if task.is_teardown]
if downstream_teardown:
SetupTeardownContext.push_context_managed_teardown_task(downstream_teardown[0])
SetupTeardownContext.active = True
Expand Down
8 changes: 4 additions & 4 deletions tests/decorators/test_external_python.py
Expand Up @@ -137,7 +137,7 @@ def f():

assert len(dag.task_group.children) == 1
setup_task = dag.task_group.children["f"]
assert setup_task._is_setup
assert setup_task.is_setup
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

def test_marking_external_python_task_as_teardown(self, dag_maker, venv_python):
Expand All @@ -151,7 +151,7 @@ def f():

assert len(dag.task_group.children) == 1
teardown_task = dag.task_group.children["f"]
assert teardown_task._is_teardown
assert teardown_task.is_teardown
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

@pytest.mark.parametrize("on_failure_fail_dagrun", [True, False])
Expand All @@ -168,6 +168,6 @@ def f():

assert len(dag.task_group.children) == 1
teardown_task = dag.task_group.children["f"]
assert teardown_task._is_teardown
assert teardown_task._on_failure_fail_dagrun is on_failure_fail_dagrun
assert teardown_task.is_teardown
assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
8 changes: 4 additions & 4 deletions tests/decorators/test_python_virtualenv.py
Expand Up @@ -188,7 +188,7 @@ def f():

assert len(dag.task_group.children) == 1
setup_task = dag.task_group.children["f"]
assert setup_task._is_setup
assert setup_task.is_setup
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

def test_marking_virtualenv_python_task_as_teardown(self, dag_maker):
Expand All @@ -202,7 +202,7 @@ def f():

assert len(dag.task_group.children) == 1
teardown_task = dag.task_group.children["f"]
assert teardown_task._is_teardown
assert teardown_task.is_teardown
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

@pytest.mark.parametrize("on_failure_fail_dagrun", [True, False])
Expand All @@ -219,6 +219,6 @@ def f():

assert len(dag.task_group.children) == 1
teardown_task = dag.task_group.children["f"]
assert teardown_task._is_teardown
assert teardown_task._on_failure_fail_dagrun is on_failure_fail_dagrun
assert teardown_task.is_teardown
assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
38 changes: 19 additions & 19 deletions tests/decorators/test_setup_teardown.py
Expand Up @@ -35,7 +35,7 @@ def mytask():

assert len(dag.task_group.children) == 1
setup_task = dag.task_group.children["mytask"]
assert setup_task._is_setup
assert setup_task.is_setup

def test_marking_functions_as_teardown_task(self, dag_maker):
@teardown
Expand All @@ -47,7 +47,7 @@ def mytask():

assert len(dag.task_group.children) == 1
teardown_task = dag.task_group.children["mytask"]
assert teardown_task._is_teardown
assert teardown_task.is_teardown

def test_marking_decorated_functions_as_setup_task(self, dag_maker):
@setup
Expand All @@ -60,15 +60,15 @@ def mytask():

assert len(dag.task_group.children) == 1
setup_task = dag.task_group.children["mytask"]
assert setup_task._is_setup
assert setup_task.is_setup

def test_marking_operator_as_setup_task(self, dag_maker):
with dag_maker() as dag:
BashOperator.as_setup(task_id="mytask", bash_command='echo "I am a setup task"')

assert len(dag.task_group.children) == 1
setup_task = dag.task_group.children["mytask"]
assert setup_task._is_setup
assert setup_task.is_setup

def test_marking_decorated_functions_as_teardown_task(self, dag_maker):
@teardown
Expand All @@ -81,15 +81,15 @@ def mytask():

assert len(dag.task_group.children) == 1
teardown_task = dag.task_group.children["mytask"]
assert teardown_task._is_teardown
assert teardown_task.is_teardown

def test_marking_operator_as_teardown_task(self, dag_maker):
with dag_maker() as dag:
BashOperator.as_teardown(task_id="mytask", bash_command='echo "I am a setup task"')

assert len(dag.task_group.children) == 1
teardown_task = dag.task_group.children["mytask"]
assert teardown_task._is_teardown
assert teardown_task.is_teardown

def test_setup_taskgroup_decorator(self, dag_maker):
with dag_maker():
Expand Down Expand Up @@ -138,8 +138,8 @@ def mytask():
with dag_maker() as dag:
mytask()
teardown_task = dag.task_group.children["mytask"]
assert teardown_task._is_teardown
assert teardown_task._on_failure_fail_dagrun is on_failure_fail_dagrun
assert teardown_task.is_teardown
assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun
assert len(dag.task_group.children) == 1

@pytest.mark.parametrize("on_failure_fail_dagrun", [True, False])
Expand All @@ -152,8 +152,8 @@ def test_classic_teardown_task_works_with_on_failure_fail_dagrun(self, on_failur
)

teardown_task = dag.task_group.children["mytask"]
assert teardown_task._is_teardown
assert teardown_task._on_failure_fail_dagrun is on_failure_fail_dagrun
assert teardown_task.is_teardown
assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun
assert len(dag.task_group.children) == 1

def test_setup_task_can_be_overriden(self, dag_maker):
Expand All @@ -165,7 +165,7 @@ def mytask():
mytask.override(task_id="mytask2")()
assert len(dag.task_group.children) == 1
setup_task = dag.task_group.children["mytask2"]
assert setup_task._is_setup
assert setup_task.is_setup

def test_setup_teardown_mixed_up_in_a_dag(self, dag_maker):
@setup
Expand Down Expand Up @@ -202,14 +202,14 @@ def mytask2():

assert len(dag.task_group.children) == 6
assert [x for x in dag.tasks if not x.downstream_list] # no deps have been set
assert dag.task_group.children["setuptask"]._is_setup
assert dag.task_group.children["teardowntask"]._is_teardown
assert dag.task_group.children["setuptask2"]._is_setup
assert dag.task_group.children["teardowntask2"]._is_teardown
assert dag.task_group.children["mytask"]._is_setup is False
assert dag.task_group.children["mytask"]._is_teardown is False
assert dag.task_group.children["mytask2"]._is_setup is False
assert dag.task_group.children["mytask2"]._is_teardown is False
assert dag.task_group.children["setuptask"].is_setup
assert dag.task_group.children["teardowntask"].is_teardown
assert dag.task_group.children["setuptask2"].is_setup
assert dag.task_group.children["teardowntask2"].is_teardown
assert dag.task_group.children["mytask"].is_setup is False
assert dag.task_group.children["mytask"].is_teardown is False
assert dag.task_group.children["mytask2"].is_setup is False
assert dag.task_group.children["mytask2"].is_teardown is False

def test_setup_teardown_as_context_manager_normal_tasks_rel_set_downstream(self, dag_maker):
"""
Expand Down
4 changes: 2 additions & 2 deletions tests/providers/cncf/kubernetes/decorators/test_kubernetes.py
Expand Up @@ -185,7 +185,7 @@ def f():

assert len(dag.task_group.children) == 1
setup_task = dag.task_group.children["f"]
assert setup_task._is_setup
assert setup_task.is_setup


def test_kubernetes_with_marked_as_teardown(
Expand All @@ -207,7 +207,7 @@ def f():

assert len(dag.task_group.children) == 1
teardown_task = dag.task_group.children["f"]
assert teardown_task._is_teardown
assert teardown_task.is_teardown


def test_kubernetes_with_mini_scheduler(
Expand Down

0 comments on commit 955e949

Please sign in to comment.