Skip to content

Commit

Permalink
Add new access condition for content uploads
Browse files Browse the repository at this point in the history
fixes: pulp#2796
  • Loading branch information
gerrod3 committed Jun 2, 2022
1 parent b8c918e commit b50c624
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGES/plugin_api/2796.feature
@@ -0,0 +1,2 @@
Added new access condition ``has_required_repo_perms_on_upload`` for RBAC plugins to use to require
users to specify a repository when uploading content.
41 changes: 41 additions & 0 deletions pulpcore/app/global_access_conditions.py
Expand Up @@ -398,6 +398,47 @@ def has_repo_or_repo_ver_param_model_or_obj_perms(request, view, action, permiss
return True


def has_required_repo_perms_on_upload(request, view, action, permission):
"""
Checks if the current user has permission to upload content to the ``repository`` object.
Since content queryset scoping prevents users from seeing orphaned content by default this
also checks to make sure that any user that isn't an admin has also supplied the ``repository``
parameter when performing a content upload.
This is usable as a conditional check in an AccessPolicy for content uploads. Here is an
example checking for the "file.view_filerepository" permission.
::
{
...
"condition": "has_required_repo_perms_on_upload:file.view_filerepository",
}
Args:
request (rest_framework.request.Request): The request being made.
view (subclass rest_framework.viewsets.GenericViewSet): The view being checked for
authorization.
action (str): The action being performed, e.g. "destroy".
permission (str): The name of the Permission to be checked. In the form
`app_label.codename`, e.g. "core.delete_task".
Returns:
True if the user has supplied the ``repository`` parameter and has the Permission on it
named by the ``permission`` argument, or is an admin. False otherwise.
"""
if request.user.is_superuser:
return True
serializer = view.serializer_class(data=request.data, context={"request": request})
serializer.is_valid(raise_exception=True)
if repository := serializer.validated_data.get("repository"):
if has_model_perms(request, view, action, permission):
return True
return request.user.has_perm(permission, repository)
return False


def has_publication_param_model_or_obj_perms(request, view, action, permission):
"""
Checks if the current user has object-level permission on the ``publication`` object.
Expand Down

0 comments on commit b50c624

Please sign in to comment.