From 955e9492cf78bf640fef4f30f480f11ca1e5d43f Mon Sep 17 00:00:00 2001
From: Ephraim Anierobi
Date: Thu, 18 May 2023 18:49:38 +0100
Subject: [PATCH] Remove the leading underscore in some attrs in AIP-52
 (#31383)

* Remove the leading underscore in some attrs in AIP-52

These attrs are used in many places, but they are private. Since we can use
:meta private: to hide attributes from the documentation, we can safely
document these attrs as private and remove the leading underscores.

* fixup! Remove the leading underscore in some attrs in AIP-52
---
A short illustrative sketch of the :meta private: docstring pattern follows the diff.

 airflow/decorators/base.py                     | 20 +++++-----
 airflow/decorators/setup_teardown.py           |  6 +--
 airflow/models/baseoperator.py                 | 35 ++++++++++++-----
 airflow/models/dag.py                          |  6 +--
 airflow/models/dagrun.py                       |  2 +-
 airflow/models/xcom_arg.py                     |  2 +-
 airflow/ti_deps/deps/trigger_rule_dep.py       |  4 +-
 airflow/utils/setup_teardown.py                |  8 ++--
 tests/decorators/test_external_python.py       |  8 ++--
 tests/decorators/test_python_virtualenv.py     |  8 ++--
 tests/decorators/test_setup_teardown.py        | 38 +++++++++----------
 .../kubernetes/decorators/test_kubernetes.py   |  4 +-
 .../docker/decorators/test_docker.py           |  8 ++--
 tests/serialization/test_dag_serialization.py  | 22 +++++------
 14 files changed, 92 insertions(+), 79 deletions(-)

diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index c5d17122502ac..e79f13af37d8d 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -302,9 +302,9 @@ class _TaskDecorator(ExpandableFactory, Generic[FParams, FReturn, OperatorSubcla
     decorator_name: str = attr.ib(repr=False, default="task")
 
     _airflow_is_task_decorator: ClassVar[bool] = True
-    _is_setup: ClassVar[bool] = False
-    _is_teardown: ClassVar[bool] = False
-    _on_failure_fail_dagrun: ClassVar[bool] = False
+    is_setup: ClassVar[bool] = False
+    is_teardown: ClassVar[bool] = False
+    on_failure_fail_dagrun: ClassVar[bool] = False
 
     @multiple_outputs.default
     def _infer_multiple_outputs(self):
@@ -338,7 +338,7 @@ def __attrs_post_init__(self):
         self.kwargs.setdefault("task_id", self.function.__name__)
 
     def __call__(self, *args: FParams.args, **kwargs: FParams.kwargs) -> XComArg:
-        if self._is_teardown:
+        if self.is_teardown:
             if "trigger_rule" in self.kwargs:
                 raise ValueError("Trigger rule not configurable for teardown tasks.")
             self.kwargs.update(trigger_rule=TriggerRule.ALL_DONE_SETUP_SUCCESS)
@@ -349,9 +349,9 @@ def __call__(self, *args: FParams.args, **kwargs: FParams.kwargs) -> XComArg:
             multiple_outputs=self.multiple_outputs,
             **self.kwargs,
         )
-        op._is_setup = self._is_setup
-        op._is_teardown = self._is_teardown
-        op._on_failure_fail_dagrun = self._on_failure_fail_dagrun
+        op.is_setup = self.is_setup
+        op.is_teardown = self.is_teardown
+        op.on_failure_fail_dagrun = self.on_failure_fail_dagrun
         op_doc_attrs = [op.doc, op.doc_json, op.doc_md, op.doc_rst, op.doc_yaml]
         # Set the task's doc_md to the function's docstring if it exists and no other doc* args are set.
         if self.function.__doc__ and not any(op_doc_attrs):
@@ -485,9 +485,9 @@ def partial(self, **kwargs: Any) -> _TaskDecorator[FParams, FReturn, OperatorSub
 
     def override(self, **kwargs: Any) -> _TaskDecorator[FParams, FReturn, OperatorSubclass]:
         result = attr.evolve(self, kwargs={**self.kwargs, **kwargs})
-        setattr(result, "_is_setup", self._is_setup)
-        setattr(result, "_is_teardown", self._is_teardown)
-        setattr(result, "_on_failure_fail_dagrun", self._on_failure_fail_dagrun)
+        setattr(result, "is_setup", self.is_setup)
+        setattr(result, "is_teardown", self.is_teardown)
+        setattr(result, "on_failure_fail_dagrun", self.on_failure_fail_dagrun)
         return result
 
 
diff --git a/airflow/decorators/setup_teardown.py b/airflow/decorators/setup_teardown.py
index 8fccdb9fe5800..17c87c70259b8 100644
--- a/airflow/decorators/setup_teardown.py
+++ b/airflow/decorators/setup_teardown.py
@@ -30,7 +30,7 @@ def setup_task(func: Callable) -> Callable:
         func = python_task(func)
     if isinstance(func, _TaskGroupFactory):
         raise AirflowException("Task groups cannot be marked as setup or teardown.")
-    func._is_setup = True  # type: ignore[attr-defined]
+    func.is_setup = True  # type: ignore[attr-defined]
     return func
 
 
@@ -41,8 +41,8 @@ def teardown(func: Callable) -> Callable:
             func = python_task(func)
         if isinstance(func, _TaskGroupFactory):
             raise AirflowException("Task groups cannot be marked as setup or teardown.")
-        func._is_teardown = True  # type: ignore[attr-defined]
-        func._on_failure_fail_dagrun = on_failure_fail_dagrun  # type: ignore[attr-defined]
+        func.is_teardown = True  # type: ignore[attr-defined]
+        func.on_failure_fail_dagrun = on_failure_fail_dagrun  # type: ignore[attr-defined]
         return func
 
     if _func is None:
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index c4510cd43966b..6392ef9697647 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -720,9 +720,24 @@ class derived from this one results in the creation of a task object,
     # Set to True for an operator instantiated by a mapped operator.
     __from_mapped = False
 
-    _is_setup = False
-    _is_teardown = False
-    _on_failure_fail_dagrun = False
+    is_setup = False
+    """
+    Whether the operator is a setup task
+
+    :meta private:
+    """
+    is_teardown = False
+    """
+    Whether the operator is a teardown task
+
+    :meta private:
+    """
+    on_failure_fail_dagrun = False
+    """
+    Whether the operator should fail the dagrun on failure
+
+    :meta private:
+    """
 
     def __init__(
         self,
@@ -963,7 +978,7 @@ def __init__(
     @classmethod
     def as_setup(cls, *args, **kwargs):
         op = cls(*args, **kwargs)
-        op._is_setup = True
+        op.is_setup = True
         return op
 
     @classmethod
@@ -972,12 +987,12 @@ def as_teardown(cls, *args, **kwargs):
         if "trigger_rule" in kwargs:
             raise ValueError("Cannot set trigger rule for teardown tasks.")
         op = cls(*args, **kwargs, trigger_rule=TriggerRule.ALL_DONE_SETUP_SUCCESS)
-        op._is_teardown = True
-        op._on_failure_fail_dagrun = on_failure_fail_dagrun
+        op.is_teardown = True
+        op.on_failure_fail_dagrun = on_failure_fail_dagrun
         return op
 
     def __enter__(self):
-        if not self._is_setup and not self._is_teardown:
+        if not self.is_setup and not self.is_teardown:
             raise AirflowException("Only setup/teardown tasks can be used as context managers.")
         SetupTeardownContext.push_setup_teardown_task(self)
         return self
@@ -1534,9 +1549,9 @@ def get_serialized_fields(cls):
                     "template_fields",
                     "template_fields_renderers",
                     "params",
-                    "_is_setup",
-                    "_is_teardown",
-                    "_on_failure_fail_dagrun",
+                    "is_setup",
+                    "is_teardown",
+                    "on_failure_fail_dagrun",
                 }
             )
             DagContext.pop_context_managed_dag()
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 56e114310133a..b536510f7c441 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1218,14 +1218,12 @@ def task_ids(self) -> list[str]:
 
     @property
     def teardowns(self) -> list[Operator]:
-        return [task for task in self.tasks if getattr(task, "_is_teardown", None)]
+        return [task for task in self.tasks if getattr(task, "is_teardown", None)]
 
     @property
     def tasks_upstream_of_teardowns(self) -> list[Operator]:
         upstream_tasks = [t.upstream_list for t in self.teardowns]
-        return [
-            val for sublist in upstream_tasks for val in sublist if not getattr(val, "_is_teardown", None)
-        ]
+        return [val for sublist in upstream_tasks for val in sublist if not getattr(val, "is_teardown", None)]
 
     @property
     def task_group(self) -> TaskGroup:
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 98810693cc1a1..5647b3410d21e 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -603,7 +603,7 @@ def recalculate(self) -> _UnfinishedStates:
         teardown_task_ids = [t.task_id for t in dag.teardowns]
         upstream_of_teardowns = [t.task_id for t in dag.tasks_upstream_of_teardowns]
         teardown_tis = [ti for ti in tis if ti.task_id in teardown_task_ids]
-        on_failure_fail_tis = [ti for ti in teardown_tis if getattr(ti.task, "_on_failure_fail_dagrun")]
+        on_failure_fail_tis = [ti for ti in teardown_tis if getattr(ti.task, "on_failure_fail_dagrun")]
         tis_upstream_of_teardowns = [ti for ti in tis if ti.task_id in upstream_of_teardowns]
         leaf_tis = list(set(leaf_tis) - set(teardown_tis))
         leaf_tis.extend(on_failure_fail_tis)
diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index 85dbce98d4ea9..224e633172d22 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -206,7 +206,7 @@ def resolve(self, context: Context, session: Session = NEW_SESSION) -> Any:
         raise NotImplementedError()
 
     def __enter__(self):
-        if not self.operator._is_setup and not self.operator._is_teardown:
+        if not self.operator.is_setup and not self.operator.is_teardown:
             raise AirflowException("Only setup/teardown tasks can be used as context managers.")
         SetupTeardownContext.push_setup_teardown_task(self.operator)
         return self
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index 3c11050bfc3e1..62588c15b5f0f 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -69,7 +69,7 @@ def calculate(cls, finished_upstreams: Iterator[TaskInstance]) -> _UpstreamTISta
             curr_state = {ti.state: 1}
             counter.update(curr_state)
             # setup task cannot be mapped
-            if not isinstance(ti.task, MappedOperator) and ti.task._is_setup:
+            if not isinstance(ti.task, MappedOperator) and ti.task.is_setup:
                 setup_counter.update(curr_state)
         return _UpstreamTIStates(
             success=counter.get(TaskInstanceState.SUCCESS, 0),
@@ -231,7 +231,7 @@ def _iter_upstream_conditions() -> Iterator[ColumnOperators]:
         if not any(needs_expansion(t) for t in upstream_tasks.values()):
             upstream = len(upstream_tasks)
             upstream_setup = len(
-                [x for x in upstream_tasks.values() if not isinstance(x, MappedOperator) and x._is_setup]
+                [x for x in upstream_tasks.values() if not isinstance(x, MappedOperator) and x.is_setup]
             )
         else:
             upstream = (
diff --git a/airflow/utils/setup_teardown.py b/airflow/utils/setup_teardown.py
index 32fad8014e71e..696bb5ba9b016 100644
--- a/airflow/utils/setup_teardown.py
+++ b/airflow/utils/setup_teardown.py
@@ -93,14 +93,14 @@ def get_context_managed_teardown_task(cls) -> Operator | None:
 
     @classmethod
     def push_setup_teardown_task(cls, operator):
-        if operator._is_teardown:
+        if operator.is_teardown:
             SetupTeardownContext.push_context_managed_teardown_task(operator)
-            upstream_setup = [task for task in operator.upstream_list if task._is_setup]
+            upstream_setup = [task for task in operator.upstream_list if task.is_setup]
             if upstream_setup:
                 SetupTeardownContext.push_context_managed_setup_task(upstream_setup[-1])
-        elif operator._is_setup:
+        elif operator.is_setup:
             SetupTeardownContext.push_context_managed_setup_task(operator)
-            downstream_teardown = [task for task in operator.downstream_list if task._is_teardown]
+            downstream_teardown = [task for task in operator.downstream_list if task.is_teardown]
             if downstream_teardown:
                 SetupTeardownContext.push_context_managed_teardown_task(downstream_teardown[0])
         SetupTeardownContext.active = True
diff --git a/tests/decorators/test_external_python.py b/tests/decorators/test_external_python.py
index 84c46127c1ece..80a952e679890 100644
--- a/tests/decorators/test_external_python.py
+++ b/tests/decorators/test_external_python.py
@@ -137,7 +137,7 @@ def f():
 
         assert len(dag.task_group.children) == 1
         setup_task = dag.task_group.children["f"]
-        assert setup_task._is_setup
+        assert setup_task.is_setup
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
     def test_marking_external_python_task_as_teardown(self, dag_maker, venv_python):
@@ -151,7 +151,7 @@ def f():
 
         assert len(dag.task_group.children) == 1
         teardown_task = dag.task_group.children["f"]
-        assert teardown_task._is_teardown
+        assert teardown_task.is_teardown
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
     @pytest.mark.parametrize("on_failure_fail_dagrun", [True, False])
@@ -168,6 +168,6 @@ def f():
 
         assert len(dag.task_group.children) == 1
         teardown_task = dag.task_group.children["f"]
-        assert teardown_task._is_teardown
-        assert teardown_task._on_failure_fail_dagrun is on_failure_fail_dagrun
+        assert teardown_task.is_teardown
+        assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
diff --git a/tests/decorators/test_python_virtualenv.py b/tests/decorators/test_python_virtualenv.py
index ec37c3586ef01..96b76236561a4 100644
--- a/tests/decorators/test_python_virtualenv.py
+++ b/tests/decorators/test_python_virtualenv.py
@@ -188,7 +188,7 @@ def f():
 
         assert len(dag.task_group.children) == 1
         setup_task = dag.task_group.children["f"]
-        assert setup_task._is_setup
+        assert setup_task.is_setup
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
     def test_marking_virtualenv_python_task_as_teardown(self, dag_maker):
@@ -202,7 +202,7 @@ def f():
 
         assert len(dag.task_group.children) == 1
         teardown_task = dag.task_group.children["f"]
-        assert teardown_task._is_teardown
+        assert teardown_task.is_teardown
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
     @pytest.mark.parametrize("on_failure_fail_dagrun", [True, False])
@@ -219,6 +219,6 @@ def f():
 
         assert len(dag.task_group.children) == 1
         teardown_task = dag.task_group.children["f"]
-        assert teardown_task._is_teardown
-        assert teardown_task._on_failure_fail_dagrun is on_failure_fail_dagrun
+        assert teardown_task.is_teardown
+        assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
diff --git a/tests/decorators/test_setup_teardown.py b/tests/decorators/test_setup_teardown.py
index 8de3f625d4159..082b5e78134b6 100644
--- a/tests/decorators/test_setup_teardown.py
+++ b/tests/decorators/test_setup_teardown.py
@@ -35,7 +35,7 @@ def mytask():
 
         assert len(dag.task_group.children) == 1
         setup_task = dag.task_group.children["mytask"]
-        assert setup_task._is_setup
+        assert setup_task.is_setup
 
     def test_marking_functions_as_teardown_task(self, dag_maker):
         @teardown
@@ -47,7 +47,7 @@ def mytask():
 
         assert len(dag.task_group.children) == 1
         teardown_task = dag.task_group.children["mytask"]
-        assert teardown_task._is_teardown
+        assert teardown_task.is_teardown
 
     def test_marking_decorated_functions_as_setup_task(self, dag_maker):
         @setup
@@ -60,7 +60,7 @@ def mytask():
 
         assert len(dag.task_group.children) == 1
         setup_task = dag.task_group.children["mytask"]
-        assert setup_task._is_setup
+        assert setup_task.is_setup
 
     def test_marking_operator_as_setup_task(self, dag_maker):
         with dag_maker() as dag:
@@ -68,7 +68,7 @@ def test_marking_operator_as_setup_task(self, dag_maker):
 
         assert len(dag.task_group.children) == 1
         setup_task = dag.task_group.children["mytask"]
-        assert setup_task._is_setup
+        assert setup_task.is_setup
 
     def test_marking_decorated_functions_as_teardown_task(self, dag_maker):
         @teardown
@@ -81,7 +81,7 @@ def mytask():
 
         assert len(dag.task_group.children) == 1
         teardown_task = dag.task_group.children["mytask"]
-        assert teardown_task._is_teardown
+        assert teardown_task.is_teardown
 
     def test_marking_operator_as_teardown_task(self, dag_maker):
         with dag_maker() as dag:
@@ -89,7 +89,7 @@ def test_marking_operator_as_teardown_task(self, dag_maker):
 
         assert len(dag.task_group.children) == 1
         teardown_task = dag.task_group.children["mytask"]
-        assert teardown_task._is_teardown
+        assert teardown_task.is_teardown
 
     def test_setup_taskgroup_decorator(self, dag_maker):
         with dag_maker():
@@ -138,8 +138,8 @@ def mytask():
         with dag_maker() as dag:
             mytask()
         teardown_task = dag.task_group.children["mytask"]
-        assert teardown_task._is_teardown
-        assert teardown_task._on_failure_fail_dagrun is on_failure_fail_dagrun
+        assert teardown_task.is_teardown
+        assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun
         assert len(dag.task_group.children) == 1
 
     @pytest.mark.parametrize("on_failure_fail_dagrun", [True, False])
@@ -152,8 +152,8 @@ def test_classic_teardown_task_works_with_on_failure_fail_dagrun(self, on_failur
         )
 
         teardown_task = dag.task_group.children["mytask"]
-        assert teardown_task._is_teardown
-        assert teardown_task._on_failure_fail_dagrun is on_failure_fail_dagrun
+        assert teardown_task.is_teardown
+        assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun
         assert len(dag.task_group.children) == 1
 
     def test_setup_task_can_be_overriden(self, dag_maker):
@@ -165,7 +165,7 @@ def mytask():
             mytask.override(task_id="mytask2")()
         assert len(dag.task_group.children) == 1
         setup_task = dag.task_group.children["mytask2"]
-        assert setup_task._is_setup
+        assert setup_task.is_setup
 
     def test_setup_teardown_mixed_up_in_a_dag(self, dag_maker):
         @setup
@@ -202,14 +202,14 @@ def mytask2():
 
         assert len(dag.task_group.children) == 6
         assert [x for x in dag.tasks if not x.downstream_list]  # no deps have been set
-        assert dag.task_group.children["setuptask"]._is_setup
-        assert dag.task_group.children["teardowntask"]._is_teardown
-        assert dag.task_group.children["setuptask2"]._is_setup
-        assert dag.task_group.children["teardowntask2"]._is_teardown
-        assert dag.task_group.children["mytask"]._is_setup is False
-        assert dag.task_group.children["mytask"]._is_teardown is False
-        assert dag.task_group.children["mytask2"]._is_setup is False
-        assert dag.task_group.children["mytask2"]._is_teardown is False
+        assert dag.task_group.children["setuptask"].is_setup
+        assert dag.task_group.children["teardowntask"].is_teardown
+        assert dag.task_group.children["setuptask2"].is_setup
+        assert dag.task_group.children["teardowntask2"].is_teardown
+        assert dag.task_group.children["mytask"].is_setup is False
+        assert dag.task_group.children["mytask"].is_teardown is False
+        assert dag.task_group.children["mytask2"].is_setup is False
+        assert dag.task_group.children["mytask2"].is_teardown is False
 
     def test_setup_teardown_as_context_manager_normal_tasks_rel_set_downstream(self, dag_maker):
         """
diff --git a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py
index 0cc9a72c12afb..b7f79279d4215 100644
--- a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py
@@ -185,7 +185,7 @@ def f():
 
     assert len(dag.task_group.children) == 1
     setup_task = dag.task_group.children["f"]
-    assert setup_task._is_setup
+    assert setup_task.is_setup
 
 
 def test_kubernetes_with_marked_as_teardown(
@@ -207,7 +207,7 @@ def f():
 
    assert len(dag.task_group.children) == 1
    teardown_task = dag.task_group.children["f"]
-    assert teardown_task._is_teardown
+    assert teardown_task.is_teardown
 
 
 def test_kubernetes_with_mini_scheduler(
diff --git a/tests/providers/docker/decorators/test_docker.py b/tests/providers/docker/decorators/test_docker.py
index 2ed7068a52724..49892c7aa4af3 100644
--- a/tests/providers/docker/decorators/test_docker.py
+++ b/tests/providers/docker/decorators/test_docker.py
@@ -156,7 +156,7 @@ def f():
 
         assert len(dag.task_group.children) == 1
         setup_task = dag.task_group.children["f"]
-        assert setup_task._is_setup
+        assert setup_task.is_setup
 
     def test_teardown_decorator_with_decorated_docker_task(self, dag_maker):
         @teardown
@@ -169,7 +169,7 @@ def f():
 
         assert len(dag.task_group.children) == 1
         teardown_task = dag.task_group.children["f"]
-        assert teardown_task._is_teardown
+        assert teardown_task.is_teardown
 
     @pytest.mark.parametrize("on_failure_fail_dagrun", [True, False])
     def test_teardown_decorator_with_decorated_docker_task_and_on_failure_fail_arg(
@@ -185,5 +185,5 @@ def f():
 
         assert len(dag.task_group.children) == 1
         teardown_task = dag.task_group.children["f"]
-        assert teardown_task._is_teardown
-        assert teardown_task._on_failure_fail_dagrun is on_failure_fail_dagrun
+        assert teardown_task.is_teardown
+        assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 0fd2e4b427c63..71d6c82c68143 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -162,9 +162,9 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None:  # type: i
                 "_task_type": "BashOperator",
                 "_task_module": "airflow.operators.bash",
                 "pool": "default_pool",
-                "_is_setup": False,
-                "_is_teardown": False,
-                "_on_failure_fail_dagrun": False,
+                "is_setup": False,
+                "is_teardown": False,
+                "on_failure_fail_dagrun": False,
                 "executor_config": {
                     "__type": "dict",
                     "__var": {
@@ -194,9 +194,9 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None:  # type: i
                 "_operator_name": "@custom",
                 "_task_module": "tests.test_utils.mock_operators",
                 "pool": "default_pool",
-                "_is_setup": False,
-                "_is_teardown": False,
-                "_on_failure_fail_dagrun": False,
+                "is_setup": False,
+                "is_teardown": False,
+                "on_failure_fail_dagrun": False,
             },
         ],
         "schedule_interval": {"__type": "timedelta", "__var": 86400.0},
@@ -570,7 +570,7 @@ def validate_deserialized_task(
             "on_retry_callback",
             # Checked separately
             "resources",
-            "_on_failure_fail_dagrun",
+            "on_failure_fail_dagrun",
         }
     else:  # Promised to be mapped by the assert above.
         assert isinstance(serialized_task, MappedOperator)
@@ -1316,8 +1316,8 @@ def assert_taskgroup_children(se_task_group, dag_task_group, expected_children):
 
     @staticmethod
     def assert_task_is_setup_teardown(task, is_setup: bool = False, is_teardown: bool = False):
-        assert task._is_setup == is_setup
-        assert task._is_teardown == is_teardown
+        assert task.is_setup == is_setup
+        assert task.is_teardown == is_teardown
 
     def test_setup_teardown_tasks(self):
         """
@@ -1392,8 +1392,8 @@ def mytask():
 
         serialized_dag = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(dag))
         task = serialized_dag.task_group.children["mytask"]
-        assert task._is_teardown
-        assert task._on_failure_fail_dagrun
+        assert task.is_teardown
+        assert task.on_failure_fail_dagrun
 
     def test_deps_sorted(self):
         """
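
Illustrative sketch (not part of the commit above): the change relies on Sphinx
autodoc's :meta private: docstring field. The class name below is made up for the
example; only the attribute-docstring pattern, mirroring what the diff adds to
BaseOperator, is the point.

    class MyOperator:
        # Hypothetical example class; mirrors the attribute docstrings added above.
        is_setup = False
        """
        Whether this operator is a setup task.

        :meta private:
        """

With sphinx.ext.autodoc, a member whose docstring contains ":meta private:" is
treated as private and skipped in the generated API docs unless the
:private-members: option is used, which is why the attribute can keep a public,
underscore-free name in code while staying out of the rendered documentation.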