diff --git a/superset/views/datasource/views.py b/superset/views/datasource/views.py index cc3d6d202ebb..2b5ed892ff17 100644 --- a/superset/views/datasource/views.py +++ b/superset/views/datasource/views.py @@ -18,7 +18,7 @@ from collections import Counter from typing import Any -from flask import request +from flask import g, request from flask_appbuilder import expose from flask_appbuilder.api import rison from flask_appbuilder.security.decorators import has_access_api @@ -28,6 +28,7 @@ from sqlalchemy.orm.exc import NoResultFound from superset import app, db, event_logger +from superset.commands.utils import populate_owners from superset.connectors.connector_registry import ConnectorRegistry from superset.connectors.sqla.utils import get_physical_table_metadata from superset.datasets.commands.exceptions import ( @@ -35,6 +36,7 @@ DatasetNotFoundError, ) from superset.exceptions import SupersetException, SupersetSecurityException +from superset.extensions import security_manager from superset.models.core import Database from superset.typing import FlaskResponse from superset.views.base import ( @@ -79,16 +81,28 @@ def save(self) -> FlaskResponse: if "owners" in datasource_dict and orm_datasource.owner_class is not None: # Check ownership if app.config["OLD_API_CHECK_DATASET_OWNERSHIP"]: + # mimic the behavior of the new dataset command that + # checks ownership and ensures that non-admins aren't locked out + # of the object try: check_ownership(orm_datasource) except SupersetSecurityException as ex: raise DatasetForbiddenError() from ex - - datasource_dict["owners"] = ( - db.session.query(orm_datasource.owner_class) - .filter(orm_datasource.owner_class.id.in_(datasource_dict["owners"])) - .all() - ) + user = security_manager.get_user_by_id(g.user.id) + datasource_dict["owners"] = populate_owners( + user, datasource_dict["owners"], default_to_user=False + ) + else: + # legacy behavior + datasource_dict["owners"] = ( + db.session.query(orm_datasource.owner_class) + .filter( + orm_datasource.owner_class.id.in_( + datasource_dict["owners"] or [] + ) + ) + .all() + ) duplicates = [ name