This repository has been archived by the owner on May 11, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
__init__.py
221 lines (179 loc) · 6.35 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# encoding: utf-8
# pylint: disable=too-few-public-methods,invalid-name,abstract-method,method-hidden
"""
RESTful API permissions
-----------------------
"""
import logging
from flask_sqlalchemy import BaseQuery
from permission import Permission as BasePermission
from . import rules
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
class PermissionExtendedQuery(BaseQuery):
    """
    ``BaseQuery`` subclass from flask_sqlalchemy that adds a ``get_or_403``
    lookup guarded by a permission.

    Example:

    >>> DataTransformation.query.get_or_403(id)
    """

    def __init__(self, permisssion, *args, **kwargs):
        """
        Args:
            permisssion (callable) - invoked as ``permisssion(obj=...)``; the
                value it returns is used as a context manager by
                ``get_or_403``. The spelling is kept as-is because it is
                part of the existing interface.
            *args, **kwargs - forwarded untouched to ``BaseQuery``.
        """
        super(PermissionExtendedQuery, self).__init__(*args, **kwargs)
        self.permisssion = permisssion

    def get_or_403(self, ident):
        """
        Fetch the object identified by ``ident`` through ``get_or_404`` and
        return it from inside the permission context built for that object.

        NOTE(review): presumably entering the permission context is what
        rejects unauthorized access -- confirm against the permission class.
        """
        found = self.get_or_404(ident)
        guard = self.permisssion(obj=found)
        with guard:
            return found
class Permission(BasePermission):
    """
    Base permission that can hand a model an extended query factory, so the
    model's queries gain the additional ``get_or_403`` method.
    """

    @classmethod
    def get_query_class(cls):
        """
        Build a factory producing ``PermissionExtendedQuery`` instances bound
        to this permission class, for use as a flask_sqlalchemy model's
        ``query_class``.

        Example:

        >>> class DataTransformation(db.Model):
        ...     query_class = OwnerRolePermission.get_query_class()
        """
        def build_query(*args, **kwargs):
            # ``cls`` is passed first so the query knows which permission to
            # apply; everything else goes straight to the query constructor.
            return PermissionExtendedQuery(cls, *args, **kwargs)

        return build_query
class PasswordRequiredPermissionMixin(object):
    """
    Rule mixin that adds a password rule on top of the inherited rule
    whenever `password_required` is set to True.
    """

    def __init__(self, password_required=False, password=None, **kwargs):
        """
        Args:
            password_required (bool) - set this to :bool:`True` to enforce
                that the user supplies a password before certain actions
                are allowed.
            password (str) - the password specified by the user.
            **kwargs - passed on to the next ``__init__`` in the MRO.
        """
        self._password = password
        self._password_required = password_required
        # NOTE: this class is a mixin, so the remaining keyword arguments
        # must be forwarded to the next class in the MRO.
        super(PasswordRequiredPermissionMixin, self).__init__(**kwargs)

    def rule(self):
        """
        Return the rule of the next class in the MRO, combined (``&=``) with
        ``rules.PasswordRequiredRule`` when a password is required.
        """
        combined = super(PasswordRequiredPermissionMixin, self).rule()
        if not self._password_required:
            return combined
        combined &= rules.PasswordRequiredRule(self._password)
        return combined
class WriteAccessPermission(Permission):
    """
    Permission whose rule is the ``|`` combination of the internal-role,
    admin-role and write-access rules.
    """

    def rule(self):
        """
        Return ``InternalRoleRule | AdminRoleRule | WriteAccessRule``.
        """
        internal_or_admin = rules.InternalRoleRule() | rules.AdminRoleRule()
        return internal_or_admin | rules.WriteAccessRule()
class RolePermission(Permission):
    """
    Common base for role-type permissions, so that they can be told apart
    from other permissions.
    """

    def __init__(self, partial=False, **kwargs):
        """
        Args:
            partial (bool) - a True value is mostly useful for Swagger
                documentation purposes.
            **kwargs - passed on to the parent ``__init__``.
        """
        self._partial = partial
        super(RolePermission, self).__init__(**kwargs)

    def rule(self):
        """
        Return ``PartialPermissionDeniedRule`` for a partial permission,
        otherwise ``AllowAllRule``.
        """
        if self._partial:
            rule_class = rules.PartialPermissionDeniedRule
        else:
            rule_class = rules.AllowAllRule
        return rule_class()
