Soft delete datasets that are no longer referenced in DAG schedules or task outlets #27828

Merged
merged 26 commits on Nov 25, 2022

Commits (26)
fb8d644
Soft delete datasets that are no longer referenced anywhere
blag Nov 23, 2022
67139d6
Move the orphaning logic into the scheduler and adjust config option …
blag Nov 23, 2022
99cbdf3
Rename config option scheduler.cleanup_interval -> scheduler.parsing_…
blag Nov 25, 2022
7efb532
Include default column value in migration
blag Nov 25, 2022
29f3d0a
deprecate old interval config; move code to scheduler_job
jedcunningham Nov 25, 2022
1950f15
First pass at a test
jedcunningham Nov 25, 2022
594e899
Fix migration
jedcunningham Nov 25, 2022
2cb2d99
Apply suggestions from code review
blag Nov 25, 2022
652f745
Don't batch migrations if we don't need to
blag Nov 25, 2022
e47c498
Revert "Don't batch migrations if we don't need to" - gotta batch mig…
blag Nov 25, 2022
a46086a
Tweak migrations
blag Nov 25, 2022
8d6d9c7
Use sqlalchemy.sql.True_() to support all DB backends
blag Nov 25, 2022
e88d909
Various cleanups
blag Nov 25, 2022
a123537
Add test for un-orphaning datasets once they are referenced again
blag Nov 25, 2022
e244273
Use sqlalchemy.sql.expression.true()
blag Nov 25, 2022
a3930b3
Fix orphaning datasets on MSSQL
blag Nov 25, 2022
559ee24
Comment the un-orphan process and use sqla.sql.expression.false()
blag Nov 25, 2022
83e865b
Merge remote-tracking branch 'apache/main' into soft-delete-orphaned-…
blag Nov 25, 2022
9eae8db
Add newsfragment about renamed config option
blag Nov 25, 2022
3a9864a
add mssql_drop_default flag
ephraimbuddy Nov 25, 2022
a6b3714
Use server_default in the ORM as well
blag Nov 25, 2022
6112e19
Defensively clear datasets before and after DAG tests
blag Nov 25, 2022
2197254
Reconcile migration with ORM model
blag Nov 25, 2022
bc7a35c
Remove now erroneous comment
blag Nov 25, 2022
a5cdb47
Change to use server_default='0'
ephraimbuddy Nov 25, 2022
286ed01
Update airflow/configuration.py
jedcunningham Nov 25, 2022
7 changes: 4 additions & 3 deletions airflow/config_templates/config.yml
@@ -1946,11 +1946,12 @@
type: string
example: ~
default: "30"
- name: deactivate_stale_dags_interval
- name: parsing_cleanup_interval
description: |
How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
the expected files) which should be deactivated.
version_added: 2.2.5
the expected files) which should be deactivated, as well as datasets that are no longer
referenced and should be marked as orphaned.
version_added: 2.5.0
type: integer
example: ~
default: "60"
5 changes: 3 additions & 2 deletions airflow/config_templates/default_airflow.cfg
@@ -985,8 +985,9 @@ scheduler_idle_sleep_time = 1
min_file_process_interval = 30

# How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
# the expected files) which should be deactivated.
deactivate_stale_dags_interval = 60
# the expected files) which should be deactivated, as well as datasets that are no longer
# referenced and should be marked as orphaned.
parsing_cleanup_interval = 60

# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 300
1 change: 1 addition & 0 deletions airflow/configuration.py
@@ -255,6 +255,7 @@ class AirflowConfigParser(ConfigParser):
"worker_pods_pending_timeout_batch_size",
)
},
("scheduler", "parsing_cleanup_interval"): ("scheduler", "deactivate_stale_dags_interval", "2.5.0"),
}

# A mapping of old default values that we want to change and warn the user
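For context, a minimal usage sketch (not part of the diff) of how this deprecation mapping behaves; the config snippet and the value 45 are hypothetical:

```python
# Hypothetical sketch: a deployment whose airflow.cfg still sets the old option name
#
#   [scheduler]
#   deactivate_stale_dags_interval = 45
#
# keeps working after upgrading. The deprecated_options entry above lets the new key
# resolve to the old one while a deprecation warning asks for the rename.
from airflow.configuration import conf

interval = conf.getint("scheduler", "parsing_cleanup_interval")
print(interval)  # 45, read from the deprecated key
```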
4 changes: 2 additions & 2 deletions airflow/dag_processing/manager.py
@@ -434,7 +434,7 @@ def __init__(
# Last time we cleaned up DAGs which are no longer in files
self.last_deactivate_stale_dags_time = timezone.make_aware(datetime.fromtimestamp(0))
# How often to check for DAGs which are no longer in files
self.deactivate_stale_dags_interval = conf.getint("scheduler", "deactivate_stale_dags_interval")
self.parsing_cleanup_interval = conf.getint("scheduler", "parsing_cleanup_interval")
# How long to wait before timing out a process to parse a DAG file
self._processor_timeout = processor_timeout
# How often to scan the DAGs directory for new files. Default to 5 minutes.
@@ -497,7 +497,7 @@ def _deactivate_stale_dags(self, session=None):
"""
now = timezone.utcnow()
elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
if elapsed_time_since_refresh > self.parsing_cleanup_interval:
last_parsed = {
fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp)
}
47 changes: 44 additions & 3 deletions airflow/jobs/scheduler_job.py
@@ -30,7 +30,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, Collection, DefaultDict, Iterator

from sqlalchemy import func, not_, or_, text
from sqlalchemy import and_, func, not_, or_, text
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import load_only, selectinload
from sqlalchemy.orm.session import Session, make_transient
@@ -46,7 +46,13 @@
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue, DatasetEvent
from airflow.models.dataset import (
DagScheduleDatasetReference,
DatasetDagRunQueue,
DatasetEvent,
DatasetModel,
TaskOutletDatasetReference,
)
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey
from airflow.stats import Stats
@@ -854,9 +860,14 @@ def _run_scheduler_loop(self) -> None:
)
timers.call_regular_interval(60.0, self._update_dag_run_state_for_paused_dags)

timers.call_regular_interval(
conf.getfloat("scheduler", "parsing_cleanup_interval"),
self._orphan_unreferenced_datasets,
)

if self._standalone_dag_processor:
timers.call_regular_interval(
conf.getfloat("scheduler", "deactivate_stale_dags_interval", fallback=60.0),
conf.getfloat("scheduler", "parsing_cleanup_interval"),
self._cleanup_stale_dags,
)

@@ -1574,3 +1585,33 @@ def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None:
dag.is_active = False
SerializedDagModel.remove_dag(dag_id=dag.dag_id, session=session)
session.flush()

@provide_session
def _orphan_unreferenced_datasets(self, session: Session = NEW_SESSION) -> None:
"""
Detects datasets that are no longer referenced in any DAG schedule parameters or task outlets and
sets the dataset is_orphaned flag to True
"""
orphaned_dataset_query = (
session.query(DatasetModel)
.join(
DagScheduleDatasetReference,
isouter=True,
)
.join(
TaskOutletDatasetReference,
isouter=True,
)
# MSSQL doesn't like it when we select a column that we haven't grouped by. All other DBs let us
# group by id and select all columns.
.group_by(DatasetModel if session.get_bind().dialect.name == "mssql" else DatasetModel.id)
.having(
and_(
func.count(DagScheduleDatasetReference.dag_id) == 0,
func.count(TaskOutletDatasetReference.dag_id) == 0,
)
)
)
for dataset in orphaned_dataset_query:
self.log.info("Orphaning unreferenced dataset '%s'", dataset.uri)
dataset.is_orphaned = expression.true()
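
As a rough illustration of how this method can be exercised, a test-style sketch (the URI, session handling, and SchedulerJob construction are assumptions, not copied from the PR's test files):

```python
# Sketch: a dataset with no DAG schedule references and no task outlets should be
# flagged as orphaned after _orphan_unreferenced_datasets runs.
import os

from airflow.jobs.scheduler_job import SchedulerJob
from airflow.models.dataset import DatasetModel
from airflow.utils.session import create_session


def test_unreferenced_dataset_is_orphaned():
    with create_session() as session:
        session.add(DatasetModel(uri="s3://bucket/unused"))  # hypothetical URI
        session.flush()

        scheduler_job = SchedulerJob(subdir=os.devnull)
        scheduler_job._orphan_unreferenced_datasets(session=session)
        session.flush()

        dataset = session.query(DatasetModel).filter_by(uri="s3://bucket/unused").one()
        assert dataset.is_orphaned
```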
57 changes: 57 additions & 0 deletions (new Alembic migration: add is_orphaned to the dataset table)
@@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add is_orphaned to DatasetModel

Revision ID: 290244fb8b83
Revises: 1986afd32c1b
Create Date: 2022-11-22 00:12:53.432961

"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "290244fb8b83"
down_revision = "1986afd32c1b"
branch_labels = None
depends_on = None
airflow_version = "2.5.0"


def upgrade():
"""Add is_orphaned to DatasetModel"""
with op.batch_alter_table("dataset") as batch_op:
batch_op.add_column(
sa.Column(
"is_orphaned",
sa.Boolean,
default=False,
nullable=False,
server_default="0",
)
)


def downgrade():
"""Remove is_orphaned from DatasetModel"""
with op.batch_alter_table("dataset") as batch_op:
batch_op.drop_column("is_orphaned", mssql_drop_default=True)
4 changes: 4 additions & 0 deletions airflow/models/dag.py
@@ -2825,6 +2825,10 @@ def bulk_write_to_db(
for dataset in all_datasets:
stored_dataset = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).first()
if stored_dataset:
# Some datasets may have been previously unreferenced, and therefore orphaned by the
# scheduler. But if we're here, then we have found that dataset again in our DAGs, which
# means that it is no longer an orphan, so set is_orphaned to False.
stored_dataset.is_orphaned = expression.false()
stored_datasets[stored_dataset.uri] = stored_dataset
else:
session.add(dataset)
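To show the un-orphaning path end to end, a hypothetical DAG (the dag_id, task_id, and URI are invented) that references the dataset again; on the next parse, bulk_write_to_db clears the flag:

```python
# Hypothetical example: scheduling a DAG on a previously orphaned dataset (or emitting it
# as a task outlet) causes bulk_write_to_db to set is_orphaned back to False.
import pendulum

from airflow import DAG, Dataset
from airflow.operators.empty import EmptyOperator

previously_orphaned = Dataset("s3://bucket/unused")

with DAG(
    dag_id="consumes_dataset_again",
    start_date=pendulum.datetime(2022, 11, 1, tz="UTC"),
    schedule=[previously_orphaned],  # referenced again, so it is no longer an orphan
):
    EmptyOperator(task_id="noop")
```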
2 changes: 2 additions & 0 deletions airflow/models/dataset.py
@@ -21,6 +21,7 @@

import sqlalchemy_jsonfield
from sqlalchemy import (
Boolean,
Column,
ForeignKey,
ForeignKeyConstraint,
@@ -64,6 +65,7 @@ class DatasetModel(Base):
extra = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
is_orphaned = Column(Boolean, default=False, nullable=False, server_default="0")

consuming_dags = relationship("DagScheduleDatasetReference", back_populates="dataset")
producing_tasks = relationship("TaskOutletDatasetReference", back_populates="dataset")
4 changes: 2 additions & 2 deletions airflow/www/views.py
@@ -3565,7 +3565,7 @@ def next_run_datasets(self, dag_id):
),
isouter=True,
)
.filter(DagScheduleDatasetReference.dag_id == dag_id)
.filter(DagScheduleDatasetReference.dag_id == dag_id, ~DatasetModel.is_orphaned)
.group_by(DatasetModel.id, DatasetModel.uri)
.order_by(DatasetModel.uri)
.all()
@@ -3688,7 +3688,7 @@ def datasets_summary(self):
if has_event_filters:
count_query = count_query.join(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)

filters = []
filters = [~DatasetModel.is_orphaned]
if uri_pattern:
filters.append(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
if updated_after:
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
f529521071a6c9ae8bbd58d63cf1195fc1ec964308e7684569d0a36d26534def
5bee32cf7239656360cfe7978e7f01d6ee3b9914d3f3fd89ed4ff747254dddf8