Skip to content

Commit

Permalink
Attribute 'clear_number' to track DAG run being cleared (#34126)
Browse files Browse the repository at this point in the history
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
  • Loading branch information
syun64 and uranusjr committed Sep 12, 2023
1 parent ed552e2 commit eed2901
Show file tree
Hide file tree
Showing 9 changed files with 933 additions and 862 deletions.
4 changes: 3 additions & 1 deletion airflow/jobs/scheduler_job_runner.py
Expand Up @@ -1348,12 +1348,14 @@ def _start_queued_dagruns(self, session: Session) -> None:
def _update_state(dag: DAG, dag_run: DagRun):
dag_run.state = DagRunState.RUNNING
dag_run.start_date = timezone.utcnow()
if dag.timetable.periodic:
if dag.timetable.periodic and not dag_run.external_trigger and dag_run.clear_number < 1:
# TODO: Logically, this should be DagRunInfo.run_after, but the
# information is not stored on a DagRun, only before the actual
# execution on DagModel.next_dagrun_create_after. We should add
# a field on DagRun for this instead of relying on the run
# always happening immediately after the data interval.
# We only publish these metrics for scheduled dag runs and only
# when ``external_trigger`` is *False* and ``clear_number`` is 0.
expected_start_date = dag.get_run_data_interval(dag_run).end
schedule_delay = dag_run.start_date - expected_start_date
# Publish metrics twice with backward compatible name, and then with tags
Expand Down
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""add new field 'clear_number' to dagrun
Revision ID: 375a816bbbf4
Revises: 405de8318b3a
Create Date: 2023-09-05 19:27:30.531558
"""

import sqlalchemy as sa
from alembic import op


# revision identifiers, used by Alembic.
revision = "375a816bbbf4"
down_revision = "405de8318b3a"
branch_labels = None
depends_on = None
airflow_version = "2.8.0"


def upgrade():
"""Apply add cleared column to dagrun"""
with op.batch_alter_table("dag_run") as batch_op:
batch_op.add_column(
sa.Column(
"clear_number",
sa.Integer,
default=0,
nullable=False,
)
)


def downgrade():
"""Unapply add cleared column to pool"""
with op.batch_alter_table("dag_run") as batch_op:
batch_op.drop_column("clear_number")
7 changes: 6 additions & 1 deletion airflow/models/dagrun.py
Expand Up @@ -140,6 +140,7 @@ class DagRun(Base, LoggingMixin):
default=select(func.max(LogTemplate.__table__.c.id)),
)
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow)
clear_number = Column(Integer, default=0, nullable=False)

# Remove this `if` after upgrading Sphinx-AutoAPI
if not TYPE_CHECKING and "BUILDING_AIRFLOW_DOCS" in os.environ:
Expand Down Expand Up @@ -232,6 +233,7 @@ def __init__(
self.run_type = run_type
self.dag_hash = dag_hash
self.creating_job_id = creating_job_id
self.clear_number = 0
super().__init__()

def __repr__(self):
Expand Down Expand Up @@ -917,12 +919,15 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis: lis
rid of the outliers on the stats side through dashboards tooling.
Note that the stat will only be emitted for scheduler-triggered DAG runs
(i.e. when ``external_trigger`` is *False*).
(i.e. when ``external_trigger`` is *False* and ``clear_number`` is
greater than 0).
"""
if self.state == TaskInstanceState.RUNNING:
return
if self.external_trigger:
return
if self.clear_number > 0:
return
if not finished_tis:
return

Expand Down
1 change: 1 addition & 0 deletions airflow/models/taskinstance.py
Expand Up @@ -349,6 +349,7 @@ def clear_task_instances(
if dag_run_state == DagRunState.QUEUED:
dr.last_scheduling_decision = None
dr.start_date = None
dr.clear_number += 1
session.flush()


Expand Down
1 change: 1 addition & 0 deletions airflow/utils/db.py
Expand Up @@ -88,6 +88,7 @@
"2.6.0": "98ae134e6fff",
"2.6.2": "c804e5c76e3e",
"2.7.0": "405de8318b3a",
"2.8.0": "375a816bbbf4",
}


Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
52cd8c81c9f586992515a903d882e72e8df0a55da5021d2614ba1807b8b63f7e
67b3dfeba9f0f4721ec4eaf3045ede333fdf16b40e6eaaa58f680f1dc95fcc6f

0 comments on commit eed2901

Please sign in to comment.