Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-6887][DONT-MERGE] Do not check the state of fresh DAGRun #7510

Merged
merged 1 commit into from Feb 26, 2020

Conversation

@mik-laj
Copy link
Member

mik-laj commented Feb 23, 2020

When I have following DAG File:

from datetime import timedelta

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'start_date': days_ago(3),
}

def create_dag(dag_number):
    dag = DAG(
        dag_id=f'perf_50_dag_dummy_tasks_{dag_number}_of_50', default_args=args,
        schedule_interval=timedelta(minutes=30),
        # schedule_interval=None,
        dagrun_timeout=timedelta(minutes=60),
        is_paused_upon_creation=False,
    )

    DummyOperator(
        task_id='task_1_of_5',
        dag=dag
    )

    return dag

for i in range(1, 200):
    globals()[f"dag_{i}"] = create_dag(i)

I applied other improvmnent from other PR and ran following code

from airflow.jobs.scheduler_job import DagFileProcessor

processor = DagFileProcessor([], log)
processor.process_file(DAG_FILE, None, pickle_dags=False)

I got the following values:
Before:
Query count: 1800
Average time 6104.482 ms
After:
Query count: 1601
Average time 5541.349 ms
Diff
Query count: 199 (-11%)
Average time 563 ms (-9%)


Issue link: AIRFLOW-6887

Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Commit message/PR title starts with [AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID*
  • Unit tests coverage for changes (not needed for documentation changes)
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

* For document-only changes commit message can start with [AIRFLOW-XXXX].


In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@codecov-io

This comment has been minimized.

Copy link

codecov-io commented Feb 23, 2020

Codecov Report

Merging #7510 into master will decrease coverage by 0.29%.
The diff coverage is n/a.

Impacted file tree graph

@@            Coverage Diff            @@
##           master    #7510     +/-   ##
=========================================
- Coverage   86.82%   86.53%   -0.3%     
=========================================
  Files         891      891             
  Lines       42095    42094      -1     
=========================================
- Hits        36549    36425    -124     
- Misses       5546     5669    +123
Impacted Files Coverage Δ
airflow/models/dag.py 91.28% <ø> (-0.02%) ⬇️
airflow/kubernetes/volume_mount.py 44.44% <0%> (-55.56%) ⬇️
airflow/kubernetes/volume.py 52.94% <0%> (-47.06%) ⬇️
airflow/kubernetes/pod_launcher.py 47.18% <0%> (-45.08%) ⬇️
...viders/cncf/kubernetes/operators/kubernetes_pod.py 69.38% <0%> (-25.52%) ⬇️
airflow/kubernetes/refresh_config.py 50.98% <0%> (-23.53%) ⬇️
airflow/jobs/backfill_job.py 90.98% <0%> (-1.13%) ⬇️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 61a8bb6...aa07d56. Read the comment docs.

@kaxil
kaxil approved these changes Feb 23, 2020
@ashb

This comment has been minimized.

Copy link
Member

ashb commented Feb 24, 2020

What script are you using to get the numbers btw @mik-laj?

@mik-laj

This comment has been minimized.

Copy link
Member Author

mik-laj commented Feb 24, 2020

@ashb code that is part of....the Airflow codebase.
https://github.com/apache/airflow/blob/master/tests/test_utils/asserts.py#L37-L58
but I also have other scripts that help me. Here is my notepad:
https://gist.github.com/mik-laj/2f41e3e66b4331b8d79d90912e4e7c81

@potiuk

This comment has been minimized.

Copy link
Member

potiuk commented Feb 24, 2020

@ashb code that is part of....the Airflow codebase.
https://github.com/apache/airflow/blob/master/tests/test_utils/asserts.py#L37-L58
but I also have other scripts that help me. Here is my notepad:
https://gist.github.com/mik-laj/2f41e3e66b4331b8d79d90912e4e7c81

Yeah. It would be great to have those contributed with short instructions how to use them to get those numbers maybe ?

@ashb

This comment has been minimized.

Copy link
Member

ashb commented Feb 24, 2020

Ah yeah, I have a similar notepad.

And which helper fn for instance in this case?

@mik-laj

This comment has been minimized.

Copy link
Member Author

mik-laj commented Feb 24, 2020

You're right. I plan to write a blogspot to describe exactly the measurement methodology.

If you want to get the same numbers as in my script then you need to run it using the Python interpreter.

python script-name.py

After then you will message similar to the following:

['', '/opt/airflow', '/usr/local/lib/python37.zip', '/usr/local/lib/python3.7', '/usr/local/lib/python3.7/lib-dynload', '/usr/local/lib/python3.7/site-packages', '/root/airflow/dags', '/root/airflow/config', '/root/airflow/plugins']
[2020-02-24 16:06:14,377] {dagbag.py:376} INFO - Filling up the DagBag from /files/dags/50_dag_5_dummy_tasks.py
[2020-02-24 16:06:15,281] {dag.py:1478} INFO - Sync 299 DAGs
Query count:  7
Retry took 1157.678 ms
[2020-02-24 16:06:15,537] {dagbag.py:376} INFO - Filling up the DagBag from /files/dags/50_dag_5_dummy_tasks.py
[2020-02-24 16:06:15,825] {dag.py:1478} INFO - Sync 299 DAGs
Query count:  7
Retry took 473.342 ms
[2020-02-24 16:06:16,004] {dagbag.py:376} INFO - Filling up the DagBag from /files/dags/50_dag_5_dummy_tasks.py
[2020-02-24 16:06:16,296] {dag.py:1478} INFO - Sync 299 DAGs
Query count:  7
Retry took 570.475 ms
[2020-02-24 16:06:16,579] {dagbag.py:376} INFO - Filling up the DagBag from /files/dags/50_dag_5_dummy_tasks.py
[2020-02-24 16:06:16,898] {dag.py:1478} INFO - Sync 299 DAGs
Query count:  7
Retry took 543.631 ms
[2020-02-24 16:06:17,119] {dagbag.py:376} INFO - Filling up the DagBag from /files/dags/50_dag_5_dummy_tasks.py
[2020-02-24 16:06:17,404] {dag.py:1478} INFO - Sync 299 DAGs
Query count:  7
Retry took 779.900 ms
[2020-02-24 16:06:17,907] {dagbag.py:376} INFO - Filling up the DagBag from /files/dags/50_dag_5_dummy_tasks.py
[2020-02-24 16:06:18,279] {dag.py:1478} INFO - Sync 299 DAGs
Query count:  7
Retry took 572.046 ms
[2020-02-24 16:06:18,473] {dagbag.py:376} INFO - Filling up the DagBag from /files/dags/50_dag_5_dummy_tasks.py
[2020-02-24 16:06:18,764] {dag.py:1478} INFO - Sync 299 DAGs
Query count:  7
Retry took 465.645 ms
[2020-02-24 16:06:18,938] {dagbag.py:376} INFO - Filling up the DagBag from /files/dags/50_dag_5_dummy_tasks.py
[2020-02-24 16:06:19,360] {dag.py:1478} INFO - Sync 299 DAGs
Query count:  7
Retry took 606.974 ms
[2020-02-24 16:06:19,544] {dagbag.py:376} INFO - Filling up the DagBag from /files/dags/50_dag_5_dummy_tasks.py
[2020-02-24 16:06:19,886] {dag.py:1478} INFO - Sync 299 DAGs
Query count:  7
Retry took 543.896 ms
[2020-02-24 16:06:20,088] {dagbag.py:376} INFO - Filling up the DagBag from /files/dags/50_dag_5_dummy_tasks.py
[2020-02-24 16:06:20,468] {dag.py:1478} INFO - Sync 299 DAGs
Query count:  7
Retry took 573.048 ms
Retry took 628.801 ms

The numbers seem self-explanatory except for the last value. This is the average of all try. If the script does not work then you may not have the DAG file. You should create it in the following file: /files/dags/50_dag_5_dummy_tasks.py

@mik-laj

This comment has been minimized.

Copy link
Member Author

mik-laj commented Feb 24, 2020

These part of code is most important:

    @timing(RETRY)
    @retry(RETRY)
    @timing()
    @count_queries
    def slow_case():
        from airflow.models import DagRun
        session.query(DagRun).delete()
        from airflow.models import TaskInstance
        session.query(TaskInstance).delete()
        session.commit()
        processor.process_file(DAG_FILE, None, pickle_dags=False)

    slow_case()

and I ran this code in 90% of cases.

@count_queries - count queries in block and displays on the screen
@timing() - display executon time
@retry(10) - runs functions in loop 10 times
@timing(10) - display executon time but divides the result by 10.

@houqp
houqp approved these changes Feb 24, 2020
Copy link
Contributor

Fokko left a comment

This is a tricky one. Imagine we have a DAG that potentially changes in shape every time we run it. For example, the DAG fetches the job from an external database or a filesystem, and based on this result we build the dag. Then before execution, we should fetch the latest version of the DAG to make sure that we're running the latest version.

@ashb

This comment has been minimized.

Copy link
Member

ashb commented Feb 25, 2020

@Fokko run.refresh_from_db() just loads the id and state columns. The case you are talking about is still handled correctly (by the sligtly badly named verify_integrity function.

@mik-laj mik-laj changed the title [AIRFLOW-6887][WIP] Do not check the state of fresh DAGRun [AIRFLOW-6887][DONT-MERGE] Do not check the state of fresh DAGRun Feb 26, 2020
@Fokko
Fokko approved these changes Feb 26, 2020
Copy link
Contributor

Fokko left a comment

Thanks @mik-laj for the PR, and @ashb for the explanation 👍

@Fokko Fokko merged commit c39ebaa into apache:master Feb 26, 2020
4 checks passed
4 checks passed
Mergeable Mergeable Run has been Completed!
Details
Title Validator Validation successful!
WIP Ready for review
Details
continuous-integration/travis-ci/pr The Travis CI build passed
Details
@ashb

This comment has been minimized.

Copy link
Member

ashb commented Feb 26, 2020

@Fokko The title said "DONT-MERGE" :)

@mik-laj Do we need to revert this one? Perhaps you should be creating Draft PRs instead https://github.blog/2019-02-14-introducing-draft-pull-requests/ (annoyingly you can't make a PR draft if you open it in the wrong mode, you have to open it as a draft one.)

@Fokko

This comment has been minimized.

Copy link
Contributor

Fokko commented Feb 26, 2020

Ugh, didn't have my coffee yet. I can revert the commit if you like. @mik-laj please advise

@ashb

This comment has been minimized.

Copy link
Member

ashb commented Feb 26, 2020

You picked a good PR to mess up on. This one is okay.

@mik-laj

This comment has been minimized.

Copy link
Member Author

mik-laj commented Feb 26, 2020

@ashb I added new rule to Github bot. #7543
Draft PR does not work on GH. If you create a normal PR then you can't convert it to a draft.

@mik-laj

This comment has been minimized.

Copy link
Member Author

mik-laj commented Feb 26, 2020

@Fokko This PR is okay. I wanted more people to have a chance to review because it's is important PR, so I added "DONT-MERGE" in the title.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Linked issues

Successfully merging this pull request may close these issues.

None yet

8 participants
You can’t perform that action at this time.