Skip to content

Commit

Permalink
Correctly restore upstream_task_ids when deserializing Operators (#8775)
Browse files Browse the repository at this point in the history
This test exposed a bug in one of the example dags, that wasn't caught
by #6549. That will be a fixed in a separate issue, but it caused the
round-trip tests to fail here

Fixes #8720

(cherry picked from commit 280f1f0)
  • Loading branch information
ashb authored and kaxil committed Jun 26, 2020
1 parent 340e43d commit 45bf7f4
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ def deserialize_dag(cls, encoded_dag):
for task_id in serializable_task.downstream_task_ids:
# Bypass set_upstream etc here - it does more than we want
# noinspection PyProtectedMember
dag.task_dict[task_id]._upstream_task_ids.add(task_id) # pylint: disable=protected-access
dag.task_dict[task_id]._upstream_task_ids.add(serializable_task.task_id) # noqa: E501 # pylint: disable=protected-access

return dag

Expand Down
3 changes: 3 additions & 0 deletions tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ def validate_deserialized_task(self, serialized_task, task,):
assert serialized_task.task_type == task.task_type
assert set(serialized_task.template_fields) == set(task.template_fields)

assert serialized_task.upstream_task_ids == task.upstream_task_ids
assert serialized_task.downstream_task_ids == task.downstream_task_ids

for field in fields_to_check:
assert getattr(serialized_task, field) == getattr(task, field), \
'{}.{}.{} does not match'.format(task.dag.dag_id, task.task_id, field)
Expand Down

0 comments on commit 45bf7f4

Please sign in to comment.