Soft delete datasets that are no longer referenced in DAG schedules or task outlets #27828

Merged · 26 commits · Nov 25, 2022

Changes shown in the diff below are from 7 of the 26 commits.

Commits (26):
fb8d644  Soft delete datasets that are no longer referenced anywhere (blag, Nov 23, 2022)
67139d6  Move the orphaning logic into the scheduler and adjust config option … (blag, Nov 23, 2022)
99cbdf3  Rename config option scheduler.cleanup_interval -> scheduler.parsing_… (blag, Nov 25, 2022)
7efb532  Include default column value in migration (blag, Nov 25, 2022)
29f3d0a  deprecate old interval config; move code to scheduler_job (jedcunningham, Nov 25, 2022)
1950f15  First pass at a test (jedcunningham, Nov 25, 2022)
594e899  Fix migration (jedcunningham, Nov 25, 2022)
2cb2d99  Apply suggestions from code review (blag, Nov 25, 2022)
652f745  Don't batch migrations if we don't need to (blag, Nov 25, 2022)
e47c498  Revert "Don't batch migrations if we don't need to" - gotta batch mig… (blag, Nov 25, 2022)
a46086a  Tweak migrations (blag, Nov 25, 2022)
8d6d9c7  Use sqlalchemy.sql.True_() to support all DB backends (blag, Nov 25, 2022)
e88d909  Various cleanups (blag, Nov 25, 2022)
a123537  Add test for un-orphaning datasets once they are referenced again (blag, Nov 25, 2022)
e244273  Use sqlalchemy.sql.expression.true() (blag, Nov 25, 2022)
a3930b3  Fix orphaning datasets on MSSQL (blag, Nov 25, 2022)
559ee24  Comment the un-orphan process and use sqla.sql.expression.false() (blag, Nov 25, 2022)
83e865b  Merge remote-tracking branch 'apache/main' into soft-delete-orphaned-… (blag, Nov 25, 2022)
9eae8db  Add newsfragment about renamed config option (blag, Nov 25, 2022)
3a9864a  add mssql_drop_default flag (ephraimbuddy, Nov 25, 2022)
a6b3714  Use server_default in the ORM as well (blag, Nov 25, 2022)
6112e19  Defensively clear datasets before and after DAG tests (blag, Nov 25, 2022)
2197254  Reconcile migration with ORM model (blag, Nov 25, 2022)
bc7a35c  Remove now erroneous comment (blag, Nov 25, 2022)
a5cdb47  Change to use server_default='0' (ephraimbuddy, Nov 25, 2022)
286ed01  Update airflow/configuration.py (jedcunningham, Nov 25, 2022)

7 changes: 4 additions & 3 deletions airflow/config_templates/config.yml
@@ -1946,11 +1946,12 @@
type: string
example: ~
default: "30"
- name: deactivate_stale_dags_interval
- name: parsing_cleanup_interval
description: |
How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
the expected files) which should be deactivated.
version_added: 2.2.5
the expected files) which should be deactivated, as well as datasets that are no longer
referenced and should be marked as orphaned.
version_added: 2.5.0
type: integer
example: ~
default: "60"
5 changes: 3 additions & 2 deletions airflow/config_templates/default_airflow.cfg
@@ -985,8 +985,9 @@ scheduler_idle_sleep_time = 1
min_file_process_interval = 30

# How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
# the expected files) which should be deactivated.
deactivate_stale_dags_interval = 60
# the expected files) which should be deactivated, as well as datasets that are no longer
# referenced and should be marked as orphaned.
parsing_cleanup_interval = 60

# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 300
1 change: 1 addition & 0 deletions airflow/configuration.py
@@ -255,6 +255,7 @@ class AirflowConfigParser(ConfigParser):
"worker_pods_pending_timeout_batch_size",
)
},
("core", "parsing_cleanup_interval"): ("core", "deactivate_stale_dags_interval", "2.5.0"),
}

# A mapping of old default values that we want to change and warn the user
8 changes: 5 additions & 3 deletions airflow/dag_processing/manager.py
@@ -433,8 +433,10 @@ def __init__(
self.last_stat_print_time = 0
# Last time we cleaned up DAGs which are no longer in files
self.last_deactivate_stale_dags_time = timezone.make_aware(datetime.fromtimestamp(0))
# How often to check for DAGs which are no longer in files
self.deactivate_stale_dags_interval = conf.getint("scheduler", "deactivate_stale_dags_interval")
# How often to clean up:
# * DAGs which are no longer in files
# * datasets that are no longer referenced by any DAG schedule parameters or task outlets
self.parsing_cleanup_interval = conf.getint("scheduler", "parsing_cleanup_interval")
# How long to wait before timing out a process to parse a DAG file
self._processor_timeout = processor_timeout
# How often to scan the DAGs directory for new files. Default to 5 minutes.
@@ -497,7 +499,7 @@ def _deactivate_stale_dags(self, session=None):
"""
now = timezone.utcnow()
elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
if elapsed_time_since_refresh > self.parsing_cleanup_interval:
last_parsed = {
fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp)
}
47 changes: 44 additions & 3 deletions airflow/jobs/scheduler_job.py
@@ -30,7 +30,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, Collection, DefaultDict, Iterator

from sqlalchemy import func, not_, or_, text
from sqlalchemy import and_, func, not_, or_, text
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import load_only, selectinload
from sqlalchemy.orm.session import Session, make_transient
@@ -46,7 +46,13 @@
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue, DatasetEvent
from airflow.models.dataset import (
DagScheduleDatasetReference,
DatasetDagRunQueue,
DatasetEvent,
DatasetModel,
TaskOutletDatasetReference,
)
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey
from airflow.stats import Stats
@@ -854,9 +860,14 @@ def _run_scheduler_loop(self) -> None:
)
timers.call_regular_interval(60.0, self._update_dag_run_state_for_paused_dags)

timers.call_regular_interval(
conf.getfloat("scheduler", "parsing_cleanup_interval"),
self._orphan_unreferenced_datasets,
)

if self._standalone_dag_processor:
timers.call_regular_interval(
conf.getfloat("scheduler", "deactivate_stale_dags_interval", fallback=60.0),
conf.getfloat("scheduler", "parsing_cleanup_interval"),
self._cleanup_stale_dags,
)

@@ -1574,3 +1585,33 @@ def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None:
dag.is_active = False
SerializedDagModel.remove_dag(dag_id=dag.dag_id, session=session)
session.flush()

@provide_session
def _orphan_unreferenced_datasets(self, session: Session = NEW_SESSION) -> None:
"""
Detects datasets that are no longer referenced in any DAG schedule parameters or task outlets and
sets their is_orphaned flags to True.
"""
orphaned_dataset_query = (
session.query(DatasetModel)
.join(
DagScheduleDatasetReference,
DagScheduleDatasetReference.dataset_id == DatasetModel.id,
isouter=True,
)
.join(
TaskOutletDatasetReference,
TaskOutletDatasetReference.dataset_id == DatasetModel.id,
isouter=True,
)
.group_by(DatasetModel.id)
.having(
and_(
func.count(DagScheduleDatasetReference.dag_id) == 0,
func.count(TaskOutletDatasetReference.dag_id) == 0,
)
)
)
for dataset in orphaned_dataset_query.all():
self.log.info("Orphaning dataset '%s'", dataset.uri)
dataset.is_orphaned = True
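
In isolation, the hunk above amounts to an outer join against both reference tables plus a GROUP BY/HAVING that keeps only datasets with zero schedule references and zero task-outlet references. A minimal standalone sketch of the same idea (assuming Airflow 2.5.0 with this change applied, and using create_session rather than the scheduler's injected session; illustrative only, not the exact code the scheduler runs):

from sqlalchemy import and_, func

from airflow.models.dataset import (
    DagScheduleDatasetReference,
    DatasetModel,
    TaskOutletDatasetReference,
)
from airflow.utils.session import create_session

with create_session() as session:
    # Datasets with no rows in either reference table produce zero counts:
    # nothing schedules on them and no task lists them as an outlet.
    orphaned = (
        session.query(DatasetModel)
        .outerjoin(
            DagScheduleDatasetReference,
            DagScheduleDatasetReference.dataset_id == DatasetModel.id,
        )
        .outerjoin(
            TaskOutletDatasetReference,
            TaskOutletDatasetReference.dataset_id == DatasetModel.id,
        )
        .group_by(DatasetModel.id)
        .having(
            and_(
                func.count(DagScheduleDatasetReference.dag_id) == 0,
                func.count(TaskOutletDatasetReference.dag_id) == 0,
            )
        )
        .all()
    )
    for dataset in orphaned:
        dataset.is_orphaned = True  # soft delete: the row itself is kept
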
New file (49 additions): Alembic migration "Add is_orphaned to DatasetModel"
@@ -0,0 +1,49 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add is_orphaned to DatasetModel

Revision ID: 290244fb8b83
Revises: 65a852f26899
Create Date: 2022-11-22 00:12:53.432961

"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "290244fb8b83"
down_revision = "65a852f26899"
branch_labels = None
depends_on = None
airflow_version = "2.5.0"


def upgrade():
"""Add is_orphaned to DatasetModel"""
with op.batch_alter_table("dataset") as batch_op:
batch_op.add_column(sa.Column("is_orphaned", sa.Boolean, nullable=False, server_default="False"))


def downgrade():
"""Remove is_orphaned to DatasetModel"""
with op.batch_alter_table("dataset", schema=None) as batch_op:
batch_op.drop_column("is_orphaned")
1 change: 1 addition & 0 deletions airflow/models/dag.py
@@ -2828,6 +2828,7 @@ def bulk_write_to_db(
for dataset in all_datasets:
stored_dataset = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).first()
if stored_dataset:
stored_dataset.is_orphaned = False
stored_datasets[stored_dataset.uri] = stored_dataset
else:
session.add(dataset)
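
As a usage-level illustration of the un-orphaning path above (a sketch with a made-up DAG id and dataset URI, not code from this PR): as soon as any parsed DAG schedules on the dataset again, or produces it through a task outlet, bulk_write_to_db resets the stored row to is_orphaned = False on the next parse.

import pendulum

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator

example = Dataset("s3://example-bucket/orders.csv")  # hypothetical URI

with DAG(
    dag_id="consumes_example_dataset",  # hypothetical DAG id
    schedule=[example],  # re-references the dataset, so the next parse un-orphans it
    start_date=pendulum.datetime(2022, 11, 1, tz="UTC"),
):
    EmptyOperator(task_id="noop")
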
2 changes: 2 additions & 0 deletions airflow/models/dataset.py
@@ -21,6 +21,7 @@

import sqlalchemy_jsonfield
from sqlalchemy import (
Boolean,
Column,
ForeignKey,
ForeignKeyConstraint,
@@ -64,6 +65,7 @@ class DatasetModel(Base):
extra = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
is_orphaned = Column(Boolean, default=False, server_default="False", nullable=False)

consuming_dags = relationship("DagScheduleDatasetReference", back_populates="dataset")
producing_tasks = relationship("TaskOutletDatasetReference", back_populates="dataset")
4 changes: 2 additions & 2 deletions airflow/www/views.py
@@ -3525,7 +3525,7 @@ def next_run_datasets(self, dag_id):
),
isouter=True,
)
.filter(DagScheduleDatasetReference.dag_id == dag_id)
.filter(DagScheduleDatasetReference.dag_id == dag_id, DatasetModel.is_orphaned.is_(False))
.group_by(DatasetModel.id, DatasetModel.uri)
.order_by(DatasetModel.uri)
.all()
@@ -3648,7 +3648,7 @@ def datasets_summary(self):
if has_event_filters:
count_query = count_query.join(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)

filters = []
filters = [DatasetModel.is_orphaned.is_(False)]
if uri_pattern:
filters.append(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
if updated_after:
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
bf1db1b1041afe3ef6277d96701f6d82bce44497b2b4c49ee79f7bb198f51042
ab67d584c9912231e6963abdd710ede4228cb2f23d67219485361c75d4f9bbd7