Skip to content

Commit 91e1029

Browse files
authored
Disable start_from_trigger altogether for now (#54646)
1 parent 10dbbca commit 91e1029

2 files changed

Lines changed: 19 additions & 13 deletions

File tree

airflow-core/src/airflow/models/dagrun.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2004,18 +2004,23 @@ def schedule_tis(
20042004
and not task.inlets
20052005
):
20062006
empty_ti_ids.append(ti.id)
2007-
# check "start_trigger_args" to see whether the operator supports start execution from triggerer
2008-
# if so, we'll then check "start_from_trigger" to see whether this feature is turned on and defer
2009-
# this task.
2010-
# if not, we'll add this "ti" into "schedulable_ti_ids" and later execute it to run in the worker
2011-
elif task.start_trigger_args is not None:
2012-
if task.expand_start_from_trigger(context=ti.get_template_context()):
2013-
ti.start_date = timezone.utcnow()
2014-
if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
2015-
ti.try_number += 1
2016-
ti.defer_task(exception=None, session=session)
2017-
else:
2018-
schedulable_ti_ids.append(ti.id)
2007+
# Check "start_trigger_args" to see whether the operator supports
2008+
# start execution from triggerer. If so, we'll check "start_from_trigger"
2009+
# to see whether this feature is turned on and defer this task.
2010+
# If not, we'll add this "ti" into "schedulable_ti_ids" and later
2011+
# execute it to run in the worker.
2012+
# TODO TaskSDK: This is disabled since we haven't figured out how
2013+
# to render start_from_trigger in the scheduler. If we need to
2014+
# render the value in a worker, it kind of defeats the purpose of
2015+
# this feature (which is to save a worker process if possible).
2016+
# elif task.start_trigger_args is not None:
2017+
# if task.expand_start_from_trigger(context=ti.get_template_context()):
2018+
# ti.start_date = timezone.utcnow()
2019+
# if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
2020+
# ti.try_number += 1
2021+
# ti.defer_task(exception=None, session=session)
2022+
# else:
2023+
# schedulable_ti_ids.append(ti.id)
20192024
else:
20202025
schedulable_ti_ids.append(ti.id)
20212026

airflow-core/tests/unit/models/test_dagrun.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2036,6 +2036,7 @@ def test_schedule_tis_map_index(dag_maker, session):
20362036
assert ti2.state == TaskInstanceState.SUCCESS
20372037

20382038

2039+
@pytest.mark.xfail(reason="We can't keep this behaviour with remote workers where scheduler can't reach xcom")
20392040
@pytest.mark.need_serialized_dag
20402041
def test_schedule_tis_start_trigger(dag_maker, session):
20412042
"""
@@ -2092,7 +2093,7 @@ def test_schedule_tis_empty_operator_try_number(dag_maker, session: Session):
20922093
assert empty_ti.try_number == 1
20932094

20942095

2095-
@pytest.mark.xfail(reason="We can't keep this bevaviour with remote workers where scheduler can't reach xcom")
2096+
@pytest.mark.xfail(reason="We can't keep this behaviour with remote workers where scheduler can't reach xcom")
20962097
def test_schedule_tis_start_trigger_through_expand(dag_maker, session):
20972098
"""
20982099
Test that an operator with start_trigger_args set can be directly deferred during scheduling.

0 commit comments

Comments
 (0)