Add post endpoint for dataset events #37570

Merged
49 changes: 48 additions & 1 deletion airflow/api_connexion/endpoints/dataset_endpoint.py
@@ -20,11 +20,13 @@
from typing import TYPE_CHECKING

from connexion import NoContent
from marshmallow import ValidationError
from sqlalchemy import delete, func, select
from sqlalchemy.orm import joinedload, subqueryload

from airflow.api_connexion import security
from airflow.api_connexion.exceptions import NotFound
from airflow.api_connexion.endpoints.request_dict import get_json_request_dict
from airflow.api_connexion.exceptions import BadRequest, NotFound
from airflow.api_connexion.parameters import apply_sorting, check_limit, format_datetime, format_parameters
from airflow.api_connexion.schemas.dataset_schema import (
DagScheduleDatasetReference,
@@ -33,22 +35,32 @@
QueuedEvent,
QueuedEventCollection,
TaskOutletDatasetReference,
create_dataset_event_schema,
dataset_collection_schema,
dataset_event_collection_schema,
dataset_event_schema,
dataset_schema,
queued_event_collection_schema,
queued_event_schema,
)
from airflow.datasets import Dataset
from airflow.datasets.manager import dataset_manager
from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.db import get_query_count
from airflow.utils.log.action_logger import action_event_from_permission
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.www.decorators import action_logging
from airflow.www.extensions.init_auth_manager import get_auth_manager

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.api_connexion.types import APIResponse

RESOURCE_EVENT_PREFIX = "dataset"


@security.requires_access_dataset("GET")
@provide_session
@@ -311,3 +323,38 @@ def delete_dataset_queued_events(
"Queue event not found",
detail=f"Queue event with dataset uri: `{uri}` was not found",
)


@security.requires_access_dataset("POST")
@provide_session
@action_logging(
event=action_event_from_permission(
prefix=RESOURCE_EVENT_PREFIX,
permission=permissions.ACTION_CAN_CREATE,
),
)
def create_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
"""Create dataset event."""
body = get_json_request_dict()
try:
json_body = create_dataset_event_schema.load(body)
except ValidationError as err:
raise BadRequest(detail=str(err))

uri = json_body["dataset_uri"]
dataset = session.scalar(select(DatasetModel).where(DatasetModel.uri == uri).limit(1))
if not dataset:
raise NotFound(title="Dataset not found", detail=f"Dataset with uri: '{uri}' not found")
timestamp = timezone.utcnow()
extra = json_body.get("extra", {})
extra["from_rest_api"] = True
dataset_event = dataset_manager.register_dataset_change(
dataset=Dataset(uri),
timestamp=timestamp,
extra=extra,
session=session,
)
if not dataset_event:
raise NotFound(title="Dataset not found", detail=f"Dataset with uri: '{uri}' not found")
event = dataset_event_schema.dump(dataset_event)
return event
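
To show what the new POST /datasets/events endpoint looks like from a client's point of view, here is a minimal sketch (not part of the change). The webserver URL, the basic-auth credentials, and the example dataset URI are assumptions about a local test deployment; the /api/v1 prefix and the body fields follow the CreateDatasetEvent schema added in this PR.

import requests

# Illustrative only: create a dataset event through the new endpoint.
# Host, credentials, and dataset URI are placeholders for a local deployment
# with basic auth enabled.
response = requests.post(
    "http://localhost:8080/api/v1/datasets/events",
    json={
        "dataset_uri": "s3://my-bucket/my-key",
        "extra": {"triggered_by": "external-system"},
    },
    auth=("admin", "admin"),
)
response.raise_for_status()
event = response.json()  # serialized DatasetEvent; "extra" will include "from_rest_api": true
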
59 changes: 50 additions & 9 deletions airflow/api_connexion/openapi/v1.yaml
@@ -2129,21 +2129,21 @@ paths:
$ref: "#/components/responses/NotFound"

/datasets/events:
parameters:
- $ref: "#/components/parameters/PageLimit"
- $ref: "#/components/parameters/PageOffset"
- $ref: "#/components/parameters/OrderBy"
- $ref: "#/components/parameters/FilterDatasetID"
- $ref: "#/components/parameters/FilterSourceDAGID"
- $ref: "#/components/parameters/FilterSourceTaskID"
- $ref: "#/components/parameters/FilterSourceRunID"
- $ref: "#/components/parameters/FilterSourceMapIndex"
get:
summary: Get dataset events
description: Get dataset events
x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint
operationId: get_dataset_events
tags: [Dataset]
parameters:
- $ref: "#/components/parameters/PageLimit"
- $ref: "#/components/parameters/PageOffset"
- $ref: "#/components/parameters/OrderBy"
- $ref: "#/components/parameters/FilterDatasetID"
- $ref: "#/components/parameters/FilterSourceDAGID"
- $ref: "#/components/parameters/FilterSourceTaskID"
- $ref: "#/components/parameters/FilterSourceRunID"
- $ref: "#/components/parameters/FilterSourceMapIndex"
responses:
"200":
description: Success.
@@ -2157,6 +2157,33 @@
$ref: "#/components/responses/PermissionDenied"
"404":
$ref: "#/components/responses/NotFound"
post:
summary: Create dataset event
description: Create dataset event
x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint
operationId: create_dataset_event
tags: [Dataset]
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/CreateDatasetEvent'
responses:
'200':
description: Success.
content:
application/json:
schema:
$ref: '#/components/schemas/DatasetEvent'
"400":
$ref: "#/components/responses/BadRequest"
'401':
$ref: '#/components/responses/Unauthenticated'
'403':
$ref: '#/components/responses/PermissionDenied'
'404':
$ref: '#/components/responses/NotFound'

/config:
get:
@@ -4290,6 +4317,20 @@ components:
description: The dataset event creation time
nullable: false

CreateDatasetEvent:
type: object
required:
- dataset_uri
properties:
dataset_uri:
type: string
description: The URI of the dataset
nullable: false
extra:
type: object
description: The dataset event extra
nullable: true

QueuedEvent:
type: object
properties:
8 changes: 8 additions & 0 deletions airflow/api_connexion/schemas/dataset_schema.py
@@ -147,8 +147,16 @@ class DatasetEventCollectionSchema(Schema):
total_entries = fields.Int()


class CreateDatasetEventSchema(Schema):
"""Create Dataset Event Schema."""

dataset_uri = fields.String()
extra = JsonObjectField()


dataset_event_schema = DatasetEventSchema()
dataset_event_collection_schema = DatasetEventCollectionSchema()
create_dataset_event_schema = CreateDatasetEventSchema()


class QueuedEvent(NamedTuple):
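
As a quick illustration of how the new CreateDatasetEventSchema behaves (a sketch, not part of the change): it deserializes a dataset_uri string plus an optional JSON-object extra, and ill-typed input raises ValidationError, which create_dataset_event translates into a 400. The example URI and payloads are made up.

from marshmallow import ValidationError

from airflow.api_connexion.schemas.dataset_schema import create_dataset_event_schema

# Valid payload: dataset_uri is a string, extra is an optional JSON object.
body = create_dataset_event_schema.load(
    {"dataset_uri": "s3://my-bucket/my-key", "extra": {"source": "external"}}
)
print(body["dataset_uri"], body["extra"])

# Ill-typed payload: a non-string dataset_uri fails validation, which the
# endpoint surfaces as a 400 Bad Request.
try:
    create_dataset_event_schema.load({"dataset_uri": 123})
except ValidationError as err:
    print(err.messages)  # e.g. {'dataset_uri': ['Not a valid string.']}
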
37 changes: 25 additions & 12 deletions airflow/datasets/manager.py
@@ -56,8 +56,14 @@ def create_datasets(self, dataset_models: list[DatasetModel], session: Session)
self.notify_dataset_created(dataset=Dataset(uri=dataset_model.uri, extra=dataset_model.extra))

def register_dataset_change(
self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs
) -> None:
self,
*,
task_instance: TaskInstance | None = None,
dataset: Dataset,
extra=None,
session: Session,
**kwargs,
) -> DatasetEvent | None:
"""
Register dataset related changes.

@@ -71,17 +77,23 @@ def register_dataset_change(
)
if not dataset_model:
self.log.warning("DatasetModel %s not found", dataset)
return
session.add(
DatasetEvent(
dataset_id=dataset_model.id,
source_task_id=task_instance.task_id,
source_dag_id=task_instance.dag_id,
source_run_id=task_instance.run_id,
source_map_index=task_instance.map_index,
extra=extra,
return None

event_kwargs = {
"dataset_id": dataset_model.id,
"extra": extra,
}
if task_instance:
event_kwargs.update(
{
"source_task_id": task_instance.task_id,
"source_dag_id": task_instance.dag_id,
"source_run_id": task_instance.run_id,
"source_map_index": task_instance.map_index,
}
)
)
dataset_event = DatasetEvent(**event_kwargs)
session.add(dataset_event)
session.flush()

self.notify_dataset_changed(dataset=dataset)
@@ -90,6 +102,7 @@ def register_dataset_change(
if dataset_model.consuming_dags:
self._queue_dagruns(dataset_model, session)
session.flush()
return dataset_event

def notify_dataset_created(self, dataset: Dataset):
"""Run applicable notification actions when a dataset is created."""
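
A rough sketch of the call path the endpoint now relies on: register_dataset_change invoked without a task_instance, returning the created DatasetEvent or None when the URI has no registered DatasetModel. The session handling and the example URI are illustrative; inside the API the session is injected by @provide_session.

from airflow.datasets import Dataset
from airflow.datasets.manager import dataset_manager
from airflow.utils.session import create_session

# Illustrative: the REST API path passes no task_instance, so the event carries
# no source_* fields, and a missing DatasetModel yields None instead of an event.
with create_session() as session:
    event = dataset_manager.register_dataset_change(
        dataset=Dataset("s3://my-bucket/my-key"),
        extra={"from_rest_api": True},
        session=session,
    )
    if event is None:
        print("No DatasetModel registered for that URI; nothing was recorded.")
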
@@ -248,6 +248,7 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET),
]
# [END security_user_perms]

@@ -275,6 +276,7 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_VARIABLE),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_XCOM),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DATASET),
(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET),
]
# [END security_op_perms]

3 changes: 3 additions & 0 deletions airflow/www/static/js/components/Table/Cells.tsx
@@ -150,6 +150,9 @@ export const TaskInstanceLink = ({ cell: { value, row } }: CellProps) => {
const { sourceRunId, sourceDagId, sourceMapIndex } = row.original;
const gridUrl = getMetaValue("grid_url");
const dagId = getMetaValue("dag_id");
if (!value || !sourceRunId || !sourceDagId || !gridUrl) {
return null;
}
const stringToReplace = dagId || "__DAG_ID__";
const url = `${gridUrl?.replace(
stringToReplace,
59 changes: 34 additions & 25 deletions airflow/www/static/js/types/api-generated.ts
@@ -684,31 +684,8 @@ export interface paths {
"/datasets/events": {
/** Get dataset events */
get: operations["get_dataset_events"];
parameters: {
query: {
/** The numbers of items to return. */
limit?: components["parameters"]["PageLimit"];
/** The number of items to skip before starting to collect the result set. */
offset?: components["parameters"]["PageOffset"];
/**
* The name of the field to order the results by.
* Prefix a field name with `-` to reverse the sort order.
*
* *New in version 2.1.0*
*/
order_by?: components["parameters"]["OrderBy"];
/** The Dataset ID that updated the dataset. */
dataset_id?: components["parameters"]["FilterDatasetID"];
/** The DAG ID that updated the dataset. */
source_dag_id?: components["parameters"]["FilterSourceDAGID"];
/** The task ID that updated the dataset. */
source_task_id?: components["parameters"]["FilterSourceTaskID"];
/** The DAG run ID that updated the dataset. */
source_run_id?: components["parameters"]["FilterSourceRunID"];
/** The map index that updated the dataset. */
source_map_index?: components["parameters"]["FilterSourceMapIndex"];
};
};
/** Create dataset event */
post: operations["create_dataset_event"];
};
"/config": {
get: operations["get_config"];
@@ -1825,6 +1802,12 @@ export interface components {
/** @description The dataset event creation time */
timestamp?: string;
};
CreateDatasetEvent: {
/** @description The URI of the dataset */
dataset_uri: string;
/** @description The dataset event extra */
extra?: { [key: string]: unknown } | null;
};
QueuedEvent: {
/** @description The datata uri. */
uri?: string;
@@ -4598,6 +4581,26 @@ export interface operations {
404: components["responses"]["NotFound"];
};
};
/** Create dataset event */
create_dataset_event: {
responses: {
/** Success. */
200: {
content: {
"application/json": components["schemas"]["DatasetEvent"];
};
};
400: components["responses"]["BadRequest"];
401: components["responses"]["Unauthenticated"];
403: components["responses"]["PermissionDenied"];
404: components["responses"]["NotFound"];
};
requestBody: {
content: {
"application/json": components["schemas"]["CreateDatasetEvent"];
};
};
};
get_config: {
parameters: {
query: {
@@ -5167,6 +5170,9 @@ export type DatasetCollection = CamelCasedPropertiesDeep<
export type DatasetEvent = CamelCasedPropertiesDeep<
components["schemas"]["DatasetEvent"]
>;
export type CreateDatasetEvent = CamelCasedPropertiesDeep<
components["schemas"]["CreateDatasetEvent"]
>;
export type QueuedEvent = CamelCasedPropertiesDeep<
components["schemas"]["QueuedEvent"]
>;
@@ -5481,6 +5487,9 @@ export type GetDatasetVariables = CamelCasedPropertiesDeep<
export type GetDatasetEventsVariables = CamelCasedPropertiesDeep<
operations["get_dataset_events"]["parameters"]["query"]
>;
export type CreateDatasetEventVariables = CamelCasedPropertiesDeep<
operations["create_dataset_event"]["requestBody"]["content"]["application/json"]
>;
export type GetConfigVariables = CamelCasedPropertiesDeep<
operations["get_config"]["parameters"]["query"]
>;