Skip to content
Permalink
Browse files
Fix scheduler crash when expanding with mapped task that returned none (
#23486)

When task is expanded from a mapped task that returned no value, it
crashes the scheduler. This PR fixes it by first checking if there's
a return value from the mapped task, if no returned value, then error
in the task itself instead of crashing the scheduler
  • Loading branch information
ephraimbuddy committed May 9, 2022
1 parent 4485393 commit 7813f996ab79b9e6ef07090194f1e621e4f90e17
Showing 2 changed files with 21 additions and 1 deletion.
@@ -2329,10 +2329,12 @@ def _record_task_map_for_downstreams(self, task: "Operator", value: Any, *, sess
# currently possible for a downstream to depend on one individual mapped
# task instance, only a task as a whole. This will change in AIP-42
# Phase 2, and we'll need to further analyze the mapped task case.
if task.is_mapped or next(task.iter_mapped_dependants(), None) is None:
if next(task.iter_mapped_dependants(), None) is None:
return
if value is None:
raise XComForMappingNotPushed()
if task.is_mapped:
return
if not isinstance(value, collections.abc.Collection) or isinstance(value, (bytes, str)):
raise UnmappableXComTypePushed(value)
task_map = TaskMap.from_task_instance_xcom(self, value)
@@ -2831,3 +2831,21 @@ def add_one(x):

query = XCom.get_many(run_id=dagrun.run_id, task_ids=["add_one__1"], session=session)
assert [x.value for x in query.order_by(None).order_by(XCom.map_index)] == [3, 4, 5]


def test_ti_mapped_depends_on_mapped_xcom_arg_XXX(dag_maker, session):
with dag_maker(session=session) as dag:

@dag.task
def add_one(x):
x + 1

two_three_four = add_one.expand(x=[1, 2, 3])
add_one.expand(x=two_three_four)

dagrun = dag_maker.create_dagrun()
for map_index in range(3):
ti = dagrun.get_task_instance("add_one", map_index=map_index)
ti.refresh_from_task(dag.get_task("add_one"))
with pytest.raises(XComForMappingNotPushed):
ti.run()

0 comments on commit 7813f99

Please sign in to comment.