Skip to content

Commit

Permalink
Update clean_dag_run.py to remove outdated DAGs
Browse files Browse the repository at this point in the history
Removes the DAGs for the previous versions of the
same workflows if they do not have any running DagRuns
  • Loading branch information
michael-kotliar committed Nov 11, 2020
1 parent 9f4250e commit a2b9512
Showing 1 changed file with 40 additions and 2 deletions.
42 changes: 40 additions & 2 deletions cwl_airflow/extensions/dags/clean_dag_run.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import os
import logging
import psutil
import shutil

from datetime import datetime
from airflow import configuration
from airflow.models import DAG, DagRun, TaskInstance
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.settings import DAGS_FOLDER
from airflow.utils.dag_processing import list_py_file_paths
from airflow.api.common.experimental import delete_dag

from cwl_airflow.utilities.report import dag_on_success, dag_on_failure
from cwl_airflow.utilities.helpers import load_yaml
from cwl_airflow.utilities.helpers import load_yaml, get_rootname


TIMEOUT = configuration.conf.getint("core", "KILLED_TASK_CLEANUP_TIME")
Expand Down Expand Up @@ -77,6 +82,38 @@ def remove_tmp_data(dr):
logging.error(f"""Failed to delete {tmp_folder}\n {ex}""")


def remove_outdated_dags(cwl_id):
    """
    Removes outdated DAGs that belong to the same workflow as cwl_id.

    Iterates over the Python files found in DAGS_FOLDER (Airflow examples
    excluded) and uses the value returned by get_rootname for each file as
    its dag_id (presumably the file name without extension - confirm
    against get_rootname). A file is selected when its dag_id is either
    equal to cwl_id or starts with "<cwl_id>-", which mirrors how the
    caller derives cwl_id (the part of dag_id before the first "-").

    Selected DAGs are sorted by the modification time of their files and
    all of them except the most recently modified one are treated as
    outdated. An outdated DAG that has no DagRuns in the RUNNING state is
    deleted through delete_dag, after which its Python file and the
    sibling file with the ".cwl" extension are removed from disk. Failures
    are logged and never raised, so cleanup stays best-effort.

    Args:
        cwl_id: workflow identifier shared by all versions of the DAG.
            An empty value is rejected, because it would select every
            DAG file in the folder.

    Returns:
        None
    """

    if not cwl_id:
        # an empty identifier matches everything, refuse to clean up
        logging.error("Skipping outdated DAGs cleanup, cwl_id is empty")
        return

    logging.info(f"""Searching for dags based on cwl_id: {cwl_id}""")
    dags = {}
    for location in list_py_file_paths(DAGS_FOLDER, include_examples=False):
        dag_id = get_rootname(location)
        # exact or "<cwl_id>-" prefix match, so that a workflow whose id only
        # contains cwl_id as a substring is never selected for removal
        if dag_id != cwl_id and not dag_id.startswith(f"{cwl_id}-"):
            continue
        try:
            modified = datetime.fromtimestamp(os.path.getmtime(location))
        except OSError as ex:
            # file may disappear between listing and reading its metadata
            logging.error(f"""Failed to read modification time of {location}\n {ex}""")
            continue
        dags[dag_id] = {
            "location": location,
            "modified": modified
        }
        logging.info(f"""Found dag_id: {dag_id}, modified: {dags[dag_id]["modified"]}""")

    # oldest first, the last (most recently modified) DAG is always kept
    outdated_dags = sorted(dags.items(), key=lambda i: i[1]["modified"])[:-1]
    for dag_id, dag_metadata in outdated_dags:
        logging.info(f"""Cleaning dag_id: {dag_id}""")
        if DagRun.find(dag_id=dag_id, state=State.RUNNING):
            logging.info("Skipping, DAG has running DagRuns")
            continue
        try:
            delete_dag.delete_dag(dag_id)
        except Exception as ex:
            logging.error(f"""Failed to delete DAG\n {ex}""")
        # files are removed even if delete_dag failed
        for f in [
            dag_metadata["location"],
            os.path.splitext(dag_metadata["location"])[0]+".cwl"
        ]:
            try:
                logging.info(f"""Deleting DAG file: {f}""")
                os.remove(f)
            except Exception as ex:
                logging.error(f"""Failed to delete file {f}\n {ex}""")


def clean_dag_run(**context):
dag_id = context["dag_run"].conf["remove_dag_id"]
run_id = context["dag_run"].conf["remove_run_id"]
Expand All @@ -85,7 +122,8 @@ def clean_dag_run(**context):
stop_tasks(dr)
remove_tmp_data(dr)
clean_db(dr)

remove_outdated_dags(dag_id.split("-")[0])


dag = DAG(dag_id="clean_dag_run",
start_date=days_ago(1),
Expand Down

0 comments on commit a2b9512

Please sign in to comment.