From 1fd90515077162041c7752910c2ebfaed66e5efd Mon Sep 17 00:00:00 2001 From: Michael Kotliar Date: Tue, 23 Jul 2019 17:01:24 -0400 Subject: [PATCH] Make API work faster --- cwl_airflow/wes/backend.py | 42 ++++++++----------- .../wes/openapi/swagger_configuration.yaml | 4 ++ 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/cwl_airflow/wes/backend.py b/cwl_airflow/wes/backend.py index 4ffdf5d..efccec4 100644 --- a/cwl_airflow/wes/backend.py +++ b/cwl_airflow/wes/backend.py @@ -1,6 +1,8 @@ import os import subprocess import logging +from airflow.models import DagBag, TaskInstance, DagRun +from airflow.utils.timezone import parse as parsedate logger = logging.getLogger(__name__) @@ -61,6 +63,7 @@ def get_dag_runs(self, dag_id=None, run_id=None, execution_date=None, state=None response_item = {"dag_id": d_id, "run_id": dag_run["run_id"], "execution_date": dag_run["execution_date"], + "start_date": dag_run["start_date"], "state": dag_run["state"], "tasks": []} for t_id in task_ids: @@ -74,40 +77,29 @@ def get_dag_runs(self, dag_id=None, run_id=None, execution_date=None, state=None def list_dags(self): logger.debug(f"""List all dags""") - list_dags = subprocess.run(["airflow", "list_dags"], capture_output=True, text=True, env=self.env) - list_dags.check_returncode() - dags_raw = list_dags.stdout.split("\n") - return [i.strip() for i in dags_raw[dags_raw.index("DAGS")+2:] if i.strip()] + return DagBag().dags.keys() def list_tasks(self, dag_id): logger.debug(f"""List tasks of {dag_id}""") - list_tasks = subprocess.run(["airflow", "list_tasks", dag_id], capture_output=True, text=True, env=self.env) - list_tasks.check_returncode() - tasks_raw = list_tasks.stdout.split("\n") - return [i.strip() for i in tasks_raw if i.strip()] + return [t.task_id for t in DagBag().dags[dag_id].tasks] def task_state(self, dag_id, task_id, execution_date): logger.debug(f"""Get {task_id} state of {dag_id} with execution date {execution_date}""") - task_state = subprocess.run(["airflow", "task_state", dag_id, task_id, execution_date], capture_output=True, text=True, env=self.env) - task_state.check_returncode() - return task_state.stdout.strip() + task_state = TaskInstance(DagBag().dags[dag_id].get_task(task_id=task_id), parsedate(execution_date)).current_state() + task_state = task_state if task_state else "none" + return task_state def list_dag_runs(self, dag_id, state): logger.debug(f"""List dag runs of {dag_id} with state {state}""") - list_dag_runs_cmd = ["airflow", "list_dag_runs", dag_id] - if state: - list_dag_runs_cmd.extend(["--state", state]) - list_dag_runs = subprocess.run(list_dag_runs_cmd, capture_output=True, text=True, env=self.env) - list_dag_runs.check_returncode() - dag_runs_raw = list_dag_runs.stdout.split("\n") - return [ - { - "run_id": i.split("|")[1].strip(), - "state": i.split("|")[2].strip(), - "execution_date": i.split("|")[3].strip() - } - for i in dag_runs_raw[dag_runs_raw.index("DAG RUNS")+3:] if i.strip() - ] + dag_runs = [] + for dag_run in DagRun.find(dag_id=dag_id, state=state): + dag_runs.append({ + 'run_id': dag_run.run_id, + 'state': dag_run.state, + 'execution_date': dag_run.execution_date.isoformat(), + 'start_date': ((dag_run.start_date or '') and dag_run.start_date.isoformat()) + }) + return dag_runs \ No newline at end of file diff --git a/cwl_airflow/wes/openapi/swagger_configuration.yaml b/cwl_airflow/wes/openapi/swagger_configuration.yaml index 71cba66..8e2dfca 100644 --- a/cwl_airflow/wes/openapi/swagger_configuration.yaml +++ b/cwl_airflow/wes/openapi/swagger_configuration.yaml @@ -137,6 +137,7 @@ definitions: - dag_id - run_id - execution_date + - start_date - state - tasks properties: @@ -147,6 +148,9 @@ definitions: execution_date: type: string format: date-time + start_date: + type: string + format: date-time state: $ref: '#/definitions/DagRunState' tasks: