Add dataset update endpoint #29433

Closed
Changes from 12 commits
48 changes: 47 additions & 1 deletion airflow/api_connexion/endpoints/dataset_endpoint.py
@@ -16,20 +16,30 @@
# under the License.
from __future__ import annotations

from datetime import timezone

from flask import request
from flask_login import current_user
from marshmallow import ValidationError
from sqlalchemy import func
from sqlalchemy.orm import Session, joinedload, subqueryload

from airflow import Dataset
uranusjr (Member) commented on May 18, 2023:
This should import from airflow.datasets instead.

from airflow.api_connexion import security
from airflow.api_connexion.exceptions import NotFound
from airflow.api_connexion.endpoints.request_dict import get_json_request_dict
from airflow.api_connexion.exceptions import BadRequest, NotFound
from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters
from airflow.api_connexion.schemas.dataset_schema import (
DatasetCollection,
DatasetEventCollection,
dataset_change_schema,
dataset_collection_schema,
dataset_event_collection_schema,
dataset_event_schema,
dataset_schema,
)
from airflow.api_connexion.types import APIResponse
from airflow.datasets.manager import dataset_manager
from airflow.models.dataset import DatasetEvent, DatasetModel
from airflow.security import permissions
from airflow.utils.session import NEW_SESSION, provide_session
@@ -120,3 +130,39 @@ def get_dataset_events(
return dataset_event_collection_schema.dump(
DatasetEventCollection(dataset_events=events, total_entries=total_entries)
)


@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
@provide_session
def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
"""Create an external dataset event. This endpoint is useful if you want to update a dataset and
trigger downstream DAG runs from external services.
"""
try:
json_body = dataset_change_schema.load(get_json_request_dict())
except ValidationError as err:
raise BadRequest(detail=str(err))
uri = json_body["dataset_uri"]
external_source = request.remote_addr
Contributor comment:
i am not sure that remote_addr is the right choice here. maybe we could record such information in the log table? but to me it would seem it might be more useful to let this be an arbitrary text field? although i suppose user can always supply information in the extra dict.... wdyt?

user_id = getattr(current_user, "id", None)
Member comment:
Instead of ID, I feel this should use the username. The ID from database should be considered kind of an implementation detail.

Contributor comment:
isn't the id how a typical FK->PK relationship is defined?
it seems appropriate to add user_id and make a FK to users table. then one could always look up the username by joining?

uranusjr (Member) commented on May 18, 2023:
Yeah good point, this should probably just be a fk.


Edit: Using an fk has problems when a user is deleted though. We probably don’t want to lose the triggering history in the case.

Contributor comment:
true

Contributor comment:
i think there's a mechanism to set to null when user deleted?
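
For reference, the mechanism alluded to here is a nullable foreign key declared with SET NULL on delete; a minimal SQLAlchemy sketch, with illustrative column and table names rather than anything from this PR:

from sqlalchemy import Column, ForeignKey, Integer

# Illustrative only: the column becomes NULL if the referenced user row is deleted.
user_id = Column(Integer, ForeignKey("ab_user.id", ondelete="SET NULL"), nullable=True)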

Member comment:
Setting this to a (non-fk) username would be much more useful than having a null IMO.

dstandish (Contributor) commented on May 18, 2023:
I don't stand in way here, don't really mind, but if you'll humor me I'll think it through out loud...

Maybe i'm just hung up on the standard practice of using surrogate keys, normalization, etc but...

So, airflow provides a mechanism to deactivate users. Interestingly, on the user edit form, it even says don't delete users; it's a bad practice; just deactivate them.

[screenshot of the user edit form, which advises deactivating users rather than deleting them]

Additionally a username can be changed. So I could take some action, change my username, and now you don't know that I took that action.

Additionally, you could delete a user, have a new user added with same username, but it'd be a different "user".

I think your point that having a username is more useful than having a null is obviously true. But I guess my thought is that, it's not a situation that should happen, because users should not be deleted (because of course by keeping them we don't run into these problems), and if an airflow cluster admin does that well, that's up to them. But presuming they don't you have the benefits of referential integrity.

Interestingly the log table does not have a user_id column, which is a bit weird... probably should... but then there too i'd say it would make sense to record the db ID.

Another option would be to reject the deletion of a user when there are associated log or dataset events. That would seem reasonable too.

So yeah I think i've convinced myself a bit more that using the ID is the right way. I think that the mutability of username is a strong argument in light of security / auditing concerns. But lmkwyt

dstandish (Contributor) commented on May 18, 2023:
I guess the other reason @uranusjr would be so that we can use standard ORM features such as relationships to get from user to dataset and vice versa but... i suppose you could say that you could still do so with username via custom join conditions 🤷

Contributor comment:
brought this up on slack. the suggestion is to not add user column at all right now since user handling will change with AIP-56. and for auditing purposes you can add a record to the Log table.
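
A hedged sketch of that suggestion, not part of this PR: the endpoint could write an audit row to the Log table instead of storing a user column, reusing session, current_user, request, and uri from the endpoint above and assuming the Log model accepts these keyword fields.

import json

from airflow.models.log import Log

# Hypothetical audit record for an externally triggered dataset event.
session.add(
    Log(
        event="dataset.external_update",
        owner=getattr(current_user, "username", None),
        extra=json.dumps({"dataset_uri": uri, "remote_addr": request.remote_addr}),
    )
)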

timestamp = json_body.get("timestamp").astimezone(timezone.utc)
extra = json_body["extra"]
dataset_event = dataset_manager.register_external_dataset_change(
dataset=Dataset(uri),
external_source=external_source,
user_id=user_id,
timestamp=timestamp,
extra=extra,
session=session,
)

if dataset_event:
Contributor comment:
i wish it threw a helpful exception if there is no dataset instead of using None to signal that. though i don't think there's anything we can do about that now, can we @uranusjr ?

Contributor comment:
oh wait... this is a new method. so we could. but wait do we even need a new method?

dstandish (Contributor) commented on May 18, 2023:
@michaelmicheal why do we need a new method for this? could we not add params to register_dataset_change?

Contributor comment:
since the params are kwargs-only, i reckon we could make task_instance optional.

and, thankfully, since it accepts **kwargs, adding more params at call site in airflow won't break anything for "old" custom dataset managers
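
A rough sketch of the alternative being discussed here, not what the PR implements: because the parameters are keyword-only, task_instance could default to None on the existing register_dataset_change, and the task/DAG provenance fields would only be set when it is present.

def register_dataset_change(
    self,
    *,
    dataset: Dataset,
    task_instance: TaskInstance | None = None,  # hypothetical: optional instead of required
    extra=None,
    session: Session,
    **kwargs,
) -> DatasetEvent | None:
    dataset_model = self._get_dataset_model(dataset=dataset, session=session)
    if dataset_model is None:
        return None
    source_fields = {}
    if task_instance is not None:
        # Attach task/DAG provenance only when the change originates from a task instance.
        source_fields = dict(
            source_task_id=task_instance.task_id,
            source_dag_id=task_instance.dag_id,
            source_run_id=task_instance.run_id,
            source_map_index=task_instance.map_index,
        )
    dataset_event = DatasetEvent(dataset_id=dataset_model.id, extra=extra, **source_fields)
    self._save_dataset_event(dataset_event, dataset_model, session)
    return dataset_event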

event_json = dataset_event_schema.dump(dataset_event)
# removing created_dagruns, since they will be created asynchronously in the scheduler
event_json.pop("created_dagruns")
return event_json

raise BadRequest(
"Dataset not found",
detail=f"The Dataset with uri: `{uri}` was not found",
)
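
For reference, a minimal sketch of how an external service could call the new endpoint once deployed. It assumes the stable REST API prefix /api/v1, basic auth, and placeholder host, credentials, and dataset URI; the timestamp field is included because the endpoint above parses it from the request body.

import requests

# All values below are placeholders for illustration.
AIRFLOW_URL = "http://localhost:8080/api/v1"
payload = {
    "dataset_uri": "s3://my-bucket/my-dataset.csv",
    "timestamp": "2023-05-18T12:00:00+00:00",
    "extra": {"triggered_by": "external-ingestion-job"},
}
response = requests.post(
    f"{AIRFLOW_URL}/datasets/events",
    json=payload,
    auth=("admin", "admin"),  # assumes basic auth is enabled for the API
)
response.raise_for_status()
print(response.json())  # serialized DatasetEvent; created_dagruns is stripped by the endpoint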
42 changes: 42 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
@@ -1921,6 +1921,32 @@ paths:
$ref: '#/components/responses/PermissionDenied'
'404':
$ref: '#/components/responses/NotFound'
post:
summary: Post dataset event
description: Post dataset event
x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint
operationId: post_dataset_event
tags: [Dataset]
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/ExternalDatasetChange'
responses:
'200':
description: Success.
content:
application/json:
schema:
$ref: '#/components/schemas/DatasetEvent'
'401':
$ref: '#/components/responses/Unauthenticated'
'403':
$ref: '#/components/responses/PermissionDenied'
'404':
$ref: '#/components/responses/NotFound'


/config:
get:
@@ -3980,6 +4006,22 @@ components:
items:
$ref: '#/components/schemas/DatasetEvent'
- $ref: '#/components/schemas/CollectionInfo'
ExternalDatasetChange:
description: |
A external dataset change that should create a DatasetEvent

*New in version 2.6.0*
type: object
properties:
dataset_uri:
type: string
description: The URI of the dataset
nullable: false
extra:
type: object
description: The dataset event extra
nullable: true



# Configuration
11 changes: 11 additions & 0 deletions airflow/api_connexion/schemas/dataset_schema.py
@@ -112,6 +112,14 @@ class Meta:
data_interval_end = auto_field(dump_only=True)


class ExternalDatasetChangeSchema(Schema):
"""External Dataset change/update Schema"""

dataset_uri = fields.String()
timestamp = fields.DateTime()
extra = JsonObjectField()


class DatasetEventSchema(SQLAlchemySchema):
"""Dataset Event DB schema."""

@@ -128,6 +136,8 @@ class Meta:
source_dag_id = auto_field()
source_run_id = auto_field()
source_map_index = auto_field()
external_source = fields.String(dump_only=True)
user_id = auto_field()
created_dagruns = fields.List(fields.Nested(BasicDAGRunSchema))
timestamp = auto_field()

@@ -146,5 +156,6 @@ class DatasetEventCollectionSchema(Schema):
total_entries = fields.Int()


dataset_change_schema = ExternalDatasetChangeSchema()
dataset_event_schema = DatasetEventSchema()
dataset_event_collection_schema = DatasetEventCollectionSchema()
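
A small usage sketch of the new schema (payload values are examples only): load() validates the request body and parses the timestamp, and the marshmallow ValidationError is what the endpoint converts into a 400 BadRequest.

from marshmallow import ValidationError

from airflow.api_connexion.schemas.dataset_schema import dataset_change_schema

payload = {
    "dataset_uri": "s3://my-bucket/my-dataset.csv",
    "timestamp": "2023-05-18T12:00:00+00:00",
    "extra": {"triggered_by": "external-ingestion-job"},
}
try:
    data = dataset_change_schema.load(payload)  # returns a dict; "timestamp" becomes a datetime
except ValidationError as err:
    print(err.messages)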
79 changes: 65 additions & 14 deletions airflow/datasets/manager.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING

from sqlalchemy import exc
@@ -44,28 +45,78 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)

def register_dataset_change(
self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs
) -> None:
self,
*,
dataset: Dataset,
task_instance: TaskInstance,
extra=None,
session: Session,
**kwargs,
) -> DatasetEvent | None:
"""
Register dataset related changes.
Register dataset related changes from a task instance.

For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
the dataset event
"""
dataset_model = self._get_dataset_model(dataset=dataset, session=session)
if dataset_model is None:
return None

dataset_event = DatasetEvent(
dataset_id=dataset_model.id,
source_task_id=task_instance.task_id,
source_dag_id=task_instance.dag_id,
source_run_id=task_instance.run_id,
source_map_index=task_instance.map_index,
extra=extra,
)

self._save_dataset_event(dataset_event, dataset_model, session)

return dataset_event

def register_external_dataset_change(
self,
dataset: Dataset,
timestamp: datetime,
session: Session,
external_source: str | None,
user_id: int | None,
extra=None,
**kwargs,
) -> DatasetEvent | None:
"""
Register a dataset change from an external source (rather than task_instance)

For local datasets, look them up, record the dataset event, and queue dagruns.
"""
dataset_model = self._get_dataset_model(dataset=dataset, session=session)
if dataset_model is None:
return None

# When an external dataset change is made through the API, it isn't triggered by a task instance,
# so we create a DatasetEvent without the task and dag data.
dataset_event = DatasetEvent(
dataset_id=dataset_model.id,
external_source=external_source,
user_id=user_id,
timestamp=timestamp,
extra=extra,
)

self._save_dataset_event(dataset_event, dataset_model, session)
return dataset_event

def _get_dataset_model(self, dataset: Dataset, session: Session) -> DatasetModel | None:
dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
if not dataset_model:
self.log.warning("DatasetModel %s not found", dataset)
return
session.add(
DatasetEvent(
dataset_id=dataset_model.id,
source_task_id=task_instance.task_id,
source_dag_id=task_instance.dag_id,
source_run_id=task_instance.run_id,
source_map_index=task_instance.map_index,
extra=extra,
)
)
return None
return dataset_model

def _save_dataset_event(self, dataset_event: DatasetEvent, dataset_model: DatasetModel, session: Session):
session.add(dataset_event)
session.flush()
Stats.incr("dataset.updates")
if dataset_model.consuming_dags:
2 changes: 2 additions & 0 deletions airflow/models/dataset.py
@@ -275,6 +275,8 @@ class DatasetEvent(Base):
source_dag_id = Column(StringID(), nullable=True)
source_run_id = Column(StringID(), nullable=True)
source_map_index = Column(Integer, nullable=True, server_default=text("-1"))
external_source = Column(StringID(), nullable=True)
user_id = Column(StringID(), nullable=True)
timestamp = Column(UtcDateTime, default=timezone.utcnow, nullable=False)

__tablename__ = "dataset_event"
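
New columns on DatasetEvent would normally ship with a companion Alembic migration, which is not shown in this excerpt. A hedged sketch of the upgrade/downgrade steps, assuming StringID maps to a 250-character string as elsewhere in Airflow; revision identifiers are omitted:

import sqlalchemy as sa
from alembic import op


def upgrade():
    # Nullable columns for externally triggered dataset events.
    op.add_column("dataset_event", sa.Column("external_source", sa.String(250), nullable=True))
    op.add_column("dataset_event", sa.Column("user_id", sa.String(250), nullable=True))


def downgrade():
    op.drop_column("dataset_event", "user_id")
    op.drop_column("dataset_event", "external_source")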
6 changes: 6 additions & 0 deletions airflow/www/static/js/components/Table/Cells.tsx
@@ -148,6 +148,11 @@ export const TaskInstanceLink = ({ cell: { value, row } }: CellProps) => {
const { sourceRunId, sourceDagId, sourceMapIndex } = row.original;
const gridUrl = getMetaValue("grid_url");
const dagId = getMetaValue("dag_id");

if (!value || !sourceRunId || !sourceDagId || !gridUrl) {
return null;
}

const stringToReplace = dagId || "__DAG_ID__";
const url = `${gridUrl?.replace(
stringToReplace,
@@ -156,6 +161,7 @@
value
)}`;
const mapIndex = sourceMapIndex > -1 ? `[${sourceMapIndex}]` : "";

return (
<Box>
<Link
64 changes: 64 additions & 0 deletions airflow/www/static/js/types/api-generated.ts
@@ -584,6 +584,8 @@ export interface paths {
"/datasets/events": {
/** Get dataset events */
get: operations["get_dataset_events"];
/** Post dataset event */
post: operations["post_dataset_event"];
parameters: {
query: {
/** The numbers of items to return. */
@@ -1701,6 +1703,17 @@ export interface components {
DatasetEventCollection: {
dataset_events?: components["schemas"]["DatasetEvent"][];
} & components["schemas"]["CollectionInfo"];
/**
* @description A external dataset change that should create a DatasetEvent
*
* *New in version 2.6.0*
*/
ExternalDatasetChange: {
/** @description The URI of the dataset */
dataset_uri?: string;
/** @description The dataset event extra */
extra?: { [key: string]: unknown } | null;
};
/** @description The option of configuration. */
ConfigOption: {
key?: string;
@@ -4119,6 +4132,50 @@ export interface operations {
404: components["responses"]["NotFound"];
};
};
/** Post dataset event */
post_dataset_event: {
parameters: {
query: {
/** The numbers of items to return. */
limit?: components["parameters"]["PageLimit"];
/** The number of items to skip before starting to collect the result set. */
offset?: components["parameters"]["PageOffset"];
/**
* The name of the field to order the results by.
* Prefix a field name with `-` to reverse the sort order.
*
* *New in version 2.1.0*
*/
order_by?: components["parameters"]["OrderBy"];
/** The Dataset ID that updated the dataset. */
dataset_id?: components["parameters"]["FilterDatasetID"];
/** The DAG ID that updated the dataset. */
source_dag_id?: components["parameters"]["FilterSourceDAGID"];
/** The task ID that updated the dataset. */
source_task_id?: components["parameters"]["FilterSourceTaskID"];
/** The DAG run ID that updated the dataset. */
source_run_id?: components["parameters"]["FilterSourceRunID"];
/** The map index that updated the dataset. */
source_map_index?: components["parameters"]["FilterSourceMapIndex"];
};
};
responses: {
/** Success. */
200: {
content: {
"application/json": components["schemas"]["DatasetEvent"];
};
};
401: components["responses"]["Unauthenticated"];
403: components["responses"]["PermissionDenied"];
404: components["responses"]["NotFound"];
};
requestBody: {
content: {
"application/json": components["schemas"]["ExternalDatasetChange"];
};
};
};
get_config: {
responses: {
/** Success. */
@@ -4661,6 +4718,9 @@ export type BasicDAGRun = CamelCasedPropertiesDeep<
export type DatasetEventCollection = CamelCasedPropertiesDeep<
components["schemas"]["DatasetEventCollection"]
>;
export type ExternalDatasetChange = CamelCasedPropertiesDeep<
components["schemas"]["ExternalDatasetChange"]
>;
export type ConfigOption = CamelCasedPropertiesDeep<
components["schemas"]["ConfigOption"]
>;
@@ -4933,6 +4993,10 @@ export type GetDatasetVariables = CamelCasedPropertiesDeep<
export type GetDatasetEventsVariables = CamelCasedPropertiesDeep<
operations["get_dataset_events"]["parameters"]["query"]
>;
export type PostDatasetEventVariables = CamelCasedPropertiesDeep<
operations["post_dataset_event"]["parameters"]["query"] &
operations["post_dataset_event"]["requestBody"]["content"]["application/json"]
>;
export type GetPluginsVariables = CamelCasedPropertiesDeep<
operations["get_plugins"]["parameters"]["query"]
>;