-
Notifications
You must be signed in to change notification settings - Fork 13.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change TaskInstance and TaskReschedule PK from execution_date to run_id #17719
Conversation
94e0cb2
to
1a1cd01
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if
Lines 1485 to 1486 in 4da4c18
ti = models.TaskInstance(task=task, execution_date=execution_date) | |
ti.refresh_from_db() |
Yes it should. It'll still work as it is but that relies on the compat shims |
1a1cd01
to
27c58db
Compare
I'm not quite expecting all tests to pass on this yet, but I'd like a review on this, so I'm marking it ready while I work on fixing up the remaining tests |
@@ -314,64 +330,12 @@ def test_task_get_template(self): | |||
assert value == expected_value | |||
assert [str(m.message) for m in recorder] == [message] | |||
|
|||
def test_local_task_job(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests are duplicating what was already in test_local_task_job -- we don't need them again.
@@ -189,80 +189,6 @@ def is_alive(self, grace_multiplier: Optional[float] = None) -> bool: | |||
and (timezone.utcnow() - self.latest_heartbeat).total_seconds() < scheduler_health_check_threshold | |||
) | |||
|
|||
@provide_session | |||
def _change_state_for_tis_without_dagrun( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is deleted, as the DB won't let us have TIs without Dagruns anymore :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Huge, can clear the issue of tasks getting stuck!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since tasks will no longer exist without dagrun, obviously the dag will fail or succeed and tis won't be stuck. I think we should remove these lines.:
airflow/airflow/jobs/scheduler_job.py
Lines 603 to 604 in 5d90bcb
self.log.info('Setting task instance %s state to %s as reported by executor', ti, state) | |
ti.set_state(state) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can happen if (for example) there is a crash in the worker early on in the "boot" process, so it can't report it's own status in the DB.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only read about 1/5 (probably less) of the patch. Generally seems fine to me but some thoughts on the implementation.
178ce9e
to
8dd8622
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nervously
Since TaskReschedule had an existing FK to TaskInstance we had to move
change both of these at the same time.
This puts an explicit FK constraint between TaskInstance and DagRun,
meaning that we can remove a lot of "find TIs without DagRun" code in
the scheduler too, as that is no longer a possible situation.
Since there is now an explicit foreign key between TaskInstance and
DagRun, we can remove a lot of the "cleanup" code in the scheduler that
was dealing with this.
This PR is, sadly, unavoidably large as it changes a fundamental part of the Airflow system, the primary key of the TaskInstance table :(
Closes #17046
Related #16302, #17030
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.