-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AIRFLOW-1349] Refactor BackfillJob _execute #2463
Conversation
@edgarRd, thanks for your PR! By analyzing the history of the files in this pull request, we identified @bolkedebruin, @mistercrunch and @jlowin to be potential reviewers. |
Codecov Report
@@ Coverage Diff @@
## master #2463 +/- ##
==========================================
+ Coverage 69.43% 69.53% +0.09%
==========================================
Files 146 146
Lines 11351 11391 +40
==========================================
+ Hits 7882 7921 +39
- Misses 3469 3470 +1
Continue to review full report at Codecov.
|
@@ -1790,6 +1790,54 @@ class BackfillJob(BaseJob): | |||
'polymorphic_identity': 'BackfillJob' | |||
} | |||
|
|||
class _DagRunTaskStatus(object): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we are putting this into an class, would it be something to add a integrity check? Ie there is a total number of tasks. The sum of the counters should always be equal to number. And yes we have seen leakage in the past.
Might be outside of this PR though (no logic change).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed on the consistency check, but also agree on the scope creep part. Edgar do you mind adding a TODO and attach it to a JIRA for now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
b70f175
to
4a1d500
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM in general mostly nits.
airflow/jobs.py
Outdated
it easier to pass it around. | ||
""" | ||
|
||
# TODO(edgarRd): https://issues.apache.org/jira/browse/AIRFLOW-1444 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: let's kill the whitespace/newlines, make it more descriptive (e.g. summary of the JIRA), and no need to include the full JIRA URL, just AIRFLOW-144 is OK (consistent with the other JIRA TODOs in the project)
airflow/jobs.py
Outdated
total_runs=0, | ||
): | ||
""" | ||
:param started: Dict of tasks already started |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: you can remove "Dict of" since it's redundantly given by the type below. Better just explain what the keys/values represent in english.
Same for the other params.
airflow/jobs.py
Outdated
|
||
executor = self.executor | ||
executor.start() | ||
# explicitly mark as backfill and running as we can fill gaps |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does "as we can fill gaps" mean? I see it's copied from the old logic, let's kill the second part of this sentence if you don't know what it means either.
airflow/jobs.py
Outdated
@provide_session | ||
def _task_instances_for_dag_run(self, dag_run, session=None): | ||
""" | ||
Returns a map with the tasks to run for the given dag run. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Good to add what the map is mapping from (type of keys)
airflow/jobs.py
Outdated
# Triggering what is ready to get triggered | ||
while (len(tasks_to_run) > 0 or len(started) > 0) and not deadlocked: | ||
while ((len(tasks_to_run) > 0 or len(ti_status.started) > 0) and | ||
not ti_status.deadlocked): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make the ti_status.deadlocked consistent with the other checks in this while loop while we are refactoring it (> 0)
airflow/jobs.py
Outdated
|
||
return err | ||
|
||
def _get_run_dates(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like this is generic enough that it better belongs as a public method under the DAG class in models.py, and it can take bf_start_date/end_date as arguments
airflow/jobs.py
Outdated
return run_dates | ||
|
||
@provide_session | ||
def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be good to make it more clear what the difference between this method and _execute is (either better method names or comments).
4a1d500
to
b6e2e75
Compare
airflow/jobs.py
Outdated
@@ -1841,40 +1889,39 @@ def __init__( | |||
self.pool = pool | |||
super(BackfillJob, self).__init__(*args, **kwargs) | |||
|
|||
def _update_counters(self, started, succeeded, skipped, failed, tasks_to_run): | |||
def _update_counters(self, ti_status, tasks_to_run): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this not be moved to _DagRunTaskStatus
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that makes sense. I've done the change.
b6e2e75
to
124568c
Compare
Runs a dag for a specified date range. | ||
Returns a dag run for the given run date, which will be matched to an existing | ||
dag run if available or create a new dag run otherwise. | ||
:param run_date: the execution date for the dag run |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add :type:
annotations too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
airflow/jobs.py
Outdated
execution_date=run_date, | ||
session=session) | ||
|
||
if not run: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is probably better to check against None
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
||
# return if already reached maximum active runs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a followup change is active runs doesn't count backfills? I'm actually not sure what the current logic is
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is part of what AIRFLOW-1349 will address.
airflow/jobs.py
Outdated
run.state = State.RUNNING | ||
run.run_id = run_id | ||
run.verify_integrity(session=session) | ||
if not dag_run: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer to check against None
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
airflow/jobs.py
Outdated
for ti in dag_run.get_task_instances(): | ||
# all tasks part of the backfill are scheduled to run | ||
if ti.state == State.NONE: | ||
ti.set_state(State.SCHEDULED, session=session) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is pretty slow to run and expires the existing TIs. Better to query as a batch. Keep in mind session.commit expires all attributes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, as of this PR I just organized the code to have it easier to make those changes, so this is just a re-org of the code. I could create a ticket to improve the performance here and make a follow up change unless someone else thinks it should be included on this PR @aoen @bolkedebruin
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Making a followup change is cool if this is just a refactor
airflow/jobs.py
Outdated
them in a backfill process. | ||
:param status: the internal status of the job | ||
:type status: BackfillJob._DagRunTaskStatus | ||
:param executor: the executor to run the task instances |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
types pls
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
124568c
to
cb3b3a4
Compare
@aoen @bolkedebruin @saguziel is there any other comment / issue in order to move this PR forward? |
airflow/jobs.py
Outdated
:type not_ready: set | ||
:param deadlocked: Deadlocked tasks | ||
:type deadlocked: set | ||
:param active_runs: Active tasks at a certain point in time (mutate) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does mutate mean? Let's clarify or kill
airflow/jobs.py
Outdated
def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id, | ||
start_date, session=None): | ||
""" | ||
This method does the bulk of the work, receiving all initialized components ready |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: bulk of the work doesn't explain what work.
BackfillJob._execute is doing multiple things - it is pretty hard to follow and maintain. Changes included are just a re-org of the code, no logic has been changed. Refactor includes: - Break BackfillJob._execute into functions - Add a Status object to track BackfillJob internal status while executing the job.
cb3b3a4
to
cf4dc3c
Compare
BackfillJob._execute is doing multiple things - it is pretty hard to follow and maintain. Changes included are just a re-org of the code, no logic has been changed. Refactor includes: - Break BackfillJob._execute into functions - Add a Status object to track BackfillJob internal status while executing the job. Closes apache#2463 from edgarRd/erod-backfill-refactor
Dear Airflow maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
BackfillJob._execute is doing multiple things - it is pretty hard to
follow and maintain.
Changes included are just a re-org of the code, no logic has been
changed.
Refactor includes:
executing the job.
Tests
BackfillJobTest.test_backfill_max_limit_check*
Commits