Skip to content

Commit

Permalink
feat(api_connextion): add endpoint to delete DatasetDagRunQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W committed Feb 5, 2024
1 parent 8fb4c71 commit c904030
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 3 deletions.
32 changes: 30 additions & 2 deletions airflow/api_connexion/endpoints/dataset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,25 @@
# under the License.
from __future__ import annotations

from http import HTTPStatus
from typing import TYPE_CHECKING

from connexion import NoContent
from sqlalchemy import func, select
from sqlalchemy.orm import joinedload, subqueryload

from airflow.api_connexion import security
from airflow.api_connexion.endpoints.request_dict import get_json_request_dict
from airflow.api_connexion.exceptions import NotFound
from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters
from airflow.api_connexion.parameters import apply_sorting, check_limit, format_datetime, format_parameters
from airflow.api_connexion.schemas.dataset_schema import (
DatasetCollection,
DatasetEventCollection,
dataset_collection_schema,
dataset_event_collection_schema,
dataset_schema,
)
from airflow.models.dataset import DatasetEvent, DatasetModel
from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, provide_session

Expand Down Expand Up @@ -124,3 +127,28 @@ def get_dataset_events(
return dataset_event_collection_schema.dump(
DatasetEventCollection(dataset_events=events, total_entries=total_entries)
)


@security.requires_access_dag("DELETE")
@provide_session
def delete_dag_dataset_run_queue(
*, dag_id: str, dataset_id: int, session: Session = NEW_SESSION
) -> APIResponse:
request_dict = get_json_request_dict()
cutoff_time = request_dict.get("cutoff_time", None)

where_clause_conditions = [
DatasetDagRunQueue.target_dag_id == dag_id,
DatasetDagRunQueue.dataset_id == dataset_id,
]
if cutoff_time is not None:
where_clause_conditions.append(DatasetDagRunQueue.created_at < format_datetime(cutoff_time))

ddrq = session.scalar(select(DatasetDagRunQueue).where(*where_clause_conditions))
if ddrq is None:
raise NotFound(
"DatasetDagRunQueue not found",
detail=f"The DatasetDagRunQueue with dag_id: `{dag_id}` and dataset_id: `{dataset_id}` was not found",
)
session.delete(ddrq)
return NoContent, HTTPStatus.NO_CONTENT
50 changes: 50 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,39 @@ paths:
"404":
$ref: "#/components/responses/NotFound"

/dags/{dag_id}/datasets/{dataset_id}/datasetDagRunQueue:
parameters:
- $ref: "#/components/parameters/DAGID"
- $ref: "#/components/parameters/DatasetID"
delete:
summary: Delete DatasetDagRunQueue.
description: |
Delete DatasetDagRunQueue
*New in version 2.9.0*
x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint
operationId: delete_dag_dataset_run_queue
tags: [Dataset]
requestBody:
description: Parameters of delete DatasetDagRunQueue.
required: false
content:
application/json:
schema:
$ref: "#/components/schemas/DeleteDatasetDagRunQueue"

responses:
"204":
description: Success.
"400":
$ref: "#/components/responses/BadRequest"
"401":
$ref: "#/components/responses/Unauthenticated"
"403":
$ref: "#/components/responses/PermissionDenied"
"404":
$ref: "#/components/responses/NotFound"

/eventLogs:
get:
summary: List log entries
Expand Down Expand Up @@ -3014,6 +3047,15 @@ components:
description: Custom notes left by users for this Dag Run.
type: string

DeleteDatasetDagRunQueue:
type: object
properties:
cutoff_time:
description: The cutoff time of DatasetDagQueue.
format: date-time
type: string
nullable: true

EventLog:
type: object
description: Log of user operations via CLI or Web UI.
Expand Down Expand Up @@ -4961,6 +5003,14 @@ components:
required: true
description: The variable Key.

DatasetID:
in: path
name: dataset_id
schema:
type: integer
required: true
description: The Dataset ID.

# Logs
FullContent:
in: query
Expand Down
61 changes: 61 additions & 0 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,22 @@ export interface paths {
};
};
};
"/dags/{dag_id}/datasets/{dataset_id}/datasetDagRunQueue": {
/**
* Delete DatasetDagRunQueue
*
* *New in version 2.9.0*
*/
delete: operations["delete_dag_dataset_run_queue"];
parameters: {
path: {
/** The DAG ID. */
dag_id: components["parameters"]["DAGID"];
/** The Dataset ID. */
dataset_id: components["parameters"]["DatasetID"];
};
};
};
"/eventLogs": {
/** List log entries from event log. */
get: operations["get_event_logs"];
Expand Down Expand Up @@ -1148,6 +1164,13 @@ export interface components {
/** @description Custom notes left by users for this Dag Run. */
note?: string;
};
DeleteDatasetDagRunQueue: {
/**
* Format: date-time
* @description The cutoff time of DatasetDagQueue.
*/
cutoff_time?: string | null;
};
/** @description Log of user operations via CLI or Web UI. */
EventLog: {
/** @description The event log ID */
Expand Down Expand Up @@ -2343,6 +2366,8 @@ export interface components {
PoolName: string;
/** @description The variable Key. */
VariableKey: string;
/** @description The Dataset ID. */
DatasetID: number;
/**
* @description A full content will be returned.
* By default, only the first fragment will be returned.
Expand Down Expand Up @@ -3239,6 +3264,35 @@ export interface operations {
};
};
};
/**
* Delete DatasetDagRunQueue
*
* *New in version 2.9.0*
*/
delete_dag_dataset_run_queue: {
parameters: {
path: {
/** The DAG ID. */
dag_id: components["parameters"]["DAGID"];
/** The Dataset ID. */
dataset_id: components["parameters"]["DatasetID"];
};
};
responses: {
/** Success. */
204: never;
400: components["responses"]["BadRequest"];
401: components["responses"]["Unauthenticated"];
403: components["responses"]["PermissionDenied"];
404: components["responses"]["NotFound"];
};
/** Parameters of delete DatasetDagRunQueue. */
requestBody: {
content: {
"application/json": components["schemas"]["DeleteDatasetDagRunQueue"];
};
};
};
/** List log entries from event log. */
get_event_logs: {
parameters: {
Expand Down Expand Up @@ -4781,6 +4835,9 @@ export type DagWarningCollection = CamelCasedPropertiesDeep<
export type SetDagRunNote = CamelCasedPropertiesDeep<
components["schemas"]["SetDagRunNote"]
>;
export type DeleteDatasetDagRunQueue = CamelCasedPropertiesDeep<
components["schemas"]["DeleteDatasetDagRunQueue"]
>;
export type EventLog = CamelCasedPropertiesDeep<
components["schemas"]["EventLog"]
>;
Expand Down Expand Up @@ -5073,6 +5130,10 @@ export type SetDagRunNoteVariables = CamelCasedPropertiesDeep<
operations["set_dag_run_note"]["parameters"]["path"] &
operations["set_dag_run_note"]["requestBody"]["content"]["application/json"]
>;
export type DeleteDagDatasetRunQueueVariables = CamelCasedPropertiesDeep<
operations["delete_dag_dataset_run_queue"]["parameters"]["path"] &
operations["delete_dag_dataset_run_queue"]["requestBody"]["content"]["application/json"]
>;
export type GetEventLogsVariables = CamelCasedPropertiesDeep<
operations["get_event_logs"]["parameters"]["query"]
>;
Expand Down
64 changes: 63 additions & 1 deletion tests/api_connexion/endpoints/test_dataset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
from airflow.models.dagrun import DagRun
from airflow.models.dataset import DatasetEvent, DatasetModel
from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.session import provide_session
Expand All @@ -47,9 +47,18 @@ def configured_app(minimal_app_for_api):
],
)
create_user(app, username="test_no_permissions", role_name="TestNoPermissions") # type: ignore
create_user(
app, # type: ignore
username="test_ddrq",
role_name="TestDDRQ",
permissions=[
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG),
],
)

yield app

delete_user(app, username="test_ddrq") # type: ignore
delete_user(app, username="test") # type: ignore
delete_user(app, username="test_no_permissions") # type: ignore

Expand Down Expand Up @@ -599,3 +608,56 @@ def test_should_return_conf_max_if_req_max_above_conf(self, session):

assert response.status_code == 200
assert len(response.json["dataset_events"]) == 150


class TestDeleteDatasetDagRunQueue(TestDatasetEndpoint):
def test_delete_should_respond_204(self, session, create_dummy_dag):
dag, _ = create_dummy_dag()
dag_id = dag.dag_id
dataset_id = self._create_dataset(session).id

ddrq = DatasetDagRunQueue(target_dag_id=dag_id, dataset_id=dataset_id)
session.add(ddrq)
session.commit()
conn = session.query(DatasetDagRunQueue).all()
assert len(conn) == 1

response = self.client.delete(
f"/api/v1/dags/{dag_id}/datasets/{dataset_id}/datasetDagRunQueue",
json={"cutoff_time": None},
environ_overrides={"REMOTE_USER": "test_ddrq"},
)

assert response.status_code == 204
conn = session.query(DatasetDagRunQueue).all()
assert len(conn) == 0

def test_should_respond_404(self):
response = self.client.delete(
"/api/v1/dags/not_exists/datasets/1/datasetDagRunQueue",
json={"cutoff_time": None},
environ_overrides={"REMOTE_USER": "test_ddrq"},
)
assert response.status_code == 404
assert {
"detail": "The DatasetDagRunQueue with dag_id: `not_exists` and dataset_id: `1` was not found",
"status": 404,
"title": "DatasetDagRunQueue not found",
"type": EXCEPTIONS_LINK_MAP[404],
} == response.json

def test_should_raises_401_unauthenticated(self, session):
self._create_dataset(session)
response = self.client.delete(
"/api/v1/dags/not_exists/datasets/1/datasetDagRunQueue", json={"cutoff_time": None}
)
assert_401(response)

def test_should_raise_403_forbidden(self, session):
self._create_dataset(session)
response = self.client.delete(
"/api/v1/dags/not_exists/datasets/1/datasetDagRunQueue",
json={"cutoff_time": None},
environ_overrides={"REMOTE_USER": "test_no_permissions"},
)
assert response.status_code == 403

0 comments on commit c904030

Please sign in to comment.