/
permissions.py
114 lines (95 loc) · 3.83 KB
/
permissions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from typing import List
from webargs import fields
from webargs.flaskparser import use_args
from flask import Blueprint
from werkzeug.exceptions import Unauthorized, NotFound, BadRequest, InternalServerError
from ..models import (
Permissions,
PermissionSchema,
PermissionListSchema,
CIDCRole,
IntegrityError,
NoResultFound,
IAMException,
)
from ..shared.auth import get_current_user, requires_auth
from ..shared.rest_utils import (
with_lookup,
marshal_response,
unmarshal_request,
delete_response,
)
# Flask blueprint on which every permissions route in this module is registered.
permissions_bp = Blueprint("permissions", __name__)
# Schema instance passed to `marshal_response` / `unmarshal_request` by the
# single-record endpoints in this module.
permission_schema = PermissionSchema()
# Schema instance passed to `marshal_response` by the list endpoint.
permission_list_schema = PermissionListSchema()
@permissions_bp.route("/", methods=["GET"])
@requires_auth("permissions")
@use_args({"user_id": fields.Int()}, location="query")
@marshal_response(permission_list_schema)
def list_permissions(args: dict):
    """
    List permission records visible to the current user.

    Admins receive every permission record, or only those found for `user_id`
    when that query param is provided. Non-admins always receive their own
    permissions and may only pass their own id as `user_id`.

    Args:
        args: parsed query params; may contain an integer `user_id`.

    Returns:
        A dict with the permission records under `_items` and their count
        under `_meta.total`.

    Raises:
        Unauthorized: if a non-admin requests permissions for another user.
    """
    current_user = get_current_user()
    user_id = args.get("user_id")

    # Test for presence explicitly: a plain truthiness check would treat
    # `user_id=0` as "no filter", so an admin would get *all* permissions and a
    # non-admin would skip the other-user check below.
    user_id_given = user_id is not None

    # Admins can look up permissions for any user, but
    # non-admins can only look up their own permissions
    if current_user.is_admin():
        if user_id_given:
            permissions = Permissions.find_for_user(user_id)
        else:
            permissions = Permissions.list()
    else:
        if user_id_given and current_user.id != user_id:
            raise Unauthorized(
                f"{current_user.email} cannot view permissions for other users"
            )
        permissions = Permissions.find_for_user(current_user.id)

    # Since we aren't paginating, `permissions` is a list of all requested permissions
    total = len(permissions)

    return {"_items": permissions, "_meta": {"total": total}}
@permissions_bp.route("/<int:permission>", methods=["GET"])
@requires_auth("permissions_item")
@with_lookup(Permissions, "permission")
@marshal_response(permission_schema)
def get_permission(permission: Permissions) -> Permissions:
    """
    Return a single permission record.

    `permission` arrives as a `Permissions` record supplied by `@with_lookup`
    for the `<int:permission>` URL segment, not as raw user input.

    Raises:
        NotFound: if the requester is not an admin and the permission was not
            granted to them. A 404 is raised instead of an authorization error.
    """
    requester = get_current_user()

    # Admins may view any permission record.
    if requester.is_admin():
        return permission

    # Everyone else may only view permissions granted to themselves.
    if permission.granted_to_user != requester.id:
        raise NotFound()

    return permission
@permissions_bp.route("/", methods=["POST"])
@requires_auth("permissions_item", allowed_roles=[CIDCRole.ADMIN.value])
@unmarshal_request(permission_schema, "permission")
@marshal_response(permission_schema, 201)
def create_permission(permission: Permissions) -> Permissions:
    """
    Insert a new permission record and return it.

    When the incoming record has no `granted_by_user`, the current user's id
    is filled in as the granter before inserting.

    Raises:
        BadRequest: if the insert raises an `IntegrityError`.
        InternalServerError: if the insert raises an `IAMException`.
    """
    # Default the granter to whoever is making this request.
    if permission.granted_by_user is None:
        permission.granted_by_user = get_current_user().id

    try:
        permission.insert()
    except IntegrityError as integrity_error:
        raise BadRequest(str(integrity_error.orig))
    except IAMException as iam_error:
        # This endpoint is admin-only, so exposing details of the
        # internal error in the response is acceptable.
        raise InternalServerError(str(iam_error))
    else:
        return permission
@permissions_bp.route("/<int:permission>", methods=["DELETE"])
@requires_auth("permissions_item", allowed_roles=[CIDCRole.ADMIN.value])
@with_lookup(Permissions, "permission", check_etag=True)
def delete_permission(permission: Permissions):
    """
    Delete a permission record on behalf of the current user.

    Args:
        permission: the `Permissions` record supplied by `@with_lookup`.

    Returns:
        The result of `delete_response()`.

    Raises:
        NotFound: if the delete raises `NoResultFound`.
        InternalServerError: if the delete raises an `IAMException`.
    """
    try:
        deleter = get_current_user()
        permission.delete(deleted_by=deleter)
    except NoResultFound as e:
        # NOTE(review): assuming this is SQLAlchemy's NoResultFound, it is not a
        # DBAPI-wrapping error and carries no `.orig`; reading `e.orig` directly
        # would raise AttributeError and turn the intended 404 into a 500.
        # Fall back to the exception itself when `.orig` is absent.
        raise NotFound(str(getattr(e, "orig", e))) from e
    except IAMException as e:
        # We return info on this internal error, since this is an admin-only endpoint
        raise InternalServerError(str(e)) from e

    return delete_response()