Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-6881] Bulk fetch DAGRun for create_dag_run #7502

Merged
merged 1 commit into from Feb 29, 2020

Conversation

@mik-laj
Copy link
Member

mik-laj commented Feb 22, 2020

Another performance optimization.
When I have following DAG file

from datetime import timedelta

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'start_date': days_ago(3),
}

def create_dag(dag_number):
    dag = DAG(
        dag_id=f'perf_50_dag_dummy_tasks_{dag_number}_of_50', default_args=args,
        schedule_interval="@once",
        dagrun_timeout=timedelta(minutes=60),
        is_paused_upon_creation=False,
    )

    for j in range(1, 10):
        DummyOperator(
            task_id='task_{}_of_5'.format(j),
            dag=dag
        )

    return dag

for i in range(1, 200):
    globals()[f"dag_{i}"] = create_dag(i)

I ran the following code:

from airflow.jobs.scheduler_job import DagFileProcessor

processor = DagFileProcessor([], log)
processor.process_file(DAG_FILE, None, pickle_dags=False)

I got the following values.
Before:
Query count: 2589
Average time 7980.117 ms
After:
Query count: 2390
Average time: 7261.959 ms
Diff:
Query count: -199 (-7%)
Average time: -719 ms (-9%)

Thanks for support to @evgenyshulman from Databand!


Issue link: AIRFLOW-6881

Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Commit message/PR title starts with [AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID*
  • Unit tests coverage for changes (not needed for documentation changes)
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

* For document-only changes commit message can start with [AIRFLOW-XXXX].


In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@codecov-io

This comment has been minimized.

Copy link

codecov-io commented Feb 22, 2020

Codecov Report

Merging #7502 into master will decrease coverage by 0.42%.
The diff coverage is 97.14%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #7502      +/-   ##
==========================================
- Coverage   86.79%   86.36%   -0.43%     
==========================================
  Files         887      891       +4     
  Lines       41976    42112     +136     
==========================================
- Hits        36432    36372      -60     
- Misses       5544     5740     +196
Impacted Files Coverage Δ
airflow/jobs/scheduler_job.py 90% <100%> (-0.25%) ⬇️
airflow/models/dagrun.py 96.2% <83.33%> (-0.37%) ⬇️
...w/providers/apache/hive/operators/mysql_to_hive.py 35.84% <0%> (-64.16%) ⬇️
airflow/operators/generic_transfer.py 100% <0%> (ø) ⬆️
airflow/kubernetes/volume_mount.py 44.44% <0%> (-55.56%) ⬇️
airflow/providers/postgres/operators/postgres.py 100% <0%> (ø) ⬆️
airflow/kubernetes/volume.py 52.94% <0%> (-47.06%) ⬇️
airflow/security/kerberos.py 30.43% <0%> (-45.66%) ⬇️
airflow/kubernetes/pod_launcher.py 47.18% <0%> (-45.08%) ⬇️
airflow/providers/mysql/operators/mysql.py 55% <0%> (-45%) ⬇️
... and 39 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 4e0e2f0...59bb24d. Read the comment docs.

@tooptoop4

This comment has been minimized.

Copy link
Contributor

tooptoop4 commented Feb 22, 2020

can u get perf timings before/after for dags with 500 tasks in them? with these types of changes there could be regression with different dag shapes

@mik-laj

This comment has been minimized.

Copy link
Member Author

mik-laj commented Feb 22, 2020

@tooptoop4 The number of tasks does not affect this, but I will also check this case and give values ..

@mik-laj

This comment has been minimized.

Copy link
Member Author

mik-laj commented Feb 22, 2020

from datetime import timedelta

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'start_date': days_ago(3),
}

def create_dag(dag_number):
    dag = DAG(
        dag_id=f'perf_50_dag_dummy_tasks_{dag_number}_of_50', default_args=args,
        schedule_interval="@once",
        dagrun_timeout=timedelta(minutes=60),
        is_paused_upon_creation=False,
    )

    for j in range(1, 100):
        DummyOperator(
            task_id='task_{}_of_5'.format(j),
            dag=dag
        )

    return dag

for i in range(1, 20):
    globals()[f"dag_{i}"] = create_dag(i)

Before:
Query count: 249
Retry took 1333.755 ms

After:
Query count: 230
Retry took 1267.842 ms

@ashb
ashb approved these changes Feb 24, 2020
Copy link
Member

ashb left a comment

This one looks okay.

self.log.info("Processing %s", dag.dag_id)
dag_id = dag.dag_id
self.log.info("Processing %s", dag_id)
dag_runs_for_dag = dag_runs_by_dag_id[dag_id] if dag_id in dag_runs_by_dag_id else []

This comment has been minimized.

Copy link
@saguziel

saguziel Feb 24, 2020

Contributor

maybe just dag_runs_by_dag_id.get(dag_id, [])

This comment has been minimized.

Copy link
@mik-laj

mik-laj Feb 28, 2020

Author Member

Done

airflow/models/dagrun.py Outdated Show resolved Hide resolved
@mik-laj mik-laj changed the title [AIRFLOW-6881][depends on AIRFLOW-6869][WIP] Bulk fetch DAGRun for create_dag_run [AIRFLOW-6881][depends on AIRFLOW-6869][DONT-MERGE] Bulk fetch DAGRun for create_dag_run Feb 26, 2020
@mik-laj mik-laj force-pushed the PolideaInternal:bulk_dag_run-2 branch 2 times, most recently from 408fbba to 417f836 Feb 27, 2020
@mik-laj mik-laj force-pushed the PolideaInternal:bulk_dag_run-2 branch from 417f836 to 616ff65 Feb 29, 2020
@mik-laj mik-laj changed the title [AIRFLOW-6881][depends on AIRFLOW-6869][DONT-MERGE] Bulk fetch DAGRun for create_dag_run [AIRFLOW-6881] Bulk fetch DAGRun for create_dag_run Feb 29, 2020
@mik-laj mik-laj merged commit cc562dd into apache:master Feb 29, 2020
2 of 3 checks passed
2 of 3 checks passed
continuous-integration/travis-ci/pr The Travis CI build is in progress
Details
Mergeable Mergeable Run has been Completed!
Details
WIP Ready for review
Details
galuszkak added a commit to FlyrInc/apache-airflow that referenced this pull request Mar 5, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Linked issues

Successfully merging this pull request may close these issues.

None yet

6 participants
You can’t perform that action at this time.