
[AIRFLOW-6856] Bulk fetch paused_dag_ids #7476

Merged

Conversation

@mik-laj (Member) commented Feb 20, 2020

I created the following DAG file:

from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'start_date': days_ago(3),
}


def create_dag(dag_number):
    dag = DAG(
        dag_id=f'perf_50_dag_dummy_tasks_{dag_number}_of_50', default_args=args,
        schedule_interval=None,
        dagrun_timeout=timedelta(minutes=60)
    )

    # range(1, 6) so the file really contains 5 tasks, matching the task_id
    for j in range(1, 6):
        DummyOperator(
            task_id=f'task_{j}_of_5',
            dag=dag
        )

    return dag


# range(1, 51) so the file really contains 50 DAGs, matching the dag_id
for i in range(1, 51):
    globals()[f"dag_{i}"] = create_dag(i)

and I used the following code to test performance.

import functools
import logging
import time

from airflow.jobs.scheduler_job import DagFileProcessor


class CountQueries:
    """Context manager that counts SQL queries executed by the Airflow engine."""

    def __init__(self):
        self.count = 0

    def __enter__(self):
        from sqlalchemy import event
        from airflow.settings import engine
        event.listen(engine, "after_cursor_execute", self.after_cursor_execute)
        return None

    def after_cursor_execute(self, *args, **kwargs):
        self.count += 1

    def __exit__(self, exc_type, exc_value, traceback):
        from sqlalchemy import event
        from airflow.settings import engine
        event.remove(engine, "after_cursor_execute", self.after_cursor_execute)
        print('Query count: ', self.count)


count_queries = CountQueries

DAG_FILE = "/files/dags/50_dag_5_dummy_tasks.py"

log = logging.getLogger("airflow.processor")
processor = DagFileProcessor([], log)

def timing(f):

    @functools.wraps(f)
    def wrap(*args):
        RETRY_COUNT = 5
        r = []
        for i in range(RETRY_COUNT):
            time1 = time.time()
            f(*args)
            time2 = time.time()
            diff = (time2 - time1) * 1000.0
            r.append(diff)
            # print('Retry %d took %0.3f ms' % (i, diff))
        print('Average took %0.3f ms' % (sum(r) / RETRY_COUNT))

    return wrap


@timing
def slow_case():
    with count_queries():
        processor.process_file(DAG_FILE, None, pickle_dags=False)

slow_case()

As a result, I obtained the following values:
Before:
Query count: 442
Average time: 1182.187 ms

After:
Query count: 297
Average time: 769.421 ms

Diff:
Query count: -145 (-32%)
Average time: -413 ms (-34%)
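The deltas above follow directly from the before/after numbers; a quick check (pure Python, no Airflow needed):

```python
# Reproduce the before/after deltas reported above.
before_queries, after_queries = 442, 297
before_ms, after_ms = 1182.187, 769.421

query_diff = after_queries - before_queries         # -145
query_pct = int(query_diff / before_queries * 100)  # -32 (%)

time_diff = after_ms - before_ms                    # about -413 ms
time_pct = int(time_diff / before_ms * 100)         # -34 (%)

print(f"Query count: {query_diff} ({query_pct}%)")
print(f"Average time: {time_diff:.0f} ms ({time_pct}%)")
```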

With 200 DAGs of 5 tasks each in one file:
Before:
Query count: 1792
Average took 4505.891 ms
After:
Query count: 1197
Average took 3335.856 ms

Thanks to @evgenyshulman from Databand for the support!


Issue link: AIRFLOW-6856

Make sure to mark the boxes below before creating PR:

  • [x] Description above provides context of the change
  • [x] Commit message/PR title starts with [AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID*
  • [x] Unit tests coverage for changes (not needed for documentation changes)
  • [x] Commits follow "How to write a good git commit message"
  • [x] Relevant documentation is updated including usage instructions.
  • [x] I will engage committers as explained in Contribution Workflow Example.

* For document-only changes, the commit message can start with [AIRFLOW-XXXX].


In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@boring-cyborg boring-cyborg bot added the area:scheduler/executor label Feb 20, 2020
@mik-laj mik-laj requested review from ashb, turbaszek and potiuk Feb 20, 2020
@mik-laj mik-laj force-pushed the AIRFLOW-6856-bulk-fetch-paused-dag-ids branch from 4c3c638 to 5b33dc5 Compare Feb 20, 2020
@codecov-io commented Feb 20, 2020

Codecov Report

Merging #7476 into master will decrease coverage by 0.26%.
The diff coverage is 100%.


@@            Coverage Diff             @@
##           master    #7476      +/-   ##
==========================================
- Coverage   86.81%   86.55%   -0.27%     
==========================================
  Files         893      893              
  Lines       42193    42342     +149     
==========================================
+ Hits        36629    36647      +18     
- Misses       5564     5695     +131
Impacted Files Coverage Δ
airflow/utils/dag_processing.py 88.63% <ø> (+0.7%) ⬆️
airflow/dag/base_dag.py 69.56% <ø> (+1.56%) ⬆️
airflow/jobs/scheduler_job.py 90.72% <100%> (+0.48%) ⬆️
airflow/kubernetes/volume_mount.py 44.44% <0%> (-55.56%) ⬇️
airflow/kubernetes/volume.py 52.94% <0%> (-47.06%) ⬇️
airflow/kubernetes/pod_launcher.py 47.18% <0%> (-45.08%) ⬇️
...viders/cncf/kubernetes/operators/kubernetes_pod.py 69.69% <0%> (-25.26%) ⬇️
airflow/kubernetes/refresh_config.py 50.98% <0%> (-23.53%) ⬇️
airflow/providers/amazon/aws/hooks/sns.py 96.42% <0%> (-3.58%) ⬇️
airflow/models/dagrun.py 95.81% <0%> (-0.76%) ⬇️
... and 9 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 0ec2774...72478de.

.filter(DagModel.is_paused.is_(True))
.filter(DagModel.dag_id.in_(dagbag.dag_ids))
.all()
)
@ashb (Member) commented Feb 21, 2020

A few lines after this we call self._process_dags, which then makes this same query again. Is it worth passing it in instead?

The other thing I'm wondering about, API-wise, is whether this should be encapsulated inside DagBag (something like a dagbag.paused_dag_ids method or accessor), but that may not play very well with the global/long-lived DagBag object the webserver has.
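The accessor idea above could be sketched roughly as follows. This is a hypothetical illustration, not Airflow code: `DagBagSketch` and `query_paused` are made-up stand-ins for DagBag and the DagModel bulk query.

```python
# Hypothetical sketch of a paused_dag_ids accessor on a DagBag-like object.
# `query_paused` stands in for the bulk DagModel query and is an assumption.
class DagBagSketch:
    def __init__(self, dag_ids, query_paused):
        self.dag_ids = list(dag_ids)
        self._query_paused = query_paused
        self._paused_cache = None

    @property
    def paused_dag_ids(self):
        # Cache so repeated callers reuse one query. This caching is exactly
        # what may "not play well" with a long-lived webserver DagBag, since
        # pause state changes at runtime and the cache would go stale.
        if self._paused_cache is None:
            self._paused_cache = set(self._query_paused(self.dag_ids))
        return self._paused_cache

# Demo with a fake query backend:
paused_in_db = {"dag_2"}
bag = DagBagSketch(["dag_1", "dag_2"],
                   lambda ids: [d for d in ids if d in paused_in_db])
print(bag.paused_dag_ids)  # {'dag_2'}
```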

@mik-laj (Member, Author) commented Feb 21, 2020

Good catch. The dags parameter contains only active DAGs, so we don't have to check it a second time.

@mik-laj mik-laj force-pushed the AIRFLOW-6856-bulk-fetch-paused-dag-ids branch from 5b33dc5 to ea3997b Compare Feb 21, 2020
@boring-cyborg boring-cyborg bot added the area:webserver label Feb 21, 2020
dag_ids = set(dag_by_ids.keys())
orm_dags = session.query(DagModel)\
.options(
joinedload(DagModel.tags, innerjoin=False)
@kaxil (Member) commented Feb 21, 2020

Why do we need this?

@ashb (Member) commented Feb 21, 2020

This loads all the tags for all the DAGs we've loaded in one query, rather than needing one query for each DAG. This is commonly called an n+1 query situation (which, as Kamil has shown, is expensive and results in lots of extra queries).
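The n+1 effect is easy to demonstrate outside Airflow. The sketch below uses an in-memory SQLite database; `Dag` and `DagTag` here are illustrative stand-ins for DagModel/DagTag, not the real models.

```python
# Demonstrate the n+1 problem that joinedload avoids.
from sqlalchemy import Column, ForeignKey, String, create_engine, event
from sqlalchemy.orm import (declarative_base, joinedload, relationship,
                            sessionmaker)

Base = declarative_base()

class Dag(Base):
    __tablename__ = "dag"
    dag_id = Column(String, primary_key=True)
    tags = relationship("DagTag")

class DagTag(Base):
    __tablename__ = "dag_tag"
    name = Column(String, primary_key=True)
    dag_id = Column(String, ForeignKey("dag.dag_id"), primary_key=True)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

with Session() as session:
    for i in range(10):
        session.add(Dag(dag_id=f"dag_{i}",
                        tags=[DagTag(name=f"tag_{i}", dag_id=f"dag_{i}")]))
    session.commit()

def count_queries(fn):
    """Count cursor executions while fn runs, like CountQueries above."""
    counter = {"n": 0}
    handler = lambda *a, **kw: counter.__setitem__("n", counter["n"] + 1)
    event.listen(engine, "after_cursor_execute", handler)
    fn()
    event.remove(engine, "after_cursor_execute", handler)
    return counter["n"]

with Session() as session:
    # Lazy loading: 1 query for the DAGs + 1 per DAG for its tags.
    lazy = count_queries(lambda: [d.tags for d in session.query(Dag).all()])
with Session() as session:
    # Eager loading: a single joined query fetches DAGs and tags together.
    eager = count_queries(lambda: [
        d.tags for d in session.query(Dag)
        .options(joinedload(Dag.tags, innerjoin=False)).all()
    ])
print(lazy, eager)  # 11 1
```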

@mik-laj (Member, Author) commented Feb 21, 2020

Here is the PR about DAG sync: #7477.

kaxil
kaxil approved these changes Feb 21, 2020
@mik-laj mik-laj changed the title [AIRFLOW-6856] Bulk fetch paused_dag_ids [AIRFLOW-6856][depends on AIRFLOW-6857,AIRFLOW-6862] Bulk fetch paused_dag_ids Feb 21, 2020
@mik-laj mik-laj force-pushed the AIRFLOW-6856-bulk-fetch-paused-dag-ids branch from ea3997b to ef3622f Compare Feb 21, 2020
@mik-laj mik-laj changed the title [AIRFLOW-6856][depends on AIRFLOW-6857,AIRFLOW-6862] Bulk fetch paused_dag_ids [AIRFLOW-6856][depends on AIRFLOW-6862] Bulk fetch paused_dag_ids Feb 21, 2020
@mik-laj mik-laj changed the title [AIRFLOW-6856][depends on AIRFLOW-6862] Bulk fetch paused_dag_ids [AIRFLOW-6856][depends on AIRFLOW-6862][WIP] Bulk fetch paused_dag_ids Feb 21, 2020
@mik-laj mik-laj force-pushed the AIRFLOW-6856-bulk-fetch-paused-dag-ids branch from ef3622f to 72478de Compare Feb 24, 2020
houqp
houqp approved these changes Feb 24, 2020
Fokko
Fokko approved these changes Feb 25, 2020
@Fokko (Contributor) left a comment

LGTM

airflow/jobs/scheduler_job.py: outdated review comment, resolved.
@mik-laj mik-laj changed the title [AIRFLOW-6856][depends on AIRFLOW-6862][WIP] Bulk fetch paused_dag_ids [AIRFLOW-6856][depends on AIRFLOW-6862][DONT-MERGE] Bulk fetch paused_dag_ids Feb 26, 2020
@mik-laj mik-laj changed the title [AIRFLOW-6856][depends on AIRFLOW-6862][DONT-MERGE] Bulk fetch paused_dag_ids [AIRFLOW-6856][DONT-MERGE] Bulk fetch paused_dag_ids Feb 27, 2020
@mik-laj mik-laj force-pushed the AIRFLOW-6856-bulk-fetch-paused-dag-ids branch 3 times, most recently from 59521c7 to b1937c0 Compare Feb 27, 2020
@mik-laj mik-laj force-pushed the AIRFLOW-6856-bulk-fetch-paused-dag-ids branch from b1937c0 to 86d7abc Compare Feb 27, 2020
@mik-laj mik-laj changed the title [AIRFLOW-6856][DONT-MERGE] Bulk fetch paused_dag_ids [AIRFLOW-6856] Bulk fetch paused_dag_ids Feb 27, 2020
@mik-laj mik-laj merged commit d031f84 into apache:master Feb 27, 2020
4 checks passed
paused_dag_ids = (
session.query(DagModel.dag_id)
.filter(DagModel.is_paused.is_(True))
.filter(DagModel.dag_id.in_(dagbag.dag_ids))
@vardancse (Contributor) commented Mar 2, 2020

Is an IN query recommended here for use cases where we have tens of thousands of paused DAGs? Shouldn't the query be broken into smaller batches?

@mik-laj (Member, Author) commented Mar 2, 2020

This is executed only for the DAGs from one file, so you would need several thousand DAGs in a single file to cause problems. For now, I focused on the situation where we have up to 200 DAGs. Supporting several thousand DAGs in one file would require much deeper optimization, and this change alone would not help there.
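If file-level DAG counts ever did reach that scale, the IN list could be chunked. A minimal sketch, not part of this PR: `fetch_paused` stands in for the session query and is an assumption, not Airflow API.

```python
# Hypothetical batching helper: split a large list of dag_ids into chunks so
# each IN clause stays small.
def chunks(seq, size):
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

def paused_dag_ids_batched(dag_ids, fetch_paused, batch_size=1000):
    paused = set()
    for batch in chunks(list(dag_ids), batch_size):
        # fetch_paused(batch) would run something like:
        #   session.query(DagModel.dag_id)
        #          .filter(DagModel.is_paused.is_(True))
        #          .filter(DagModel.dag_id.in_(batch)).all()
        paused.update(fetch_paused(batch))
    return paused

# Demo with a fake backend: dags 0-2499, even ids paused.
paused_in_db = {f"dag_{i}" for i in range(0, 2500, 2)}
fake_fetch = lambda batch: [d for d in batch if d in paused_in_db]
result = paused_dag_ids_batched([f"dag_{i}" for i in range(2500)], fake_fetch)
print(len(result))  # 1250
```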

@vardancse (Contributor) commented Mar 2, 2020

Sorry for the trouble, I missed the "DAGs from one file" part earlier. I thought we were scanning across the whole folder.

galuszkak pushed a commit to FlyrInc/apache-airflow that referenced this issue Mar 5, 2020
@potiuk potiuk added this to the Airflow 1.10.11 milestone Jun 29, 2020
Labels
area:performance area:scheduler/executor area:webserver
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

9 participants