class ActiveUserRolePermission(RolePermission):
    """
    Requires at least an Active user.
    """

    def rule(self):
        """
        Return a fresh ``ActiveUserRoleRule``.
        """
        active_user_rule = rules.ActiveUserRoleRule()
        return active_user_rule
class AdminRolePermission(PasswordRequiredPermissionMixin, RolePermission):
    """
    Requires the Admin role.
    """

    def rule(self):
        """
        Return ``InternalRoleRule | (AdminRoleRule & <inherited rule>)``.

        The inherited rule comes from the mixin/base chain, so the password
        requirement applies to the admin branch only.
        """
        internal = rules.InternalRoleRule()
        admin = rules.AdminRoleRule()
        inherited = super(AdminRolePermission, self).rule()
        return internal | (admin & inherited)
class InternalRolePermission(RolePermission):
    """
    Requires the Internal role.
    """

    def rule(self):
        """
        Return a fresh ``InternalRoleRule``.
        """
        internal_rule = rules.InternalRoleRule()
        return internal_rule
class SupervisorRolePermission(PasswordRequiredPermissionMixin, RolePermission):
    """
    Action may be executed by a Supervisor or an Admin.
    """

    def __init__(self, obj=None, **kwargs):
        """
        Args:
            obj (object) - any object can be passed here; it will be asked
                via its ``check_supervisor(current_user)`` method whether
                the current user has enough permissions to perform an
                action on it.
            **kwargs - passed on to the next ``__init__`` in the MRO.
        """
        self._obj = obj
        super(SupervisorRolePermission, self).__init__(**kwargs)

    def rule(self):
        """
        Return ``InternalRoleRule | ((AdminRoleRule | SupervisorRoleRule)
        & <inherited rule>)``.
        """
        internal = rules.InternalRoleRule()
        admin_or_supervisor = (
            rules.AdminRoleRule() | rules.SupervisorRoleRule(obj=self._obj)
        )
        inherited = super(SupervisorRolePermission, self).rule()
        return internal | (admin_or_supervisor & inherited)
class OwnerRolePermission(PasswordRequiredPermissionMixin, RolePermission):
    """
    Action may be executed by an Owner, a Supervisor or an Admin.
    """

    def __init__(self, obj=None, **kwargs):
        """
        Args:
            obj (object) - any object can be passed here; it will be asked
                via its ``check_owner(current_user)`` method whether the
                current user has enough permissions to perform an action
                on it.
            **kwargs - passed on to the next ``__init__`` in the MRO.
        """
        self._obj = obj
        super(OwnerRolePermission, self).__init__(**kwargs)

    def rule(self):
        """
        Return ``InternalRoleRule | ((AdminRoleRule | OwnerRoleRule
        | SupervisorRoleRule) & <inherited rule>)``.
        """
        internal = rules.InternalRoleRule()
        privileged = rules.AdminRoleRule() | rules.OwnerRoleRule(obj=self._obj)
        privileged = privileged | rules.SupervisorRoleRule(obj=self._obj)
        inherited = super(OwnerRolePermission, self).rule()
        return internal | (privileged & inherited)
class WorkspaceMemberRolePermission(PasswordRequiredPermissionMixin, RolePermission):
    """
    Action may be executed by a Workspace Member.
    """

    def __init__(self, obj=None, **kwargs):
        """
        Args:
            obj (object) - any object can be passed here; it will be asked
                via its ``check_workspace_member(current_user)`` method
                whether the current user has enough permissions to perform
                an action on the given object.
            **kwargs - passed on to the next ``__init__`` in the MRO.
        """
        self._obj = obj
        super(WorkspaceMemberRolePermission, self).__init__(**kwargs)

    def rule(self):
        """
        Return ``InternalRoleRule | ((AdminRoleRule | WorkspaceMemberRule
        | SupervisorRoleRule) & <inherited rule>)``.
        """
        internal = rules.InternalRoleRule()
        privileged = rules.AdminRoleRule() | rules.WorkspaceMemberRule(obj=self._obj)
        privileged = privileged | rules.SupervisorRoleRule(obj=self._obj)
        inherited = super(WorkspaceMemberRolePermission, self).rule()
        return internal | (privileged & inherited)