Cleanup & improvements around scheduling#12815
Conversation
- Remove unneeded code line - Remove stale docstring - Fix wrong docstring - Fix stale doc image link in docstring - avoid unnecessary loop in DagRun.schedule_tis() - Minor improvement on DAG.deactivate_stale_dags() which is invoked inside SchedulerJob
| schedulable_ti_ids.append(ti.task_id) | ||
|
|
||
| schedulable_ti_ids = [ti.task_id for ti in schedulable_tis if ti not in dummy_tis] | ||
| count = 0 |
There was a problem hiding this comment.
This change is to ensure we traverse schedulable_tis only once, rather than twice.
There was a problem hiding this comment.
Do you think it is worth moving these logics to a separate function? See: airflow.utils.helpers.partition
There was a problem hiding this comment.
Personally I don't find it necessary (for now).
On the other hand, if we abstract this into a separate function, at least to what I can see, it's sort of "duplicated" with helpers.partition() (I don't want to use helpers.partition() here because it still traverse the iterable twice.)
airflow/models/dag.py
Outdated
| dag.is_active = False | ||
| session.merge(dag) | ||
| session.commit() | ||
| session.commit() |
There was a problem hiding this comment.
Similar to deactivate_unknown_dags(), commit only once outside the for-loop.
There was a problem hiding this comment.
This should actually just be
| session.commit() | |
| session.flush() |
to be in line with https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#database-session-handling
There was a problem hiding this comment.
In this case, only in model/dag.py there are also a few other usage of session.commit() when @provide_session is used. Address them in this PR as well?
There was a problem hiding this comment.
Possibly -- do you think they make sense all as one PR?
There was a problem hiding this comment.
How about this: I will skip it in this PR, and have another PR dedicated for clearing session.commit() project-wise. So the PR scopes are clearer. Agree?
There was a problem hiding this comment.
I created issue #12818 for clearing session.commit().
Given it's a relatively easy fix to do, I mark it as "good first issue" and let's see if any new-contributor would like to pick it up (will voice in Slack).
|
|
||
| Returns a list of serialized_dag dicts that represent the DAGs found in | ||
| the file | ||
|
|
There was a problem hiding this comment.
DagFileProcessor.process_file()doesn't take care of killing zombie anymore.- The statement of what's returned here is stale
|
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
…ss_file() is not needed anymore
1. Cleanup
Mainly to clean up stable docstring.
2. Improvements
DagRun.schedule_tis(), which is invoked insideSchedulerJobMinor improvement on(to do in separate PR to fix similar issue project-wise, for clearer PR scopes)DAG.deactivate_stale_dags(), which is invoked insideSchedulerJob^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.