Skip to content

Commit

Permalink
Move the orphaning logic into the scheduler and adjust config option …
Browse files Browse the repository at this point in the history
…name accordingly
  • Loading branch information
blag committed Nov 23, 2022
1 parent fb8d644 commit 67139d6
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 33 deletions.
15 changes: 13 additions & 2 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1948,12 +1948,23 @@
default: "30"
- name: deactivate_stale_dags_interval
description: |
How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
the expected files) which should be deactivated.
[DEPRECATED] How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
the expected files) which should be deactivated. This is deprecated in favor of cleanup_interval,
and is ignored when that is set.
version_added: 2.2.5
type: integer
example: ~
default: "60"
- name: cleanup_interval
description: |
How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
the expected files) which should be deactivated, as well as datasets that are no longer
referenced and should be marked as orphaned. If this is set, then deactivate_stale_dags_interval is
ignored.
version_added: 2.4.5
type: integer
example: ~
default: "60"
- name: dag_dir_list_interval
description: |
How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
Expand Down
11 changes: 9 additions & 2 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -984,10 +984,17 @@ scheduler_idle_sleep_time = 1
# this interval. Keeping this number low will increase CPU usage.
min_file_process_interval = 30

# How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
# the expected files) which should be deactivated.
# [DEPRECATED] How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
# the expected files) which should be deactivated. This is deprecated in favor of cleanup_interval,
# and is ignored when that is set.
deactivate_stale_dags_interval = 60

# How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
# the expected files) which should be deactivated, as well as datasets that are no longer
# referenced and should be marked as orphaned. If this is set, then deactivate_stale_dags_interval is
# ignored.
cleanup_interval = 60

# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 300

Expand Down
42 changes: 39 additions & 3 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from typing import Any, NamedTuple, cast

from setproctitle import setproctitle
from sqlalchemy import and_, func
from sqlalchemy.orm import Session
from tabulate import tabulate

Expand All @@ -48,6 +49,7 @@
from airflow.models import errors
from airflow.models.dag import DagModel
from airflow.models.dagwarning import DagWarning
from airflow.models.dataset import DagScheduleDatasetReference, DatasetModel, TaskOutletDatasetReference
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.models.serialized_dag import SerializedDagModel
from airflow.stats import Stats
Expand Down Expand Up @@ -433,8 +435,10 @@ def __init__(
self.last_stat_print_time = 0
# Last time we cleaned up DAGs which are no longer in files
self.last_deactivate_stale_dags_time = timezone.make_aware(datetime.fromtimestamp(0))
# How often to check for DAGs which are no longer in files
self.deactivate_stale_dags_interval = conf.getint("scheduler", "deactivate_stale_dags_interval")
# How often to clean up:
# * DAGs which are no longer in files
# * datasets that are no longer referenced by any DAG schedule parameters or task outlets
self.cleanup_interval = conf.getint("scheduler", "cleanup_interval")
# How long to wait before timing out a process to parse a DAG file
self._processor_timeout = processor_timeout
# How often to scan the DAGs directory for new files. Default to 5 minutes.
Expand Down Expand Up @@ -488,6 +492,36 @@ def start(self):

return self._run_parsing_loop()

@provide_session
def _orphan_unreferenced_datasets(self, session=None):
"""
Detects datasets that are no longer referenced in any DAG schedule parameters or task outlets and
sets the dataset is_orphaned flags to True
"""
orphaned_dataset_query = (
session.query(DatasetModel)
.join(
DagScheduleDatasetReference,
DagScheduleDatasetReference.dataset_id == DatasetModel.id,
isouter=True,
)
.join(
TaskOutletDatasetReference,
TaskOutletDatasetReference.dataset_id == DatasetModel.id,
isouter=True,
)
.group_by(DatasetModel.id)
.having(
and_(
func.count(DagScheduleDatasetReference.dag_id) == 0,
func.count(TaskOutletDatasetReference.dag_id) == 0,
)
)
)
for dataset in orphaned_dataset_query.all():
self.log.info("Orphaning dataset '%s'", dataset.uri)
dataset.is_orphaned = True

@provide_session
def _deactivate_stale_dags(self, session=None):
"""
Expand All @@ -497,7 +531,7 @@ def _deactivate_stale_dags(self, session=None):
"""
now = timezone.utcnow()
elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
if elapsed_time_since_refresh > self.cleanup_interval:
last_parsed = {
fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp)
}
Expand Down Expand Up @@ -533,6 +567,8 @@ def _deactivate_stale_dags(self, session=None):
SerializedDagModel.remove_dag(dag_id)
self.log.info("Deleted DAG %s in serialized_dag table", dag_id)

self._orphan_unreferenced_datasets(session=session)

self.last_deactivate_stale_dags_time = timezone.utcnow()

def _run_parsing_loop(self):
Expand Down
11 changes: 10 additions & 1 deletion airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -855,8 +855,17 @@ def _run_scheduler_loop(self) -> None:
timers.call_regular_interval(60.0, self._update_dag_run_state_for_paused_dags)

if self._standalone_dag_processor:
cleanup_interval = conf.getfloat("scheduler", "deactivate_stale_dags_interval", fallback=60.0)
if cleanup_interval:
warnings.warn(
"The 'scheduler.deactivate_stale_dags_interval' configuration option is deprecated. "
"Please use 'scheduler.cleanup_interval instead'.",
RemovedInAirflow3Warning,
stacklevel=2,
)
cleanup_interval = conf.getfloat("scheduler", "cleanup_interval", fallback=cleanup_interval)
timers.call_regular_interval(
conf.getfloat("scheduler", "deactivate_stale_dags_interval", fallback=60.0),
cleanup_interval,
self._cleanup_stale_dags,
)

Expand Down
25 changes: 0 additions & 25 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2871,31 +2871,6 @@ def bulk_write_to_db(
for obj in task_refs_stored - task_refs_needed:
session.delete(obj)

# Remove any orphaned datasets
orphaned_dataset_query = (
session.query(DatasetModel)
.join(
DagScheduleDatasetReference,
DagScheduleDatasetReference.dataset_id == DatasetModel.id,
isouter=True,
)
.join(
TaskOutletDatasetReference,
TaskOutletDatasetReference.dataset_id == DatasetModel.id,
isouter=True,
)
.group_by(DatasetModel.id)
.having(
and_(
func.count(DagScheduleDatasetReference.dag_id) == 0,
func.count(TaskOutletDatasetReference.dag_id) == 0,
)
)
)
for dataset in orphaned_dataset_query.all():
dataset.is_orphaned = True
session.add(dataset)

# Issue SQL/finish "Unit of Work", but let @provide_session commit (or if passed a session, let caller
# decide when to commit
session.flush()
Expand Down

0 comments on commit 67139d6

Please sign in to comment.