Add post endpoint for dataset events (#37570)
Co-authored-by: Wei Lee <weilee.rx@gmail.com>
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
3 people committed Feb 26, 2024
1 parent c6ba13a commit 9b17ff3
Showing 9 changed files with 250 additions and 47 deletions.
49 changes: 48 additions & 1 deletion airflow/api_connexion/endpoints/dataset_endpoint.py
@@ -20,11 +20,13 @@
from typing import TYPE_CHECKING

from connexion import NoContent
from marshmallow import ValidationError
from sqlalchemy import delete, func, select
from sqlalchemy.orm import joinedload, subqueryload

from airflow.api_connexion import security
from airflow.api_connexion.exceptions import NotFound
from airflow.api_connexion.endpoints.request_dict import get_json_request_dict
from airflow.api_connexion.exceptions import BadRequest, NotFound
from airflow.api_connexion.parameters import apply_sorting, check_limit, format_datetime, format_parameters
from airflow.api_connexion.schemas.dataset_schema import (
DagScheduleDatasetReference,
@@ -33,22 +35,32 @@
QueuedEvent,
QueuedEventCollection,
TaskOutletDatasetReference,
create_dataset_event_schema,
dataset_collection_schema,
dataset_event_collection_schema,
dataset_event_schema,
dataset_schema,
queued_event_collection_schema,
queued_event_schema,
)
from airflow.datasets import Dataset
from airflow.datasets.manager import dataset_manager
from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.db import get_query_count
from airflow.utils.log.action_logger import action_event_from_permission
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.www.decorators import action_logging
from airflow.www.extensions.init_auth_manager import get_auth_manager

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.api_connexion.types import APIResponse

RESOURCE_EVENT_PREFIX = "dataset"


@security.requires_access_dataset("GET")
@provide_session
@@ -311,3 +323,38 @@ def delete_dataset_queued_events(
"Queue event not found",
detail=f"Queue event with dataset uri: `{uri}` was not found",
)


@security.requires_access_dataset("POST")
@provide_session
@action_logging(
event=action_event_from_permission(
prefix=RESOURCE_EVENT_PREFIX,
permission=permissions.ACTION_CAN_CREATE,
),
)
def create_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
"""Create dataset event."""
body = get_json_request_dict()
try:
json_body = create_dataset_event_schema.load(body)
except ValidationError as err:
raise BadRequest(detail=str(err))

uri = json_body["dataset_uri"]
dataset = session.scalar(select(DatasetModel).where(DatasetModel.uri == uri).limit(1))
if not dataset:
raise NotFound(title="Dataset not found", detail=f"Dataset with uri: '{uri}' not found")
timestamp = timezone.utcnow()
extra = json_body.get("extra", {})
extra["from_rest_api"] = True
dataset_event = dataset_manager.register_dataset_change(
dataset=Dataset(uri),
timestamp=timestamp,
extra=extra,
session=session,
)
if not dataset_event:
raise NotFound(title="Dataset not found", detail=f"Dataset with uri: '{uri}' not found")
event = dataset_event_schema.dump(dataset_event)
return event
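
For context, a minimal, hypothetical call against the new endpoint, assuming a locally running Airflow webserver at http://localhost:8080 with basic auth enabled and the standard /api/v1 base path; the dataset URI and credentials are placeholders, not part of this commit:

import requests

# Hypothetical request against the new POST /datasets/events endpoint.
# Host, credentials, and dataset URI below are illustrative only.
resp = requests.post(
    "http://localhost:8080/api/v1/datasets/events",
    json={"dataset_uri": "s3://bucket/my-dataset", "extra": {"producer": "external-system"}},
    auth=("admin", "admin"),
)
resp.raise_for_status()
print(resp.json())  # serialized DatasetEvent; the endpoint sets extra["from_rest_api"] = True

A 404 is returned when no DatasetModel exists for the given URI, matching the two NotFound branches in the handler above.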
59 changes: 50 additions & 9 deletions airflow/api_connexion/openapi/v1.yaml
@@ -2129,21 +2129,21 @@ paths:
$ref: "#/components/responses/NotFound"

/datasets/events:
parameters:
- $ref: "#/components/parameters/PageLimit"
- $ref: "#/components/parameters/PageOffset"
- $ref: "#/components/parameters/OrderBy"
- $ref: "#/components/parameters/FilterDatasetID"
- $ref: "#/components/parameters/FilterSourceDAGID"
- $ref: "#/components/parameters/FilterSourceTaskID"
- $ref: "#/components/parameters/FilterSourceRunID"
- $ref: "#/components/parameters/FilterSourceMapIndex"
get:
summary: Get dataset events
description: Get dataset events
x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint
operationId: get_dataset_events
tags: [Dataset]
parameters:
- $ref: "#/components/parameters/PageLimit"
- $ref: "#/components/parameters/PageOffset"
- $ref: "#/components/parameters/OrderBy"
- $ref: "#/components/parameters/FilterDatasetID"
- $ref: "#/components/parameters/FilterSourceDAGID"
- $ref: "#/components/parameters/FilterSourceTaskID"
- $ref: "#/components/parameters/FilterSourceRunID"
- $ref: "#/components/parameters/FilterSourceMapIndex"
responses:
"200":
description: Success.
@@ -2157,6 +2157,33 @@ paths:
$ref: "#/components/responses/PermissionDenied"
"404":
$ref: "#/components/responses/NotFound"
post:
summary: Create dataset event
description: Create dataset event
x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint
operationId: create_dataset_event
tags: [Dataset]
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/CreateDatasetEvent'
responses:
'200':
description: Success.
content:
application/json:
schema:
$ref: '#/components/schemas/DatasetEvent'
"400":
$ref: "#/components/responses/BadRequest"
'401':
$ref: '#/components/responses/Unauthenticated'
'403':
$ref: '#/components/responses/PermissionDenied'
'404':
$ref: '#/components/responses/NotFound'

/config:
get:
@@ -4290,6 +4317,20 @@ components:
description: The dataset event creation time
nullable: false

CreateDatasetEvent:
type: object
required:
- dataset_uri
properties:
dataset_uri:
type: string
description: The URI of the dataset
nullable: false
extra:
type: object
description: The dataset event extra
nullable: true

QueuedEvent:
type: object
properties:
8 changes: 8 additions & 0 deletions airflow/api_connexion/schemas/dataset_schema.py
@@ -147,8 +147,16 @@ class DatasetEventCollectionSchema(Schema):
total_entries = fields.Int()


class CreateDatasetEventSchema(Schema):
"""Create Dataset Event Schema."""

dataset_uri = fields.String()
extra = JsonObjectField()


dataset_event_schema = DatasetEventSchema()
dataset_event_collection_schema = DatasetEventCollectionSchema()
create_dataset_event_schema = CreateDatasetEventSchema()


class QueuedEvent(NamedTuple):
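
A short sketch of how the new marshmallow schema is used, under the assumption that JsonObjectField deserializes a plain dict unchanged; the payload is illustrative only:

from airflow.api_connexion.schemas.dataset_schema import create_dataset_event_schema

# Illustrative payload; the URI is a placeholder.
payload = {"dataset_uri": "s3://bucket/my-dataset", "extra": {"producer": "external"}}
body = create_dataset_event_schema.load(payload)
# Expected result (assumption): {"dataset_uri": "s3://bucket/my-dataset", "extra": {"producer": "external"}}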
37 changes: 25 additions & 12 deletions airflow/datasets/manager.py
@@ -56,8 +56,14 @@ def create_datasets(self, dataset_models: list[DatasetModel], session: Session)
self.notify_dataset_created(dataset=Dataset(uri=dataset_model.uri, extra=dataset_model.extra))

def register_dataset_change(
self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs
) -> None:
self,
*,
task_instance: TaskInstance | None = None,
dataset: Dataset,
extra=None,
session: Session,
**kwargs,
) -> DatasetEvent | None:
"""
Register dataset related changes.
@@ -71,17 +77,23 @@
)
if not dataset_model:
self.log.warning("DatasetModel %s not found", dataset)
return
session.add(
DatasetEvent(
dataset_id=dataset_model.id,
source_task_id=task_instance.task_id,
source_dag_id=task_instance.dag_id,
source_run_id=task_instance.run_id,
source_map_index=task_instance.map_index,
extra=extra,
return None

event_kwargs = {
"dataset_id": dataset_model.id,
"extra": extra,
}
if task_instance:
event_kwargs.update(
{
"source_task_id": task_instance.task_id,
"source_dag_id": task_instance.dag_id,
"source_run_id": task_instance.run_id,
"source_map_index": task_instance.map_index,
}
)
)
dataset_event = DatasetEvent(**event_kwargs)
session.add(dataset_event)
session.flush()

self.notify_dataset_changed(dataset=dataset)
@@ -90,6 +102,7 @@
if dataset_model.consuming_dags:
self._queue_dagruns(dataset_model, session)
session.flush()
return dataset_event

def notify_dataset_created(self, dataset: Dataset):
"""Run applicable notification actions when a dataset is created."""
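
Based on the widened signature above, a rough sketch of the REST-API code path through the manager with no task_instance supplied; the URI and session handling are illustrative, not taken from the commit:

from airflow.datasets import Dataset
from airflow.datasets.manager import dataset_manager
from airflow.utils import timezone
from airflow.utils.session import create_session

# Hypothetical usage mirroring the new endpoint: without a task_instance the
# DatasetEvent is created with no source_task_id/source_dag_id/source_run_id.
with create_session() as session:
    event = dataset_manager.register_dataset_change(
        dataset=Dataset("s3://bucket/my-dataset"),
        timestamp=timezone.utcnow(),
        extra={"from_rest_api": True},
        session=session,
    )
    if event is None:
        print("No DatasetModel registered for this URI; nothing was recorded.")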
@@ -248,6 +248,7 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET),
]
# [END security_user_perms]

@@ -275,6 +276,7 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_VARIABLE),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_XCOM),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DATASET),
(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET),
]
# [END security_op_perms]

3 changes: 3 additions & 0 deletions airflow/www/static/js/components/Table/Cells.tsx
@@ -150,6 +150,9 @@ export const TaskInstanceLink = ({ cell: { value, row } }: CellProps) => {
const { sourceRunId, sourceDagId, sourceMapIndex } = row.original;
const gridUrl = getMetaValue("grid_url");
const dagId = getMetaValue("dag_id");
if (!value || !sourceRunId || !sourceDagId || !gridUrl) {
return null;
}
const stringToReplace = dagId || "__DAG_ID__";
const url = `${gridUrl?.replace(
stringToReplace,
59 changes: 34 additions & 25 deletions airflow/www/static/js/types/api-generated.ts
@@ -684,31 +684,8 @@ export interface paths {
"/datasets/events": {
/** Get dataset events */
get: operations["get_dataset_events"];
parameters: {
query: {
/** The numbers of items to return. */
limit?: components["parameters"]["PageLimit"];
/** The number of items to skip before starting to collect the result set. */
offset?: components["parameters"]["PageOffset"];
/**
* The name of the field to order the results by.
* Prefix a field name with `-` to reverse the sort order.
*
* *New in version 2.1.0*
*/
order_by?: components["parameters"]["OrderBy"];
/** The Dataset ID that updated the dataset. */
dataset_id?: components["parameters"]["FilterDatasetID"];
/** The DAG ID that updated the dataset. */
source_dag_id?: components["parameters"]["FilterSourceDAGID"];
/** The task ID that updated the dataset. */
source_task_id?: components["parameters"]["FilterSourceTaskID"];
/** The DAG run ID that updated the dataset. */
source_run_id?: components["parameters"]["FilterSourceRunID"];
/** The map index that updated the dataset. */
source_map_index?: components["parameters"]["FilterSourceMapIndex"];
};
};
/** Create dataset event */
post: operations["create_dataset_event"];
};
"/config": {
get: operations["get_config"];
@@ -1825,6 +1802,12 @@ export interface components {
/** @description The dataset event creation time */
timestamp?: string;
};
CreateDatasetEvent: {
/** @description The URI of the dataset */
dataset_uri: string;
/** @description The dataset event extra */
extra?: { [key: string]: unknown } | null;
};
QueuedEvent: {
/** @description The datata uri. */
uri?: string;
@@ -4598,6 +4581,26 @@ export interface operations {
404: components["responses"]["NotFound"];
};
};
/** Create dataset event */
create_dataset_event: {
responses: {
/** Success. */
200: {
content: {
"application/json": components["schemas"]["DatasetEvent"];
};
};
400: components["responses"]["BadRequest"];
401: components["responses"]["Unauthenticated"];
403: components["responses"]["PermissionDenied"];
404: components["responses"]["NotFound"];
};
requestBody: {
content: {
"application/json": components["schemas"]["CreateDatasetEvent"];
};
};
};
get_config: {
parameters: {
query: {
@@ -5167,6 +5170,9 @@ export type DatasetCollection = CamelCasedPropertiesDeep<
export type DatasetEvent = CamelCasedPropertiesDeep<
components["schemas"]["DatasetEvent"]
>;
export type CreateDatasetEvent = CamelCasedPropertiesDeep<
components["schemas"]["CreateDatasetEvent"]
>;
export type QueuedEvent = CamelCasedPropertiesDeep<
components["schemas"]["QueuedEvent"]
>;
@@ -5481,6 +5487,9 @@ export type GetDatasetVariables = CamelCasedPropertiesDeep<
export type GetDatasetEventsVariables = CamelCasedPropertiesDeep<
operations["get_dataset_events"]["parameters"]["query"]
>;
export type CreateDatasetEventVariables = CamelCasedPropertiesDeep<
operations["create_dataset_event"]["requestBody"]["content"]["application/json"]
>;
export type GetConfigVariables = CamelCasedPropertiesDeep<
operations["get_config"]["parameters"]["query"]
>;
