Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New 'clear_number' attribute for dag_run to track the number of times it has been cleared #34126

Merged
merged 5 commits into from Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion airflow/jobs/scheduler_job_runner.py
Expand Up @@ -1348,12 +1348,14 @@ def _start_queued_dagruns(self, session: Session) -> None:
def _update_state(dag: DAG, dag_run: DagRun):
dag_run.state = DagRunState.RUNNING
dag_run.start_date = timezone.utcnow()
if dag.timetable.periodic:
if dag.timetable.periodic and not dag_run.external_trigger and dag_run.clear_number < 1:
# TODO: Logically, this should be DagRunInfo.run_after, but the
# information is not stored on a DagRun, only before the actual
# execution on DagModel.next_dagrun_create_after. We should add
# a field on DagRun for this instead of relying on the run
# always happening immediately after the data interval.
# We only publish these metrics for scheduled dag runs and only
# when ``external_trigger`` is *False* and ``clear_number`` is 0.
expected_start_date = dag.get_run_data_interval(dag_run).end
schedule_delay = dag_run.start_date - expected_start_date
# Publish metrics twice with backward compatible name, and then with tags
Expand Down
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""add new field 'clear_number' to dagrun

Revision ID: 375a816bbbf4
Revises: 405de8318b3a
Create Date: 2023-09-05 19:27:30.531558

"""

import sqlalchemy as sa
from alembic import op


# revision identifiers, used by Alembic.
revision = "375a816bbbf4"
down_revision = "405de8318b3a"
branch_labels = None
depends_on = None
airflow_version = "2.8.0"


def upgrade():
"""Apply add cleared column to dagrun"""
with op.batch_alter_table("dag_run") as batch_op:
batch_op.add_column(
sa.Column(
"clear_number",
sa.Integer,
default=0,
nullable=False,
)
Comment on lines +43 to +48
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wonder if we want to add a constraint on this to ensure the value is not negative. Or if not, maybe we should change the == 0 check to < 1 instead. A bit paranoid probably, but it’s mostly free safety so why not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not paranoid, these are great suggestions. I took all of your suggestions @uranusjr thank you for your input :)

)


def downgrade():
"""Unapply add cleared column to pool"""
with op.batch_alter_table("dag_run") as batch_op:
batch_op.drop_column("clear_number")
7 changes: 6 additions & 1 deletion airflow/models/dagrun.py
Expand Up @@ -140,6 +140,7 @@ class DagRun(Base, LoggingMixin):
default=select(func.max(LogTemplate.__table__.c.id)),
)
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow)
clear_number = Column(Integer, default=0, nullable=False)

# Remove this `if` after upgrading Sphinx-AutoAPI
if not TYPE_CHECKING and "BUILDING_AIRFLOW_DOCS" in os.environ:
Expand Down Expand Up @@ -232,6 +233,7 @@ def __init__(
self.run_type = run_type
self.dag_hash = dag_hash
self.creating_job_id = creating_job_id
self.clear_number = 0
super().__init__()

def __repr__(self):
Expand Down Expand Up @@ -917,12 +919,15 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis: lis
rid of the outliers on the stats side through dashboards tooling.

Note that the stat will only be emitted for scheduler-triggered DAG runs
(i.e. when ``external_trigger`` is *False*).
(i.e. when ``external_trigger`` is *False* and ``clear_number`` is
greater than 0).
"""
if self.state == TaskInstanceState.RUNNING:
return
if self.external_trigger:
return
if self.clear_number > 0:
return
if not finished_tis:
return

Expand Down
1 change: 1 addition & 0 deletions airflow/models/taskinstance.py
Expand Up @@ -348,6 +348,7 @@ def clear_task_instances(
if dag_run_state == DagRunState.QUEUED:
dr.last_scheduling_decision = None
dr.start_date = None
dr.clear_number += 1
session.flush()


Expand Down
1 change: 1 addition & 0 deletions airflow/utils/db.py
Expand Up @@ -88,6 +88,7 @@
"2.6.0": "98ae134e6fff",
"2.6.2": "c804e5c76e3e",
"2.7.0": "405de8318b3a",
"2.8.0": "375a816bbbf4",
}


Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
52cd8c81c9f586992515a903d882e72e8df0a55da5021d2614ba1807b8b63f7e
67b3dfeba9f0f4721ec4eaf3045ede333fdf16b40e6eaaa58f680f1dc95fcc6f