Skip to content

Commit

Permalink
Add 'queued' state to DagRun (#16401)
Browse files Browse the repository at this point in the history
This change adds queued state to DagRun. Newly created DagRuns
start in the queued state, are then moved to the running state after
satisfying the DAG's max_active_runs. If the Dag doesn't have
max_active_runs, the DagRuns are moved to running state immediately

Clearing a DagRun sets the state to queued state

Closes: #9975, #16366
(cherry picked from commit 6611ffd)
  • Loading branch information
ephraimbuddy authored and kaxil committed Aug 17, 2021
1 parent d81d8fc commit 95ef3e3
Show file tree
Hide file tree
Showing 18 changed files with 338 additions and 298 deletions.
4 changes: 3 additions & 1 deletion airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,7 @@ components:
description: |
The start time. The time when DAG run was actually created.
readOnly: true
nullable: true
end_date:
type: string
format: date-time
Expand Down Expand Up @@ -3025,8 +3026,9 @@ components:
description: DAG State.
type: string
enum:
- success
- queued
- running
- success
- failed

TriggerRule:
Expand Down
171 changes: 66 additions & 105 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import time
from collections import defaultdict
from datetime import timedelta
from typing import DefaultDict, Dict, Iterable, List, Optional, Set, Tuple
from typing import DefaultDict, Dict, Iterable, List, Optional, Tuple

from sqlalchemy import and_, func, not_, or_, tuple_
from sqlalchemy.exc import OperationalError
Expand Down Expand Up @@ -197,7 +197,7 @@ def _change_state_for_tis_without_dagrun(
"""
For all DAG IDs in the DagBag, look for task instances in the
old_states and set them to new_state if the corresponding DagRun
does not exist or exists but is not in the running state. This
does not exist or exists but is not in the running or queued state. This
normally should not happen, but it can if the state of DagRuns are
changed manually.
Expand All @@ -214,7 +214,7 @@ def _change_state_for_tis_without_dagrun(
.filter(models.TaskInstance.state.in_(old_states))
.filter(
or_(
models.DagRun.state != State.RUNNING,
models.DagRun.state.notin_([State.RUNNING, State.QUEUED]),
models.DagRun.state.is_(None),
)
)
Expand Down Expand Up @@ -882,39 +882,12 @@ def _do_scheduling(self, session) -> int:
if settings.USE_JOB_SCHEDULE:
self._create_dagruns_for_dags(guard, session)

dag_runs = self._get_next_dagruns_to_examine(session)
self._start_queued_dagruns(session)
guard.commit()
dag_runs = self._get_next_dagruns_to_examine(State.RUNNING, session)
# Bulk fetch the currently active dag runs for the dags we are
# examining, rather than making one query per DagRun

# TODO: This query is probably horribly inefficient (though there is an
# index on (dag_id,state)). It is to deal with the case when a user
# clears more than max_active_runs older tasks -- we don't want the
# scheduler to suddenly go and start running tasks from all of the
# runs. (AIRFLOW-137/GH #1442)
#
# The longer term fix would be to have `clear` do this, and put DagRuns
# in to the queued state, then take DRs out of queued before creating
# any new ones

# Build up a set of execution_dates that are "active" for a given
# dag_id -- only tasks from those runs will be scheduled.
active_runs_by_dag_id = defaultdict(set)

query = (
session.query(
TI.dag_id,
TI.execution_date,
)
.filter(
TI.dag_id.in_(list({dag_run.dag_id for dag_run in dag_runs})),
TI.state.notin_(list(State.finished) + [State.REMOVED]),
)
.group_by(TI.dag_id, TI.execution_date)
)

for dag_id, execution_date in query:
active_runs_by_dag_id[dag_id].add(execution_date)

for dag_run in dag_runs:
# Use try_except to not stop the Scheduler when a Serialized DAG is not found
# This takes care of Dynamic DAGs especially
Expand All @@ -923,7 +896,7 @@ def _do_scheduling(self, session) -> int:
# But this would take care of the scenario when the Scheduler is restarted after DagRun is
# created and the DAG is deleted / renamed
try:
self._schedule_dag_run(dag_run, active_runs_by_dag_id.get(dag_run.dag_id, set()), session)
self._schedule_dag_run(dag_run, session)
except SerializedDagNotFound:
self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue
Expand Down Expand Up @@ -963,9 +936,9 @@ def _do_scheduling(self, session) -> int:
return num_queued_tis

@retry_db_transaction
def _get_next_dagruns_to_examine(self, session):
def _get_next_dagruns_to_examine(self, state, session):
"""Get Next DagRuns to Examine with retries"""
return DagRun.next_dagruns_to_examine(session)
return DagRun.next_dagruns_to_examine(state, session)

@retry_db_transaction
def _create_dagruns_for_dags(self, guard, session):
Expand All @@ -986,14 +959,24 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
# as DagModel.dag_id and DagModel.next_dagrun
# This list is used to verify if the DagRun already exist so that we don't attempt to create
# duplicate dag runs
active_dagruns = (
session.query(DagRun.dag_id, DagRun.execution_date)
.filter(
tuple_(DagRun.dag_id, DagRun.execution_date).in_(
[(dm.dag_id, dm.next_dagrun) for dm in dag_models]

if session.bind.dialect.name == 'mssql':
existing_dagruns_filter = or_(
*(
and_(
DagRun.dag_id == dm.dag_id,
DagRun.execution_date == dm.next_dagrun,
)
for dm in dag_models
)
)
.all()
else:
existing_dagruns_filter = tuple_(DagRun.dag_id, DagRun.execution_date).in_(
[(dm.dag_id, dm.next_dagrun) for dm in dag_models]
)

existing_dagruns = (
session.query(DagRun.dag_id, DagRun.execution_date).filter(existing_dagruns_filter).all()
)

for dag_model in dag_models:
Expand All @@ -1009,89 +992,83 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
# are not updated.
# We opted to check DagRun existence instead
# of catching an Integrity error and rolling back the session i.e
# we need to run self._update_dag_next_dagruns if the Dag Run already exists or if we
# we need to set dag.next_dagrun_info if the Dag Run already exists or if we
# create a new one. This is so that in the next Scheduling loop we try to create new runs
# instead of falling in a loop of Integrity Error.
if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
run = dag.create_dagrun(
if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=dag_model.next_dagrun,
start_date=timezone.utcnow(),
state=State.RUNNING,
state=State.QUEUED,
external_trigger=False,
session=session,
dag_hash=dag_hash,
creating_job_id=self.id,
)

expected_start_date = dag.following_schedule(run.execution_date)
if expected_start_date:
schedule_delay = run.start_date - expected_start_date
Stats.timing(
f'dagrun.schedule_delay.{dag.dag_id}',
schedule_delay,
)

self._update_dag_next_dagruns(dag_models, session)
dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
dag_model.next_dagrun
)

# TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
# memory for larger dags? or expunge_all()

def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], session: Session) -> None:
"""
Bulk update the next_dagrun and next_dagrun_create_after for all the dags.
def _start_queued_dagruns(
self,
session: Session,
) -> int:
"""Find DagRuns in queued state and decide moving them to running state"""
dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)

We batch the select queries to get info about all the dags at once
"""
# Check max_active_runs, to see if we are _now_ at the limit for any of
# these dag? (we've just created a DagRun for them after all)
active_runs_of_dags = dict(
active_runs_of_dags = defaultdict(
lambda: 0,
session.query(DagRun.dag_id, func.count('*'))
.filter(
DagRun.dag_id.in_([o.dag_id for o in dag_models]),
.filter( # We use `list` here because SQLA doesn't accept a set
# We use set to avoid duplicate dag_ids
DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),
DagRun.state == State.RUNNING,
DagRun.external_trigger.is_(False),
)
.group_by(DagRun.dag_id)
.all()
.all(),
)

for dag_model in dag_models:
# Get the DAG in a try_except to not stop the Scheduler when a Serialized DAG is not found
# This takes care of Dynamic DAGs especially
def _update_state(dag_run):
dag_run.state = State.RUNNING
dag_run.start_date = timezone.utcnow()
expected_start_date = dag.following_schedule(dag_run.execution_date)
if expected_start_date:
schedule_delay = dag_run.start_date - expected_start_date
Stats.timing(
f'dagrun.schedule_delay.{dag.dag_id}',
schedule_delay,
)

for dag_run in dag_runs:
try:
dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
except SerializedDagNotFound:
self.log.exception("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue
active_runs_of_dag = active_runs_of_dags.get(dag.dag_id, 0)
if dag.max_active_runs and active_runs_of_dag >= dag.max_active_runs:
self.log.info(
"DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
active_runs = active_runs_of_dags[dag_run.dag_id]
if dag.max_active_runs and active_runs >= dag.max_active_runs:
self.log.debug(
"DAG %s already has %d active runs, not moving any more runs to RUNNING state %s",
dag.dag_id,
active_runs_of_dag,
dag.max_active_runs,
active_runs,
dag_run.execution_date,
)
dag_model.next_dagrun_create_after = None
else:
dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
dag_model.next_dagrun
)
active_runs_of_dags[dag_run.dag_id] += 1
_update_state(dag_run)

def _schedule_dag_run(
self,
dag_run: DagRun,
currently_active_runs: Set[datetime.datetime],
session: Session,
) -> int:
"""
Make scheduling decisions about an individual dag run
``currently_active_runs`` is passed in so that a batch query can be
used to ask this for all dag runs in the batch, to avoid an n+1 query.
:param dag_run: The DagRun to schedule
:param currently_active_runs: Number of currently active runs of this DAG
:return: Number of tasks scheduled
"""
dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
Expand All @@ -1118,9 +1095,6 @@ def _schedule_dag_run(
session.flush()
self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)

# Work out if we should allow creating a new DagRun now?
self._update_dag_next_dagruns([session.query(DagModel).get(dag_run.dag_id)], session)

callback_to_execute = DagCallbackRequest(
full_filepath=dag.fileloc,
dag_id=dag.dag_id,
Expand All @@ -1138,19 +1112,6 @@ def _schedule_dag_run(
self.log.error("Execution date is in future: %s", dag_run.execution_date)
return 0

if dag.max_active_runs:
if (
len(currently_active_runs) >= dag.max_active_runs
and dag_run.execution_date not in currently_active_runs
):
self.log.info(
"DAG %s already has %d active runs, not queuing any tasks for run %s",
dag.dag_id,
len(currently_active_runs),
dag_run.execution_date,
)
return 0

self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session)
# TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add queued_at column to dagrun table
Revision ID: 97cdd93827b8
Revises: a13f7613ad25
Create Date: 2021-06-29 21:53:48.059438
"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import mssql

# revision identifiers, used by Alembic.
revision = '97cdd93827b8'
down_revision = 'a13f7613ad25'
branch_labels = None
depends_on = None


def upgrade():
"""Apply Add queued_at column to dagrun table"""
conn = op.get_bind()
if conn.dialect.name == "mssql":
op.add_column('dag_run', sa.Column('queued_at', mssql.DATETIME2(precision=6), nullable=True))
else:
op.add_column('dag_run', sa.Column('queued_at', sa.DateTime(), nullable=True))


def downgrade():
"""Unapply Add queued_at column to dagrun table"""
op.drop_column('dag_run', "queued_at")
4 changes: 2 additions & 2 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ def clear(
confirm_prompt=False,
include_subdags=True,
include_parentdag=True,
dag_run_state: str = State.RUNNING,
dag_run_state: str = State.QUEUED,
dry_run=False,
session=None,
get_tis=False,
Expand Down Expand Up @@ -1369,7 +1369,7 @@ def clear_dags(
confirm_prompt=False,
include_subdags=True,
include_parentdag=False,
dag_run_state=State.RUNNING,
dag_run_state=State.QUEUED,
dry_run=False,
):
all_tis = []
Expand Down

0 comments on commit 95ef3e3

Please sign in to comment.