Add dataset update endpoint #29433
@@ -16,20 +16,30 @@
# under the License.
from __future__ import annotations

from datetime import datetime, timezone

from flask import request
from flask_login import current_user
from marshmallow import ValidationError
from sqlalchemy import func
from sqlalchemy.orm import Session, joinedload, subqueryload

from airflow import Dataset
from airflow.api_connexion import security
from airflow.api_connexion.exceptions import NotFound
from airflow.api_connexion.endpoints.request_dict import get_json_request_dict
from airflow.api_connexion.exceptions import BadRequest, NotFound
from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters
from airflow.api_connexion.schemas.dataset_schema import (
    DatasetCollection,
    DatasetEventCollection,
    dataset_change_schema,
    dataset_collection_schema,
    dataset_event_collection_schema,
    dataset_event_schema,
    dataset_schema,
)
from airflow.api_connexion.types import APIResponse
from airflow.datasets.manager import dataset_manager
from airflow.models.dataset import DatasetEvent, DatasetModel
from airflow.security import permissions
from airflow.utils.session import NEW_SESSION, provide_session
@@ -120,3 +130,40 @@ def get_dataset_events(
    return dataset_event_collection_schema.dump(
        DatasetEventCollection(dataset_events=events, total_entries=total_entries)
    )

@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
@provide_session
def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
"""Create an external dataset event. This endpoint is useful if you want to update a dataset and | ||
trigger downstream DAG runs from external services. | ||
""" | ||
try: | ||
json_body = dataset_change_schema.load(get_json_request_dict()) | ||
except ValidationError as err: | ||
raise BadRequest(detail=str(err)) | ||
uri = json_body["dataset_uri"] | ||
external_source = request.remote_addr | ||
Review comment: i am not sure that remote_addr is the right choice here. maybe we could record such information in the log table? but to me it would seem it might be more useful to let this be an arbitrary text field? although i suppose user can always supply information in the …
    user_id = getattr(current_user, "id", None)
Review comment: Instead of ID, I feel this should use the username. The ID from the database should be considered kind of an implementation detail.

Reply: isn't the id how a typical FK->PK relationship is defined?

Reply: Yeah good point, this should probably just be a fk. Edit: Using an fk has problems when a user is deleted though. We probably don't want to lose the triggering history in that case.

Reply: true

Reply: i think there's a mechanism to set to null when user deleted?

Reply: Setting this to a (non-fk) username would be much more useful than having a null IMO.

Reply: I don't stand in the way here, don't really mind, but if you'll humor me I'll think it through out loud... Maybe I'm just hung up on the standard practice of using surrogate keys, normalization, etc., but... Airflow provides a mechanism to deactivate users. Interestingly, on the user edit form it even says … Additionally, a username can be changed. So I could take some action, change my username, and now you don't know that I took that action. Additionally, you could delete a user and have a new user added with the same username, but it'd be a different "user". I think your point that having a username is more useful than having a null is obviously true. But I guess my thought is that it's not a situation that should happen, because users should not be deleted (by keeping them we don't run into these problems), and if an Airflow cluster admin does that, well, that's up to them. But presuming they don't, you have the benefits of referential integrity. Interestingly, the log table does not have a user_id column, which is a bit weird... it probably should... but there too I'd say it would make sense to record the db ID. Another option would be to reject the deletion of a user when there are associated log or dataset events. That would seem reasonable too. So yeah, I think I've convinced myself a bit more that using the ID is the right way. I think that the mutability of username is a strong argument in light of security / auditing concerns. But lmkwyt.

Reply: I guess the other reason @uranusjr would be so that we can use standard ORM features such as relationships to get from user to dataset and vice versa, but... I suppose you could say that you could still do so with username via custom join conditions 🤷
Reply: brought this up on slack. the suggestion is to not add a user column at all right now, since user handling will change with AIP-56. and for auditing purposes you can add a record to the Log table.
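A rough sketch of that Log-table suggestion, reusing session, uri, and current_user from the endpoint above. The event name and field choices here are assumptions for illustration, not part of this PR:

# Hypothetical sketch only: audit the API-triggered change via the Log table
# instead of adding a user column to DatasetEvent.
from airflow.models.log import Log

session.add(
    Log(
        event="dataset_change_via_api",  # event name is an assumption
        owner=getattr(current_user, "username", None),
        extra=f"dataset_uri={uri}",
    )
)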
    timestamp = json_body.get("timestamp", datetime.now())
    timestamp = timestamp.astimezone(timezone.utc)
    extra = json_body.get("extra", {})
    dataset_event = dataset_manager.register_external_dataset_change(
        dataset=Dataset(uri),
        external_source=external_source,
        user_id=user_id,
        timestamp=timestamp,
        extra=extra,
        session=session,
    )

    if dataset_event:
Review comment: i wish it threw a helpful exception if there is no dataset instead of using None to signal that. though i don't think there's anything we can do about that now, can we @uranusjr ?

Reply: oh wait... this is a new method. so we could. but wait, do we even need a new method?

Reply: @michaelmicheal why do we need a new method for this? could we not add params to …

Reply: since the params are kwargs-only, i reckon we could make task_instance optional. and, thankfully, since it accepts …
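For reference, the kind of reuse being suggested might look roughly like the following. This is a sketch only; the parameter list is approximated from the existing manager method and is not taken from this PR:

from __future__ import annotations

from sqlalchemy.orm import Session

from airflow.datasets import Dataset
from airflow.models.taskinstance import TaskInstance


class DatasetManagerSketch:
    """Hypothetical sketch only: reuse one method instead of adding a new one."""

    def register_dataset_change(
        self,
        *,
        task_instance: TaskInstance | None = None,  # optional for external (non-task) changes
        dataset: Dataset,
        extra=None,
        session: Session,
        **kwargs,
    ) -> None:
        # A real implementation would look up the DatasetModel, create a
        # DatasetEvent, and attach source task information only when a
        # task_instance is provided.
        ...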
        event_json = dataset_event_schema.dump(dataset_event)
        # removing created_dagruns, since they will be created asynchronously in the scheduler
        event_json.pop("created_dagruns")
        return event_json

    raise BadRequest(
        "Dataset not found",
        detail=f"The Dataset with uri: `{uri}` was not found",
    )
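For context, an external service calling the new endpoint could look roughly like the following. The URL, port, route, and auth setup are assumptions for illustration and are not taken from this PR:

# Hypothetical usage sketch -- route, port, and auth are assumptions.
import requests

resp = requests.post(
    "http://localhost:8080/api/v1/datasets/events",  # assumed path for post_dataset_event
    auth=("admin", "admin"),  # assumes basic-auth API access is enabled
    json={
        "dataset_uri": "s3://my-bucket/my-dataset",
        "extra": {"triggered_by": "external-service"},
    },
)
resp.raise_for_status()
print(resp.json())  # the serialized DatasetEvent, without created_dagruns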
Review comment: This should import from airflow.datasets instead.
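Presumably that means something like the following (a sketch of the suggested fix, not a line from this PR):

# Import the user-facing Dataset class from airflow.datasets rather than the package root.
from airflow.datasets import Dataset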