Skip to content

Commit

Permalink
Fix ExternalTaskSensor when there is not task group TIs for the curre…
Browse files Browse the repository at this point in the history
…nt execution date (apache#32009)

* Add a check on none TIs for the current execution date

Signed-off-by: Hussein Awala <hussein@awala.fr>

* replace inline if-else by old one

Signed-off-by: Hussein Awala <hussein@awala.fr>

---------

Signed-off-by: Hussein Awala <hussein@awala.fr>
  • Loading branch information
hussein-awala authored and ferruzzi committed Jun 27, 2023
1 parent 7796a1f commit 6ec17c7
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
13 changes: 8 additions & 5 deletions airflow/sensors/external_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,11 +366,14 @@ def get_count(self, dttm_filter, session, states) -> int:
) / len(self.external_task_ids)
elif self.external_task_group_id:
external_task_group_task_ids = self.get_external_task_group_task_ids(session, dttm_filter)
count = (
self._count_query(TI, session, states, dttm_filter)
.filter(tuple_in_condition((TI.task_id, TI.map_index), external_task_group_task_ids))
.scalar()
) / len(external_task_group_task_ids)
if not external_task_group_task_ids:
count = 0
else:
count = (
self._count_query(TI, session, states, dttm_filter)
.filter(tuple_in_condition((TI.task_id, TI.map_index), external_task_group_task_ids))
.scalar()
) / len(external_task_group_task_ids)
else:
count = self._count_query(DR, session, states, dttm_filter).scalar()
return count
Expand Down
20 changes: 20 additions & 0 deletions tests/sensors/test_external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,26 @@ def test_external_task_group_with_mapped_tasks_failed_states(self):
):
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

def test_external_task_group_when_there_is_no_TIs(self):
"""Test that the sensor does not fail when there are no TIs to check."""
self.add_time_sensor()
self.add_dummy_task_group_with_dynamic_tasks(State.FAILED)
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id=TEST_DAG_ID,
external_task_group_id=TEST_TASK_GROUP_ID,
failed_states=[State.FAILED],
dag=self.dag,
poke_interval=1,
timeout=3,
)
with pytest.raises(AirflowSensorTimeout):
op.run(
start_date=DEFAULT_DATE + timedelta(hours=1),
end_date=DEFAULT_DATE + timedelta(hours=1),
ignore_ti_state=True,
)


def test_external_task_sensor_check_zipped_dag_existence(dag_zip_maker):
with dag_zip_maker("test_external_task_sensor_check_existense.py") as dagbag:
Expand Down

0 comments on commit 6ec17c7

Please sign in to comment.