Skip to content

Commit

Permalink
Revert wrong migration reversion and revert the right one (#31429)
Browse files Browse the repository at this point in the history
* Revert "Revert "Save scheduler execution time by caching dags (#30704)" (#31413)"

This reverts commit e6f2117.

* Revert "Save scheduler execution time by adding new Index idea for dag_run (#30827)"

This reverts commit c63b777.
  • Loading branch information
potiuk committed May 19, 2023
1 parent 9d4bc59 commit 903dd8d
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 75 deletions.
18 changes: 15 additions & 3 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timedelta
from functools import lru_cache, partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Collection, Iterable, Iterator
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator

from sqlalchemy import and_, func, not_, or_, text
from sqlalchemy.exc import OperationalError
Expand Down Expand Up @@ -1052,8 +1053,13 @@ def _do_scheduling(self, session: Session) -> int:
callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)

# Send the callbacks after we commit to ensure the context is up to date when it gets run
# Caching the DAG lookup saves time when scheduling many dag_runs for the same DAG.
cached_get_dag: Callable[[str], DAG | None] = lru_cache()(
partial(self.dagbag.get_dag, session=session)
)
for dag_run, callback_to_run in callback_tuples:
dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
dag = cached_get_dag(dag_run.dag_id)

if not dag:
self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue
Expand Down Expand Up @@ -1317,8 +1323,14 @@ def _update_state(dag: DAG, dag_run: DagRun):
tags={"dag_id": dag.dag_id},
)

# Caching the DAG lookup saves time when scheduling many dag_runs for the same DAG.
cached_get_dag: Callable[[str], DAG | None] = lru_cache()(
partial(self.dagbag.get_dag, session=session)
)

for dag_run in dag_runs:
dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
dag = dag_run.dag = cached_get_dag(dag_run.dag_id)

if not dag:
self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue
Expand Down

This file was deleted.

9 changes: 0 additions & 9 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,6 @@ class DagRun(Base, LoggingMixin):
UniqueConstraint("dag_id", "execution_date", name="dag_run_dag_id_execution_date_key"),
UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
Index("idx_last_scheduling_decision", last_scheduling_decision),
Index(
"idx_last_scheduling_decision_queued",
# Not possible to add .nulls_first() here, because only PostgreSQL can handle an Index like that.
# The migration script, which contains a postgres dialect check, adds NULLS FIRST to the index.
last_scheduling_decision,
execution_date,
_state,
postgresql_where=text("state='queued'"),
),
Index("idx_dag_run_dag_id", dag_id),
Index(
"idx_dag_run_running_dags",
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
811b1c45f8fa985feacacffafc30c82a6049bb33948c33bb218c13c48f971097
4987842fd67d29e194f1117e127d3291ba60d3fbc3e81cba75ce93884c263321
4 changes: 2 additions & 2 deletions docs/apache-airflow/img/airflow_erd.svg
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 1 addition & 4 deletions docs/apache-airflow/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ Here's the list of all the Database Migrations that are executed via when you ru
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=================================+===================+===================+==============================================================+
| ``14db5484317e`` (head) | ``937cbd173ca1`` | ``2.7.0`` | Add index on last_scheduling_decision NULLS FIRST, |
| | | | execution_date, state for queued dagrun |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``937cbd173ca1`` | ``98ae134e6fff`` | ``2.7.0`` | Add index to task_instance table |
| ``937cbd173ca1`` (head) | ``98ae134e6fff`` | ``2.7.0`` | Add index to task_instance table |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``98ae134e6fff`` | ``6abdffdd4815`` | ``2.6.0`` | Increase length of user identifier columns in ``ab_user`` |
| | | | and ``ab_register_user`` tables |
Expand Down

0 comments on commit 903dd8d

Please sign in to comment.