Soft delete datasets that are no longer referenced in DAG schedules or task outlets (#27828)

* Soft delete datasets that are no longer referenced anywhere

* Move the orphaning logic into the scheduler and adjust config option name accordingly

* Rename config option scheduler.cleanup_interval -> scheduler.parsing_cleanup_interval

* Include default column value in migration

* deprecate old interval config; move code to scheduler_job

* First pass at a test

* Fix migration

* Apply suggestions from code review

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

* Don't batch migrations if we don't need to

* Revert "Don't batch migrations if we don't need to" - gotta batch migrations for SQLite

This reverts commit 652f745.

* Tweak migrations

* Use sqlalchemy.sql.True_() to support all DB backends

* Various cleanups

* Add test for un-orphaning datasets once they are referenced again

* Use sqlalchemy.sql.expression.true()

Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>

* Fix orphaning datasets on MSSQL

* Comment the un-orphan process and use sqla.sql.expression.false()

* Add newsfragment about renamed config option

* add mssql_drop_default flag

* Use server_default in the ORM as well

* Defensively clear datasets before and after DAG tests

* Reconcile migration with ORM model

* Remove now erroneous comment

* Change to use server_default='0'

* Update airflow/configuration.py

Co-authored-by: Jed Cunningham <jedcunningham@apache.org>
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>
4 people committed Nov 25, 2022
1 parent 3e288ab commit 3fef6a4
Showing 15 changed files with 422 additions and 215 deletions.
7 changes: 4 additions & 3 deletions airflow/config_templates/config.yml
@@ -1946,11 +1946,12 @@
type: string
example: ~
default: "30"
- name: deactivate_stale_dags_interval
- name: parsing_cleanup_interval
description: |
How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
the expected files) which should be deactivated.
version_added: 2.2.5
the expected files) which should be deactivated, as well as datasets that are no longer
referenced and should be marked as orphaned.
version_added: 2.5.0
type: integer
example: ~
default: "60"
5 changes: 3 additions & 2 deletions airflow/config_templates/default_airflow.cfg
@@ -985,8 +985,9 @@ scheduler_idle_sleep_time = 1
min_file_process_interval = 30

# How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
# the expected files) which should be deactivated.
deactivate_stale_dags_interval = 60
# the expected files) which should be deactivated, as well as datasets that are no longer
# referenced and should be marked as orphaned.
parsing_cleanup_interval = 60

# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 300
1 change: 1 addition & 0 deletions airflow/configuration.py
@@ -255,6 +255,7 @@ class AirflowConfigParser(ConfigParser):
"worker_pods_pending_timeout_batch_size",
)
},
("scheduler", "parsing_cleanup_interval"): ("scheduler", "deactivate_stale_dags_interval", "2.5.0"),
}

# A mapping of old default values that we want to change and warn the user
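For clarity, here is a rough sketch of what the deprecation shim above buys users (illustrative snippet, not part of this diff; assumes a stock Airflow 2.5 install):

```python
from airflow.configuration import conf

# Thanks to the deprecated_options entry above, a config file that still
# sets [scheduler] deactivate_stale_dags_interval is read through the new
# key, with a deprecation warning pointing at the rename.
interval = conf.getint("scheduler", "parsing_cleanup_interval")
print(f"parsing cleanup runs every {interval} seconds")  # default: 60
```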
4 changes: 2 additions & 2 deletions airflow/dag_processing/manager.py
@@ -434,7 +434,7 @@ def __init__(
        # Last time we cleaned up DAGs which are no longer in files
        self.last_deactivate_stale_dags_time = timezone.make_aware(datetime.fromtimestamp(0))
        # How often to check for DAGs which are no longer in files
        self.deactivate_stale_dags_interval = conf.getint("scheduler", "deactivate_stale_dags_interval")
        self.parsing_cleanup_interval = conf.getint("scheduler", "parsing_cleanup_interval")
        # How long to wait before timing out a process to parse a DAG file
        self._processor_timeout = processor_timeout
        # How often to scan the DAGs directory for new files. Default to 5 minutes.
@@ -497,7 +497,7 @@ def _deactivate_stale_dags(self, session=None):
"""
now = timezone.utcnow()
elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
if elapsed_time_since_refresh > self.parsing_cleanup_interval:
last_parsed = {
fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp)
}
47 changes: 44 additions & 3 deletions airflow/jobs/scheduler_job.py
@@ -30,7 +30,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, Collection, DefaultDict, Iterator

from sqlalchemy import func, not_, or_, text
from sqlalchemy import and_, func, not_, or_, text
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import load_only, selectinload
from sqlalchemy.orm.session import Session, make_transient
@@ -46,7 +46,13 @@
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue, DatasetEvent
from airflow.models.dataset import (
    DagScheduleDatasetReference,
    DatasetDagRunQueue,
    DatasetEvent,
    DatasetModel,
    TaskOutletDatasetReference,
)
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey
from airflow.stats import Stats
@@ -854,9 +860,14 @@ def _run_scheduler_loop(self) -> None:
        )
        timers.call_regular_interval(60.0, self._update_dag_run_state_for_paused_dags)

        timers.call_regular_interval(
            conf.getfloat("scheduler", "parsing_cleanup_interval"),
            self._orphan_unreferenced_datasets,
        )

        if self._standalone_dag_processor:
            timers.call_regular_interval(
                conf.getfloat("scheduler", "deactivate_stale_dags_interval", fallback=60.0),
                conf.getfloat("scheduler", "parsing_cleanup_interval"),
                self._cleanup_stale_dags,
            )

@@ -1574,3 +1585,33 @@ def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None:
            dag.is_active = False
            SerializedDagModel.remove_dag(dag_id=dag.dag_id, session=session)
        session.flush()

    @provide_session
    def _orphan_unreferenced_datasets(self, session: Session = NEW_SESSION) -> None:
        """
        Detect datasets that are no longer referenced in any DAG schedule parameters
        or task outlets and set their ``is_orphaned`` flag to ``True``.
        """
        orphaned_dataset_query = (
            session.query(DatasetModel)
            .join(
                DagScheduleDatasetReference,
                isouter=True,
            )
            .join(
                TaskOutletDatasetReference,
                isouter=True,
            )
            # MSSQL doesn't like it when we select a column that we haven't grouped by. All other
            # DBs let us group by id and select all columns.
            .group_by(DatasetModel if session.get_bind().dialect.name == "mssql" else DatasetModel.id)
            .having(
                and_(
                    func.count(DagScheduleDatasetReference.dag_id) == 0,
                    func.count(TaskOutletDatasetReference.dag_id) == 0,
                )
            )
        )
        for dataset in orphaned_dataset_query:
            self.log.info("Orphaning unreferenced dataset '%s'", dataset.uri)
            dataset.is_orphaned = expression.true()
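The shape of this query is easier to see in isolation: two LEFT OUTER JOINs to the reference tables, grouped per dataset, keeping only groups where both reference counts are zero. A self-contained sketch of that pattern follows (toy tables standing in for Airflow's models, not the actual schema):

```python
from sqlalchemy import Column, ForeignKey, Integer, String, and_, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Dataset(Base):
    __tablename__ = "dataset"
    id = Column(Integer, primary_key=True)
    uri = Column(String, nullable=False)


class ScheduleRef(Base):  # stand-in for DagScheduleDatasetReference
    __tablename__ = "schedule_ref"
    id = Column(Integer, primary_key=True)
    dataset_id = Column(Integer, ForeignKey("dataset.id"))


class OutletRef(Base):  # stand-in for TaskOutletDatasetReference
    __tablename__ = "outlet_ref"
    id = Column(Integer, primary_key=True)
    dataset_id = Column(Integer, ForeignKey("dataset.id"))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([Dataset(id=1, uri="s3://referenced"), Dataset(id=2, uri="s3://orphan")])
    session.add(ScheduleRef(dataset_id=1))
    session.commit()

    # LEFT OUTER JOIN keeps datasets that have no matching reference rows;
    # grouping per dataset and requiring both counts to be zero selects
    # exactly the unreferenced ("orphaned") datasets.
    orphans = (
        session.query(Dataset)
        .join(ScheduleRef, ScheduleRef.dataset_id == Dataset.id, isouter=True)
        .join(OutletRef, OutletRef.dataset_id == Dataset.id, isouter=True)
        .group_by(Dataset.id)
        .having(
            and_(
                func.count(ScheduleRef.dataset_id) == 0,
                func.count(OutletRef.dataset_id) == 0,
            )
        )
        .all()
    )
    print([d.uri for d in orphans])  # ['s3://orphan']
```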
@@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add is_orphaned to DatasetModel
Revision ID: 290244fb8b83
Revises: 1986afd32c1b
Create Date: 2022-11-22 00:12:53.432961
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "290244fb8b83"
down_revision = "1986afd32c1b"
branch_labels = None
depends_on = None
airflow_version = "2.5.0"


def upgrade():
    """Add is_orphaned to DatasetModel"""
    with op.batch_alter_table("dataset") as batch_op:
        batch_op.add_column(
            sa.Column(
                "is_orphaned",
                sa.Boolean,
                default=False,
                nullable=False,
                server_default="0",
            )
        )


def downgrade():
    """Remove is_orphaned from DatasetModel"""
    with op.batch_alter_table("dataset") as batch_op:
        batch_op.drop_column("is_orphaned", mssql_drop_default=True)
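Two details are worth noting: per the commit history above, the migration is batched because of SQLite's limited ALTER TABLE support (Alembic's batch mode recreates the table), and `server_default="0"` lets the database backfill the new NOT NULL column for rows that already exist. A toy illustration of the backfill (plain SQL via SQLAlchemy; the table name here is illustrative):

```python
import sqlalchemy as sa

# Why server_default matters: existing rows need a value when a NOT NULL
# column is added. '0' is falsy across backends, matching the ORM's
# server_default="0" on DatasetModel.is_orphaned.
engine = sa.create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(sa.text("CREATE TABLE dataset (id INTEGER PRIMARY KEY)"))
    conn.execute(sa.text("INSERT INTO dataset (id) VALUES (1)"))
    conn.execute(
        sa.text("ALTER TABLE dataset ADD COLUMN is_orphaned BOOLEAN NOT NULL DEFAULT '0'")
    )
    rows = conn.execute(sa.text("SELECT id, is_orphaned FROM dataset")).all()
    print(rows)  # [(1, 0)] -- backfilled, no NOT NULL violation
```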
4 changes: 4 additions & 0 deletions airflow/models/dag.py
@@ -2825,6 +2825,10 @@ def bulk_write_to_db(
        for dataset in all_datasets:
            stored_dataset = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).first()
            if stored_dataset:
                # Some datasets may have been previously unreferenced, and therefore orphaned by the
                # scheduler. But if we're here, then we have found that dataset again in our DAGs, which
                # means that it is no longer an orphan, so set is_orphaned to False.
                stored_dataset.is_orphaned = expression.false()
                stored_datasets[stored_dataset.uri] = stored_dataset
            else:
                session.add(dataset)
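This branch is the other half of the lifecycle, covered by the un-orphaning test mentioned in the commit history: once any DAG schedule or task outlet references the dataset again, the next write to the DB clears the flag. A sketch of a DAG that would re-reference a previously orphaned dataset (hypothetical URI; assumes a working Airflow 2.5 environment):

```python
from airflow.datasets import Dataset
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

# If "s3://example/data" had been orphaned, the next time the DAG processor
# writes this DAG to the DB, the branch above resets is_orphaned to False,
# because the task outlet references the dataset again.
with DAG(dag_id="producer", schedule=None):
    EmptyOperator(task_id="touch", outlets=[Dataset("s3://example/data")])
```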
2 changes: 2 additions & 0 deletions airflow/models/dataset.py
@@ -21,6 +21,7 @@

import sqlalchemy_jsonfield
from sqlalchemy import (
    Boolean,
    Column,
    ForeignKey,
    ForeignKeyConstraint,
@@ -64,6 +65,7 @@ class DatasetModel(Base):
    extra = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
    created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
    updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
    is_orphaned = Column(Boolean, default=False, nullable=False, server_default="0")

    consuming_dags = relationship("DagScheduleDatasetReference", back_populates="dataset")
    producing_tasks = relationship("TaskOutletDatasetReference", back_populates="dataset")
4 changes: 2 additions & 2 deletions airflow/www/views.py
@@ -3565,7 +3565,7 @@ def next_run_datasets(self, dag_id):
                ),
                isouter=True,
            )
            .filter(DagScheduleDatasetReference.dag_id == dag_id)
            .filter(DagScheduleDatasetReference.dag_id == dag_id, ~DatasetModel.is_orphaned)
            .group_by(DatasetModel.id, DatasetModel.uri)
            .order_by(DatasetModel.uri)
            .all()
@@ -3688,7 +3688,7 @@ def datasets_summary(self):
        if has_event_filters:
            count_query = count_query.join(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)

        filters = []
        filters = [~DatasetModel.is_orphaned]
        if uri_pattern:
            filters.append(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
        if updated_after:
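Both view queries hide orphaned datasets the same way: the rows stay in the table (a soft delete) but are filtered out by negating the boolean column. The filter on its own looks roughly like this (a sketch assuming an initialized Airflow metadata database):

```python
from airflow.models.dataset import DatasetModel
from airflow.utils.session import create_session

with create_session() as session:
    # Orphaned datasets remain in the table but are excluded from listings.
    active = (
        session.query(DatasetModel)
        .filter(~DatasetModel.is_orphaned)
        .order_by(DatasetModel.uri)
        .all()
    )
    print([d.uri for d in active])
```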
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
f529521071a6c9ae8bbd58d63cf1195fc1ec964308e7684569d0a36d26534def
5bee32cf7239656360cfe7978e7f01d6ee3b9914d3f3fd89ed4ff747254dddf8