diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 3a95a3945c44d..8bc96df735257 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -410,8 +410,9 @@ def apply_defaults(self: BaseOperator, *args: Any, **kwargs: Any) -> Any: # Store the args passed to init -- we need them to support task.map serialzation! self._BaseOperator__init_kwargs.update(kwargs) # type: ignore - if not instantiated_from_mapped: - # Set upstream task defined by XComArgs passed to template fields of the operator. + # Set upstream task defined by XComArgs passed to template fields of the operator. + # BUT: only do this _ONCE_, not once for each class in the hierarchy + if not instantiated_from_mapped and func == self.__init__.__wrapped__: # type: ignore[misc] self.set_xcomargs_dependencies() # Mark instance as instantiated. self._BaseOperator__instantiated = True diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py index 3dc151668231e..876c4b23d4acf 100644 --- a/tests/decorators/test_python.py +++ b/tests/decorators/test_python.py @@ -817,3 +817,18 @@ def down(a, b): assert len(decision.schedulable_tis) == 1 # "down" decision.schedulable_tis[0].run(session=session) assert result == "'example' None" + + +@pytest.mark.filterwarnings("error") +def test_no_warnings(reset_logging_config, caplog): + @task_decorator + def some_task(): + return 1 + + @task_decorator + def other(x): + ... + + with DAG(dag_id='test', start_date=DEFAULT_DATE, schedule=None): + other(some_task()) + assert caplog.messages == []