
[AIRFLOW-6857] Bulk sync DAGs #7477

Merged
merged 2 commits into apache:master on Feb 27, 2020

Conversation


@mik-laj mik-laj commented Feb 21, 2020

I created the following DAG file:

from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'start_date': days_ago(3),
}


def create_dag(dag_number):
    dag = DAG(
        dag_id=f'perf_50_dag_dummy_tasks_{dag_number}_of_50', default_args=args,
        schedule_interval=None,
        dagrun_timeout=timedelta(minutes=60)
    )

    for j in range(1, 5):
        DummyOperator(
            task_id='task_{}_of_5'.format(j),
            dag=dag
        )

    return dag


for i in range(1, 200):
    globals()[f"dag_{i}"] = create_dag(i)

Then I used the following code to test performance.

import functools
import logging
import time

from airflow.jobs.scheduler_job import DagFileProcessor


class CountQueries:
    """Count SQL queries executed on the Airflow engine while the context is active."""

    def __init__(self):
        self.count = 0

    def __enter__(self):
        from sqlalchemy import event
        from airflow.settings import engine
        event.listen(engine, "after_cursor_execute", self.after_cursor_execute)
        return None

    def after_cursor_execute(self, *args, **kwargs):
        self.count += 1

    def __exit__(self, type, value, traceback):
        from sqlalchemy import event
        from airflow.settings import engine
        event.remove(engine, "after_cursor_execute", self.after_cursor_execute)
        print('Query count: ', self.count)


count_queries = CountQueries

DAG_FILE = "/files/dags/200_dag_5_dummy_tasks.py"

log = logging.getLogger("airflow.processor")
processor = DagFileProcessor([], log)

def timing(f):

    @functools.wraps(f)
    def wrap(*args):
        RETRY_COUNT = 5
        r = []
        for i in range(RETRY_COUNT):
            time1 = time.time()
            f(*args)
            time2 = time.time()
            diff = (time2 - time1) * 1000.0
            r.append(diff)
            # print('Retry %d took %0.3f ms' % (i, diff))
        print('Average took %0.3f ms' % (sum(r) / RETRY_COUNT))

    return wrap


@timing
def slow_case():
    with count_queries():
        processor.process_file(DAG_FILE, None, pickle_dags=False)

slow_case()

I also cherry-picked AIRFLOW-6856.

As a result, I obtained the following values:

Master:
Query count: 1792
Average took 4505.891 ms

AIRFLOW-6856:
Query count: 1197
Average took 3203.710 ms

Current:
Query count: 602
Average took 2018.891 ms

Diff to AIRFLOW-6856:
Query count: -595 (-50%)
Average time: -1185 ms (-37%)

Diff to master:
Query count: -1190 (-66%)
Average time: -2487 ms (-55%)
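
For context, the reduction comes from batching the per-DAG sync work into a single query and a single commit per DAG file instead of separate round trips for every DAG. A minimal sketch of that pattern, purely illustrative and not the exact code merged here (the function name bulk_sync_sketch and the field list are made up):

from airflow.models import DagModel


def bulk_sync_sketch(dags, session):
    """Sync all DAGs parsed from one file using a single SELECT and a single commit."""
    dag_ids = [dag.dag_id for dag in dags]
    # One SELECT covering every DAG in the file instead of one query per DAG.
    orm_dags = {
        orm_dag.dag_id: orm_dag
        for orm_dag in session.query(DagModel).filter(DagModel.dag_id.in_(dag_ids))
    }
    for dag in dags:
        orm_dag = orm_dags.get(dag.dag_id)
        if orm_dag is None:
            orm_dag = DagModel(dag_id=dag.dag_id)
            session.add(orm_dag)
        # Copy over a few representative fields (the real method updates more).
        orm_dag.fileloc = dag.fileloc
        orm_dag.owners = dag.owner
        orm_dag.schedule_interval = dag.schedule_interval
    # One commit for the whole file.
    session.commit()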

Thanks to @evgenyshulman from Databand for the support!


Issue link: AIRFLOW-6857

Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Commit message/PR title starts with [AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID*
  • Unit tests coverage for changes (not needed for documentation changes)
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

* For document-only changes commit message can start with [AIRFLOW-XXXX].


In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@boring-cyborg boring-cyborg bot added area:scheduler/executor area:webserver labels Feb 21, 2020
@mik-laj mik-laj force-pushed the AIRFLOW-6857-bulk-sync-dags branch from b7c1f2a to 12d6ebe on Feb 21, 2020
@codecov-io codecov-io commented Feb 21, 2020

Codecov Report

Merging #7477 into master will decrease coverage by 0.26%.
The diff coverage is 97.87%.


@@            Coverage Diff             @@
##           master    #7477      +/-   ##
==========================================
- Coverage   86.81%   86.54%   -0.27%     
==========================================
  Files         896      896              
  Lines       42656    42671      +15     
==========================================
- Hits        37033    36931     -102     
- Misses       5623     5740     +117
Impacted Files Coverage Δ
airflow/models/dagbag.py 89.74% <100%> (+0.13%) ⬆️
airflow/jobs/scheduler_job.py 90.05% <100%> (-0.02%) ⬇️
airflow/utils/db.py 98.3% <100%> (-0.02%) ⬇️
airflow/models/dag.py 91.51% <97.61%> (+0.15%) ⬆️
airflow/kubernetes/volume_mount.py 44.44% <0%> (-55.56%) ⬇️
airflow/kubernetes/volume.py 52.94% <0%> (-47.06%) ⬇️
airflow/kubernetes/pod_launcher.py 47.18% <0%> (-45.08%) ⬇️
...viders/cncf/kubernetes/operators/kubernetes_pod.py 69.69% <0%> (-25.26%) ⬇️
airflow/kubernetes/refresh_config.py 50.98% <0%> (-23.53%) ⬇️
airflow/utils/dag_processing.py 86.8% <0%> (+0.38%) ⬆️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 835d859...51bb359.


@ashb ashb left a comment

A nice change overall!

Couple of questions, and a possible different location for the bulk sync method.

Review comments (since resolved) on: airflow/jobs/scheduler_job.py, airflow/models/dag.py, tests/models/test_dag.py
@mik-laj mik-laj force-pushed the AIRFLOW-6857-bulk-sync-dags branch from 68e1c88 to 1f1fc86 on Feb 21, 2020
@mik-laj mik-laj changed the title [AIRFLOW-6857][depends on AIRFLOW-6856] Bulk sync DAGs [AIRFLOW-6857] Bulk sync DAGs Feb 21, 2020

@mik-laj mik-laj commented Feb 21, 2020

I got the following error, so I had to delete the with_for_update call.

E       sqlalchemy.exc.NotSupportedError: (psycopg2.errors.FeatureNotSupported) FOR UPDATE cannot be applied to the nullable side of an outer join
E       
E       [SQL: SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root_dag_id, dag.is_paused AS dag_is_paused, dag.is_subdag AS dag_is_subdag, dag.is_active AS dag_is_active, dag.last_scheduler_run AS dag_last_scheduler_run, dag.last_pickled AS dag_last_pickled, dag.last_expired AS dag_last_expired, dag.scheduler_lock AS dag_scheduler_lock, dag.pickle_id AS dag_pickle_id, dag.fileloc AS dag_fileloc, dag.owners AS dag_owners, dag.description AS dag_description, dag.default_view AS dag_default_view, dag.schedule_interval AS dag_schedule_interval, dag_tag_1.name AS dag_tag_1_name, dag_tag_1.dag_id AS dag_tag_1_dag_id 
E       FROM dag LEFT OUTER JOIN dag_tag AS dag_tag_1 ON dag.dag_id = dag_tag_1.dag_id 
E       WHERE dag.dag_id IN (%(dag_id_1)s, %(dag_id_2)s, %(dag_id_3)s, %(dag_id_4)s, %(dag_id_5)s, %(dag_id_6)s, %(dag_id_7)s, %(dag_id_8)s, %(dag_id_9)s, %(dag_id_10)s, %(dag_id_11)s, %(dag_id_12)s, %(dag_id_13)s, %(dag_id_14)s, %(dag_id_15)s, %(dag_id_16)s, %(dag_id_17)s, %(dag_id_18)s, %(dag_id_19)s, %(dag_id_20)s, %(dag_id_21)s, %(dag_id_22)s, %(dag_id_23)s, %(dag_id_24)s, %(dag_id_25)s, %(dag_id_26)s, %(dag_id_27)s, %(dag_id_28)s, %(dag_id_29)s, %(dag_id_30)s, %(dag_id_31)s, %(dag_id_32)s, %(dag_id_33)s, %(dag_id_34)s, %(dag_id_35)s, %(dag_id_36)s, %(dag_id_37)s, %(dag_id_38)s, %(dag_id_39)s, %(dag_id_40)s, %(dag_id_41)s, %(dag_id_42)s, %(dag_id_43)s, %(dag_id_44)s, %(dag_id_45)s, %(dag_id_46)s, %(dag_id_47)s, %(dag_id_48)s, %(dag_id_49)s, %(dag_id_50)s, %(dag_id_51)s, %(dag_id_52)s) FOR UPDATE]
E       [parameters: {'dag_id_1': 'test_on_kill', 'dag_id_2': 'example_external_task_marker_child', 'dag_id_3': 'test_utils', 'dag_id_4': 'example_xcom', 'dag_id_5': 'test_mark_success', 'dag_id_6': 'test_localtaskjob_double_trigger', 'dag_id_7': 'test_dagrun_states_deadlock', 'dag_id_8': 'example_branch_dop_operator_v3', 'dag_id_9': 'test_depends_on_past', 'dag_id_10': 'test_dagrun_states_fail', 'dag_id_11': 'impersonation_subdag.test_subdag_operation', 'dag_id_12': 'test_dagrun_states_root_fail', 'dag_id_13': 'clear_subdag_test_dag.daily_job', 'dag_id_14': 'test_dagrun_states_root_fail_unfinished', 'dag_id_15': 'example_passing_params_via_test_command', 'dag_id_16': 'latest_only_with_trigger', 'dag_id_17': 'example_subdag_operator.section-1', 'dag_id_18': 'test_start_date_scheduling', 'dag_id_19': 'test_backfill_pooled_task_dag', 'dag_id_20': 'latest_only', 'dag_id_21': 'test_dagrun_states_root_future', 'dag_id_22': 'test_task_start_date_scheduling', 'dag_id_23': 'example_kubernetes_executor', 'dag_id_24': 'test_latest_runs_1', 'dag_id_25': 'test_task_view_type_check', 'dag_id_26': 'example_trigger_target_dag', 'dag_id_27': 'impersonation_subdag', 'dag_id_28': 'test_default_impersonation', 'dag_id_29': 'test_dag_under_subdir2', 'dag_id_30': 'test_zip_dag', 'dag_id_31': 'test_retry_handling_job', 'dag_id_32': 'test_no_impersonation', 'dag_id_33': 'example_short_circuit_operator', 'dag_id_34': 'example_bash_operator', 'dag_id_35': 'test_run_ignores_all_dependencies', 'dag_id_36': 'test_subdag_operator', 'dag_id_37': 'example_subdag_operator.section-2', 'dag_id_38': 'example_kubernetes_executor_config', 'dag_id_39': 'example_branch_operator', 'dag_id_40': 'test_example_bash_operator', 'dag_id_41': 'example_trigger_controller_dag', 'dag_id_42': 'example_complex', 'dag_id_43': 'example_external_task_marker_parent', 'dag_id_44': 'test_subdag_operator.section-1', 'dag_id_45': 'tutorial', 'dag_id_46': 'example_subdag_operator', 'dag_id_47': 'test_heartbeat_failed_fast', 'dag_id_48': 'example_python_operator', 'dag_id_49': 'example_skip_dag', 'dag_id_50': 'test_impersonation', 'dag_id_51': 'clear_subdag_test_dag', 'dag_id_52': 'test_dagrun_states_success'}]

@mik-laj mik-laj force-pushed the AIRFLOW-6857-bulk-sync-dags branch from 1f1fc86 to d1829f2 on Feb 21, 2020
@mik-laj mik-laj changed the title [AIRFLOW-6857] Bulk sync DAGs [AIRFLOW-6857][WIP] Bulk sync DAGs Feb 21, 2020
ashb approved these changes Feb 21, 2020
@mik-laj mik-laj force-pushed the AIRFLOW-6857-bulk-sync-dags branch from d1829f2 to 310ba6c on Feb 23, 2020

@ashb ashb commented Feb 24, 2020

The FOR UPDATE is a fairly hard requirement. Without it scheduler HA is impossible to do correctly. :(


@ashb ashb commented Feb 24, 2020

Ah, cos Postgres is trying to lock everything.

@mik-laj .with_for_update(of=DagModel) should be what we need.
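
For reference, a minimal sketch of that suggestion (illustrative only; it assumes session and dag_ids exist as in the bulk query above), which locks only the dag rows and leaves the outer-joined dag_tag rows alone:

from airflow.models import DagModel

orm_dags = (
    session.query(DagModel)
    .filter(DagModel.dag_id.in_(dag_ids))
    # FOR UPDATE OF dag: lock only the DagModel rows, not the nullable side
    # of the LEFT OUTER JOIN to dag_tag that Postgres refuses to lock.
    .with_for_update(of=DagModel)
    .all()
)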

houqp approved these changes Feb 24, 2020
orm_dag.default_view = dag._default_view
orm_dag.description = dag.description
orm_dag.schedule_interval = dag.schedule_interval
orm_dag.tags = dag.get_dagtags(session=session)

session.commit()
@saguziel saguziel Feb 24, 2020

it might be worth doing this in smaller batches rather than one gigantic commit. I don't think atomicity is a concern but large transactions may be.

@mik-laj mik-laj Feb 24, 2020

This is only done for DAGs from one file. I suspect that even if someone puts 200+ DAGs in one file, adding 200+ entries to the DB in one transaction is not a problem.
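
If transaction size ever did become a concern, a batched variant could look roughly like the sketch below; this is purely illustrative and not part of this PR (sync_one_dag and chunk_size are hypothetical):

def sync_in_batches(dags, session, chunk_size=50):
    # Hypothetical sketch: commit in smaller chunks instead of one large
    # transaction; sync_one_dag stands in for the per-DAG update logic.
    for start in range(0, len(dags), chunk_size):
        for dag in dags[start:start + chunk_size]:
            sync_one_dag(dag, session)
        session.commit()  # one commit per chunk keeps transactions small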

Fokko approved these changes Feb 25, 2020

@Fokko Fokko left a comment

LGTM


@feluelle feluelle left a comment

Nice one! 👍

@mik-laj mik-laj changed the title [AIRFLOW-6857][WIP] Bulk sync DAGs [AIRFLOW-6857][DONT-MERGE] Bulk sync DAGs Feb 26, 2020
@mik-laj mik-laj force-pushed the AIRFLOW-6857-bulk-sync-dags branch from 2737a53 to 51bb359 on Feb 27, 2020
@mik-laj mik-laj changed the title [AIRFLOW-6857][DONT-MERGE] Bulk sync DAGs [AIRFLOW-6857] Bulk sync DAGs Feb 27, 2020
@mik-laj mik-laj merged commit 031b4b7 into apache:master Feb 27, 2020
4 checks passed
galuszkak pushed a commit to FlyrInc/apache-airflow that referenced this issue Mar 5, 2020
Labels
area:performance area:scheduler/executor area:webserver