
[AIRFLOW-6869] Bulk fetch DAGRuns for _process_task_instances #7489

Merged · 1 commit merged into apache:master on Feb 29, 2020

Conversation

mik-laj (Member) commented Feb 21, 2020

Another performance optimization. When a DAG file contains 199 DAGs and each DAG contains 9 dummy tasks, as in the following file:

from datetime import timedelta

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'start_date': days_ago(3),
}


def create_dag(dag_number):
    dag = DAG(
        dag_id=f'perf_50_dag_dummy_tasks_{dag_number}_of_50', default_args=args,
        dagrun_timeout=timedelta(minutes=60),
        is_paused_upon_creation=False,
    )

    # Creates 9 dummy tasks per DAG (the '_of_5' in the task_id is a leftover name).
    for j in range(1, 10):
        DummyOperator(
            task_id='task_{}_of_5'.format(j),
            dag=dag
        )

    return dag


for i in range(1, 200):
    globals()[f"dag_{i}"] = create_dag(i)


I ran the following code:

import logging

from airflow.jobs.scheduler_job import DagFileProcessor

log = logging.getLogger(__name__)
DAG_FILE = "..."  # path to the DAG file above

processor = DagFileProcessor([], log)
processor.process_file(DAG_FILE, None, pickle_dags=False)
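The PR does not show how the query count was measured; one possible harness (my own sketch, assuming airflow.settings.engine is the configured SQLAlchemy engine) counts statements with an after_cursor_execute event listener:

from contextlib import contextmanager

from sqlalchemy import event

from airflow.settings import engine


@contextmanager
def count_queries():
    # Count every statement the engine sends to the database.
    counter = {"queries": 0}

    def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        counter["queries"] += 1

    event.listen(engine, "after_cursor_execute", after_cursor_execute)
    try:
        yield counter
    finally:
        event.remove(engine, "after_cursor_execute", after_cursor_execute)


with count_queries() as counter:
    processor.process_file(DAG_FILE, None, pickle_dags=False)
print("Query count:", counter["queries"])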

I got the following values.

Before:
Query count: 1792
Average time: 4568.156 ms

After:
Query count: 1594
Average time: 3964.916 ms

Diff:
Query count: -200 (-11%), i.e. roughly one query saved per DAG in the file
Average time: -603 ms (-13%)
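The gist of the change: instead of querying DagRun once per DAG, fetch the runs for every DAG in the file with a single query and group them in Python. A hypothetical sketch of the idea (my own, not the exact PR diff; `dags` is assumed to be the list of DAG objects parsed from the file):

from itertools import groupby

from airflow.models import DagRun
from airflow.settings import Session

session = Session()
dag_ids = [dag.dag_id for dag in dags]  # all DAGs parsed from the file

# One query for the whole file instead of one query per DAG.
runs = (
    session.query(DagRun)
    .filter(DagRun.dag_id.in_(dag_ids))
    .order_by(DagRun.dag_id)
    .all()
)

# groupby() requires input sorted by the key, hence the order_by above;
# list() is required because the yielded groups are transient.
dag_runs_by_dag_id = {k: list(v) for k, v in groupby(runs, lambda r: r.dag_id)}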

If I didn't make a mistake, when we combine this change with the other pending query optimizations, we will get only ~5 queries per file instead of 1792.

Thanks to @evgenyshulman from Databand for the support!


Issue link: AIRFLOW-6869

Make sure to mark the boxes below before creating PR (all checked):

  • [x] Description above provides context of the change
  • [x] Commit message/PR title starts with [AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID*
  • [x] Unit tests coverage for changes (not needed for documentation changes)
  • [x] Commits follow "How to write a good git commit message"
  • [x] Relevant documentation is updated including usage instructions.
  • [x] I will engage committers as explained in Contribution Workflow Example.

* For document-only changes commit message can start with [AIRFLOW-XXXX].


In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Feb 21, 2020
@mik-laj mik-laj changed the title from [AIRFLOW-6869][WIP] Bulk fetch DAGRun to [AIRFLOW-6869][WIP] Bulk fetch DAGRuns Feb 21, 2020
@mik-laj mik-laj closed this Feb 21, 2020
@mik-laj mik-laj reopened this Feb 21, 2020
@mik-laj mik-laj force-pushed the bulk_dag_run branch 4 times, most recently from 8051030 to 139dd3d on February 21, 2020 22:46
@mik-laj mik-laj force-pushed the bulk_dag_run branch 2 times, most recently from cfc2e85 to f94e1fc on February 22, 2020 12:15
@mik-laj mik-laj changed the title from [AIRFLOW-6869][WIP] Bulk fetch DAGRuns to [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances Feb 22, 2020
@codecov-io

Codecov Report

Merging #7489 into master will decrease coverage by 0.25%.
The diff coverage is 96.66%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #7489      +/-   ##
==========================================
- Coverage   86.79%   86.54%   -0.26%     
==========================================
  Files         887      891       +4     
  Lines       41976    42110     +134     
==========================================
+ Hits        36432    36442      +10     
- Misses       5544     5668     +124
Impacted Files Coverage Δ
airflow/jobs/scheduler_job.py 90.11% <100%> (-0.13%) ⬇️
airflow/models/dagrun.py 96.2% <83.33%> (-0.37%) ⬇️
airflow/kubernetes/volume_mount.py 44.44% <0%> (-55.56%) ⬇️
airflow/kubernetes/volume.py 52.94% <0%> (-47.06%) ⬇️
airflow/kubernetes/pod_launcher.py 47.18% <0%> (-45.08%) ⬇️
...viders/cncf/kubernetes/operators/kubernetes_pod.py 69.38% <0%> (-25.52%) ⬇️
airflow/kubernetes/refresh_config.py 50.98% <0%> (-23.53%) ⬇️
airflow/utils/file.py 87.83% <0%> (-1.36%) ⬇️
airflow/utils/dag_processing.py 87.93% <0%> (-0.2%) ⬇️
airflow/models/baseoperator.py 96.52% <0%> (ø) ⬆️
... and 27 more

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 4e0e2f0...f94e1fc.

@kaxil kaxil requested a review from ashb February 22, 2020 20:48
Resolved review thread on airflow/models/dagrun.py (outdated)
Comment on lines 698 to 717
# list() is needed because of a bug in Python 3.7+
#
# The following code returns different values depending on the Python version
# from itertools import groupby
# from unittest.mock import MagicMock
# key = "key"
# item = MagicMock(attr=key)
# items = [item]
# items_by_attr = {k: v for k, v in groupby(items, lambda d: d.attr)}
# print("items_by_attr=", items_by_attr)
# item_with_key = list(items_by_attr[key]) if key in items_by_attr else []
# print("item_with_key=", item_with_key)
#
# Python 3.7+:
# items_by_attr= {'key': <itertools._grouper object at 0x7f3b9f38d4d0>}
# item_with_key= []
#
# Python 3.6:
# items_by_attr= {'key': <itertools._grouper object at 0x101128630>}
# item_with_key= [<MagicMock id='4310405416'>]
ashb (Member) commented:
The behaviour is different on py3.6 and 3.7, but is still wrong on both when more than a single item is in items: 3.6 would return the last item only.

The docs for groupby say:

Because the source is shared, when the groupby() object is advanced, the previous group is no longer visible. So, if that data is needed later, it should be stored as a list.

Since the behaviour without list is broken in other ways on 3.6 too, I think we can just replace this comment with:

         # As per the docs of groupby (https://docs.python.org/3/library/itertools.html#itertools.groupby)
         # we need to use `list()` otherwise the result will be wrong/incomplete
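
A standalone illustration of the pitfall (my own example, not from the PR):

from itertools import groupby

runs = [("dag_a", 1), ("dag_a", 2), ("dag_b", 3)]  # already sorted by key

# Broken: all groups share the source iterator, so once the dict
# comprehension has advanced groupby() to the end, the stored group
# objects no longer yield their items.
broken = {key: group for key, group in groupby(runs, key=lambda r: r[0])}
print({key: list(group) for key, group in broken.items()})
# Python 3.7+: {'dag_a': [], 'dag_b': []}; 3.6 may keep only the tail.

# Correct: materialise each group with list() while it is the current one.
correct = {key: list(group) for key, group in groupby(runs, key=lambda r: r[0])}
print(correct)
# {'dag_a': [('dag_a', 1), ('dag_a', 2)], 'dag_b': [('dag_b', 3)]}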

Member commented:

+1 on the comment change recommended by @ashb. Based on the official docs, the behavior on both 3.6 and 3.7 is expected and within the API spec. Returned group items are transient with each iteration and should be manually persisted if needed.

mik-laj (Member, Author) commented Feb 24, 2020:

I will update the description during the next rebase.

mik-laj (Member, Author) commented:

Done.

-    self.log.info("Processing %s", dag.dag_id)
+    dag_id = dag.dag_id
+    self.log.info("Processing %s", dag_id)
+    dag_runs_for_dag = dag_runs_by_dag_id[dag_id] if dag_id in dag_runs_by_dag_id else []
Contributor commented:
d.get(dag_id, []) instead pls
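
In other words (spelling out the suggestion):

# Before: membership test plus subscript, i.e. two dict lookups.
dag_runs_for_dag = dag_runs_by_dag_id[dag_id] if dag_id in dag_runs_by_dag_id else []

# After: one lookup with a default.
dag_runs_for_dag = dag_runs_by_dag_id.get(dag_id, [])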

mik-laj (Member, Author) commented:

Done

@Fokko (Contributor) left a review:

LGTM, small suggestions :)

Resolved review threads: airflow/jobs/scheduler_job.py (×3, one outdated), airflow/models/dagrun.py (outdated)
@mik-laj mik-laj changed the title from [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances to [AIRFLOW-6869][DONT-MERGE] Bulk fetch DAGRuns for _process_task_instances Feb 26, 2020
@mik-laj mik-laj force-pushed the bulk_dag_run branch 7 times, most recently from 64d55d2 to 14b82b2 on February 28, 2020 13:18
@mik-laj mik-laj changed the title from [AIRFLOW-6869][DONT-MERGE] Bulk fetch DAGRuns for _process_task_instances to [AIRFLOW-6869] Bulk fetch DAGRuns for _process_task_instances Feb 28, 2020
@mik-laj mik-laj merged commit 063a35f into apache:master Feb 29, 2020
galuszkak pushed a commit to FlyrInc/apache-airflow that referenced this pull request Mar 5, 2020
Labels: area:performance, area:Scheduler including HA (high availability) scheduler

8 participants