From d77f0563b403ae9e1a92e8e9e998a1142bb6f359 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Fri, 30 Sep 2022 17:03:07 +0100 Subject: [PATCH] Fix warning when using xcomarg dependencies (#26801) This warning was invisible before 2.4 due to a bug in our logging config (fixed by commit 7363e35) and AIP-45 which suddenly made this appear. The problem was being caused by set_xcomargs_dependencies being called once for each class in the hierarchy, and each of them doing the same logic. The fix is to look at the _actual_ function of `self.__init__` and compare it to the function we're about to call so that we don't set dependencies until we have finished the "outer" most class's apply_defaults invocation. --- airflow/models/baseoperator.py | 5 +++-- tests/decorators/test_python.py | 15 +++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 640f4cac2dd0e..9d4489ade579a 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -410,8 +410,9 @@ def apply_defaults(self: BaseOperator, *args: Any, **kwargs: Any) -> Any: # Store the args passed to init -- we need them to support task.map serialzation! self._BaseOperator__init_kwargs.update(kwargs) # type: ignore - if not instantiated_from_mapped: - # Set upstream task defined by XComArgs passed to template fields of the operator. + # Set upstream task defined by XComArgs passed to template fields of the operator. + # BUT: only do this _ONCE_, not once for each class in the hierarchy + if not instantiated_from_mapped and func == self.__init__.__wrapped__: # type: ignore[misc] self.set_xcomargs_dependencies() # Mark instance as instantiated. self._BaseOperator__instantiated = True diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py index 3dc151668231e..876c4b23d4acf 100644 --- a/tests/decorators/test_python.py +++ b/tests/decorators/test_python.py @@ -817,3 +817,18 @@ def down(a, b): assert len(decision.schedulable_tis) == 1 # "down" decision.schedulable_tis[0].run(session=session) assert result == "'example' None" + + +@pytest.mark.filterwarnings("error") +def test_no_warnings(reset_logging_config, caplog): + @task_decorator + def some_task(): + return 1 + + @task_decorator + def other(x): + ... + + with DAG(dag_id='test', start_date=DEFAULT_DATE, schedule=None): + other(some_task()) + assert caplog.messages == []