Fix: TaskInstance does not show queued_by_job_id & ``external_executor_id`` (#17179)

**Problem discovery:**
I was debugging a bug related to `external_executor_id` in Airflow when this UI bug caught my eye and started to annoy me. I figured I'd fix this one first so my other testing could go a bit smoother :)

**Description of the problem:**
Currently there is a bug in the Task Instance details (/task) view.
It loads the TaskInstance by calling `TI(task, execution_date)` and then uses `refresh_from_db()` to refresh the many fields that are not filled in yet.
However, the assumption is that this refreshes all values, which it does not.
`external_executor_id` and `queued_by_job_id` are not updated at all, and `executor_config` is only instantiated by the original `TI(task, execution_date)` call but is not updated in `refresh_from_db()` either.
This also shows in the UI, where these values are always displayed as None, while the TaskInstance view shows that these values are not None.
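
To make the failure mode concrete, here is a minimal sketch (not the actual view code; the helper name is made up for illustration) of how the details view ends up with stale fields:

```python
# Minimal sketch of the /task details view's loading pattern, assuming the
# usual Airflow imports. The helper name below is illustrative only.
from airflow.models.taskinstance import TaskInstance as TI


def load_ti_for_details_view(task, execution_date):
    # The view builds a fresh, mostly empty TaskInstance in memory ...
    ti = TI(task, execution_date)
    # ... and relies on refresh_from_db() to copy the persisted values.
    ti.refresh_from_db()
    # Before this fix, these attributes were never copied from the DB row,
    # so the details page always rendered them as None:
    return ti.queued_by_job_id, ti.external_executor_id, ti.executor_config
```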

**The changes in the PR:**
1. Changes to the `refresh_from_db()` method to include the three missing values.
2. A new test that checks that we are really updating ALL values in `refresh_from_db()` (a trimmed sketch of its completeness check follows this list).
3. Removal of an incorrect comment, as we do need the `execution_date` for that view.
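
For reference, a standalone sketch of the completeness check the new test relies on, trimmed to three fields (the real test below lists every column) and assuming `TI` is the `TaskInstance` model:

```python
from airflow.models.taskinstance import TaskInstance as TI

# The keys the test asserts on (trimmed to three here; the real test lists
# every column). Private attributes like _try_number map to plain column names.
expected_values = {"task_id": "t", "dag_id": "d", "_try_number": 1}
expected_keys = {f"task_instance.{key.lstrip('_')}" for key in expected_values}

# Any TaskInstance column not covered by expected_keys makes the real test
# fail, forcing refresh_from_db() to be updated alongside the model.
missing = {str(c) for c in TI.__table__.columns} - expected_keys
print(f"columns the trimmed sketch does not cover: {sorted(missing)}")
```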

(cherry picked from commit 759c76d)
Jorricks authored and jhtimmins committed Aug 13, 2021
1 parent 18c6baf commit 0656f10
Showing 3 changed files with 57 additions and 2 deletions.
3 changes: 3 additions & 0 deletions airflow/models/taskinstance.py
@@ -643,7 +643,10 @@ def refresh_from_db(self, session=None, lock_for_update=False) -> None:
            self.priority_weight = ti.priority_weight
            self.operator = ti.operator
            self.queued_dttm = ti.queued_dttm
            self.queued_by_job_id = ti.queued_by_job_id
            self.pid = ti.pid
            self.executor_config = ti.executor_config
            self.external_executor_id = ti.external_executor_id
        else:
            self.state = None

2 changes: 0 additions & 2 deletions airflow/www/views.py
@@ -1231,8 +1231,6 @@ def task(self):
        """Retrieve task."""
        dag_id = request.args.get('dag_id')
        task_id = request.args.get('task_id')
        # Carrying execution_date through, even though it's irrelevant for
        # this context
        execution_date = request.args.get('execution_date')
        dttm = timezone.parse(execution_date)
        form = DateTimeForm(data={'execution_date': dttm})
54 changes: 54 additions & 0 deletions tests/models/test_taskinstance.py
@@ -2026,6 +2026,60 @@ def test_set_state_up_for_retry(self):
        assert ti.start_date < ti.end_date
        assert ti.duration > 0

    def test_refresh_from_db(self):
        run_date = timezone.utcnow()

        expected_values = {
            "task_id": "test_refresh_from_db_task",
            "dag_id": "test_refresh_from_db_dag",
            "execution_date": run_date,
            "start_date": run_date + datetime.timedelta(days=1),
            "end_date": run_date + datetime.timedelta(days=1, seconds=1, milliseconds=234),
            "duration": 1.234,
            "state": State.SUCCESS,
            "_try_number": 1,
            "max_tries": 1,
            "hostname": "some_unique_hostname",
            "unixname": "some_unique_unixname",
            "job_id": 1234,
            "pool": "some_fake_pool_id",
            "pool_slots": 25,
            "queue": "some_queue_id",
            "priority_weight": 123,
            "operator": "some_custom_operator",
            "queued_dttm": run_date + datetime.timedelta(hours=1),
            "queued_by_job_id": 321,
            "pid": 123,
            "executor_config": {"Some": {"extra": "information"}},
            "external_executor_id": "some_executor_id",
        }
        # Make sure we aren't missing any new value in our expected_values list.
        expected_keys = {f"task_instance.{key.lstrip('_')}" for key in expected_values.keys()}
        assert {str(c) for c in TI.__table__.columns} == expected_keys, (
            "Please add all non-foreign values of TaskInstance to this list. "
            "This prevents refresh_from_db() from missing a field."
        )

        operator = DummyOperator(task_id=expected_values['task_id'])
        ti = TI(task=operator, execution_date=expected_values['execution_date'])
        for key, expected_value in expected_values.items():
            setattr(ti, key, expected_value)
        with create_session() as session:
            session.merge(ti)
            session.commit()

        mock_task = mock.MagicMock()
        mock_task.task_id = expected_values["task_id"]
        mock_task.dag_id = expected_values["dag_id"]

        ti = TI(task=mock_task, execution_date=run_date)
        ti.refresh_from_db()
        for key, expected_value in expected_values.items():
            assert hasattr(ti, key), f"Key {key} is missing in the TaskInstance."
            assert (
                getattr(ti, key) == expected_value
            ), f"Key: {key} had different values. Make sure it is loaded in refresh_from_db()."


@pytest.mark.parametrize("pool_override", [None, "test_pool2"])
def test_refresh_from_task(pool_override):
