/
permissions.py
415 lines (353 loc) · 15.5 KB
/
permissions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
# -*- coding: utf-8 -*-
from collections import defaultdict
from threading import local
from django.contrib.auth import get_permission_codename, get_user_model
from django.contrib.auth.models import Group
from django.contrib.sites.models import Site
from django.db.models import Q
from cms.exceptions import NoPermissionsException
from cms.models import (Page, PagePermission, GlobalPagePermission,
MASK_PAGE, MASK_CHILDREN, MASK_DESCENDANTS)
from cms.plugin_pool import plugin_pool
from cms.utils.conf import get_cms_setting
# thread local support
_thread_locals = local()
def set_current_user(user):
"""
Assigns current user from request to thread_locals, used by
CurrentUserMiddleware.
"""
_thread_locals.user = user
def get_current_user():
"""
Returns current user, or None
"""
return getattr(_thread_locals, 'user', None)
def has_page_add_permission(request):
"""
Return true if the current user has permission to add a new page. This is
just used for general add buttons - only superuser, or user with can_add in
globalpagepermission can add page.
Special case occur when page is going to be added from add page button in
change list - then we have target and position there, so check if user can
add page under target page will occur.
"""
opts = Page._meta
if request.user.is_superuser:
return True
# if add under page
target = request.GET.get('target', None)
position = request.GET.get('position', None)
from cms.utils.helpers import current_site
site = current_site(request)
if target:
try:
page = Page.objects.get(pk=target)
except Page.DoesNotExist:
return False
global_add_perm = GlobalPagePermission.objects.user_has_add_permission(
request.user, site).exists()
if (request.user.has_perm(opts.app_label + '.' + get_permission_codename('add', opts))
and global_add_perm):
return True
if position in ("first-child", "last-child"):
return page.has_add_permission(request)
elif position in ("left", "right"):
if page.parent_id:
return has_generic_permission(page.parent_id, request.user, "add", page.site)
else:
global_add_perm = GlobalPagePermission.objects.user_has_add_permission(
request.user, site).exists()
if (request.user.has_perm(opts.app_label + '.' + get_permission_codename('add', opts))
and global_add_perm):
return True
return False
def has_any_page_change_permissions(request):
from cms.utils.helpers import current_site
if not request.user.is_authenticated():
return False
return request.user.is_superuser or PagePermission.objects.filter(
page__site=current_site(request)
).filter(
Q(user=request.user) |
Q(group__in=request.user.groups.all())
).exists()
def has_page_change_permission(request):
"""
Return true if the current user has permission to change this page.
To be granted this permission, you need the cms.change_page permission.
In addition, if CMS_PERMISSION is enabled you also need to either have
global can_change permission or just on this page.
"""
from cms.utils.helpers import current_site
opts = Page._meta
site = current_site(request)
global_change_perm = GlobalPagePermission.objects.user_has_change_permission(
request.user, site).exists()
return request.user.is_superuser or (
request.user.has_perm(opts.app_label + '.' + get_permission_codename('change', opts))
and global_change_perm or has_any_page_change_permissions(request))
def has_global_page_permission(request, site=None, user=None, **filters):
"""
A helper function to check for global page permissions for the current user
and site. Caches the result on a request basis, so multiple calls to this
function inside of one request/response cycle only generate one query.
:param request: the Request object
:param site: the Site object or ID
:param filters: queryset filters, e.g. ``can_add = True``
:return: ``True`` or ``False``
"""
if not user:
user = request.user
if not user.is_authenticated():
return False
if not get_cms_setting('PERMISSION') or user.is_superuser:
return True
if not hasattr(request, '_cms_global_perms'):
request._cms_global_perms = {}
key = tuple((k, v) for k, v in filters.items())
if site:
key = (('site', site.pk if hasattr(site, 'pk') else int(site)),) + key
if key not in request._cms_global_perms:
qs = GlobalPagePermission.objects.with_user(user).filter(**filters)
if site:
qs = qs.filter(Q(sites__in=[site]) | Q(sites__isnull=True))
request._cms_global_perms[key] = qs.exists()
return request._cms_global_perms[key]
def get_user_permission_level(user):
"""
Returns highest user level from the page/permission hierarchy on which
user haves can_change_permission. Also takes look into user groups. Higher
level equals to lover number. Users on top of hierarchy have level 0. Level
is the same like page.level attribute.
Example:
A,W level 0
/ \
user B,GroupE level 1
/ \
C,X D,Y,W level 2
Users A, W have user level 0. GroupE and all his users have user level 1
If user D is a member of GroupE, his user level will be 1, otherwise is
2.
"""
if (user.is_superuser or
GlobalPagePermission.objects.with_can_change_permissions(user).exists()):
# those
return 0
try:
permission = PagePermission.objects.with_can_change_permissions(user).order_by('page__path')[0]
except IndexError:
# user isn't assigned to any node
raise NoPermissionsException
return permission.page.level
def get_subordinate_users(user):
"""
Returns users queryset, containing all subordinate users to given user
including users created by given user and not assigned to any page.
Not assigned users must be returned, because they shouldn't get lost, and
user should still have possibility to see them.
Only users created_by given user which are on the same, or lover level are
returned.
If user haves global permissions or is a superuser, then he can see all the
users.
This function is currently used in PagePermissionInlineAdminForm for limit
users in permission combobox.
Example:
A,W level 0
/ \
user B,GroupE level 1
Z / \
C,X D,Y,W level 2
Rules: W was created by user, Z was created by user, but is not assigned
to any page.
Will return [user, C, X, D, Y, Z]. W was created by user, but is also
assigned to higher level.
"""
# TODO: try to merge with PagePermissionManager.subordinate_to_user()
if user.is_superuser or \
GlobalPagePermission.objects.with_can_change_permissions(user):
return get_user_model().objects.all()
site = Site.objects.get_current()
page_id_allow_list = Page.permissions.get_change_permissions_id_list(user, site)
try:
user_level = get_user_permission_level(user)
except NoPermissionsException:
# no permission so only staff and no page permissions
qs = get_user_model().objects.distinct().filter(
Q(is_staff=True) &
Q(pageuser__created_by=user) &
Q(pagepermission__page=None)
)
qs = qs.exclude(pk=user.id).exclude(groups__user__pk=user.id)
return qs
# normal query
qs = get_user_model().objects.distinct().filter(
Q(is_staff=True) &
(Q(pagepermission__page__id__in=page_id_allow_list) & Q(pagepermission__page__level__gte=user_level))
| (Q(pageuser__created_by=user) & Q(pagepermission__page=None))
)
qs = qs.exclude(pk=user.id).exclude(groups__user__pk=user.id)
return qs
def get_subordinate_groups(user):
"""
Similar to get_subordinate_users, but returns queryset of Groups instead
of Users.
"""
if (user.is_superuser or
GlobalPagePermission.objects.with_can_change_permissions(user)):
return Group.objects.all()
site = Site.objects.get_current()
page_id_allow_list = Page.permissions.get_change_permissions_id_list(user, site)
try:
user_level = get_user_permission_level(user)
except NoPermissionsException:
# no permission no records
# page_id_allow_list is empty
return Group.objects.distinct().filter(
Q(pageusergroup__created_by=user) &
Q(pagepermission__page=None)
)
return Group.objects.distinct().filter(
(Q(pagepermission__page__id__in=page_id_allow_list) & Q(pagepermission__page__level__gte=user_level))
| (Q(pageusergroup__created_by=user) & Q(pagepermission__page=None))
)
def has_global_change_permissions_permission(request):
opts = GlobalPagePermission._meta
user = request.user
if user.is_superuser or (
user.has_perm(opts.app_label + '.' + get_permission_codename('change', opts)) and
has_global_page_permission(request, can_change_permissions=True)):
return True
return False
def has_generic_permission(page_id, user, attr, site):
"""
Permission getter for single page with given id.
Internally, this calls a method on PagePermissionsPermissionManager
"""
func = getattr(Page.permissions, "get_%s_id_list" % attr)
permission = func(user, site)
return permission == Page.permissions.GRANT_ALL or page_id in permission
def load_ancestors(pages):
"""
Loads the ancestors, children and descendants cache for a set of pages.
:param pages: A queryset of pages to examine
:return: The list of pages, including ancestors
"""
pages_by_id = dict((page.pk, page) for page in pages)
pages_list = list(pages)
# Ensure that all parent pages are present so that inheritance will work
# For most use cases, this should not actually do any work
missing = list(pages)
while missing:
page = missing.pop()
page.ancestors_descending = []
page._cached_children = []
page._cached_descendants = []
if page.parent_id and page.parent_id not in pages_by_id:
pages_list.append(page.parent)
pages_by_id[page.parent_id] = page.parent
missing.append(page.parent)
pages_list.sort(key=lambda page: page.path)
for page in pages_list:
if page.parent_id:
parent = pages_by_id[page.parent_id]
page.ancestors_descending = parent.ancestors_descending + [parent]
parent._cached_children.append(page)
for ancestor in page.ancestors_descending:
ancestor._cached_descendants.append(page)
else:
page.ancestors_descending = []
page.ancestors_ascending = list(reversed(page.ancestors_descending))
return pages_list
def get_any_page_view_permissions(request, page):
"""
Used by the admin template tag is_restricted
"""
if not get_cms_setting('PERMISSION'):
return [] # Maybe None here, to indicate "not applicable"?
if not hasattr(request, '_cms_view_perms'):
request._cms_view_perms = {}
page_id = page.pk if page.publisher_is_draft else page.publisher_public_id
if page_id not in request._cms_view_perms:
if not page.publisher_is_draft:
page = page.publisher_draft
perms = list(PagePermission.objects.for_page(page=page).filter(can_view=True))
request._cms_view_perms[page_id] = perms
return request._cms_view_perms.get(page_id, [])
def load_view_restrictions(request, pages):
""" Load all view restrictions for the pages and update the cache in the request
The request cache will receive values for all the pages, but the returned
dict will only have keys where restrictions actually exist
"""
restricted_pages = defaultdict(list)
if get_cms_setting('PERMISSION'):
if hasattr(request, '_cms_view_perms'):
cache = request._cms_view_perms
# TODO: Check if we have anything that requires checking
else:
cache = request._cms_view_perms = {}
pages_list = load_ancestors(pages)
pages_by_id = {}
for page in pages_list:
page_id = page.pk if page.publisher_is_draft else page.publisher_public_id
pages_by_id[page_id] = page
cache[page_id] = []
page_permissions = PagePermission.objects.filter(page__in=pages_by_id).select_related('group__users')
for perm in page_permissions:
perm_page = pages_by_id[perm.page_id]
# add the page itself
if perm.grant_on & MASK_PAGE:
restricted_pages[perm_page.pk].append(perm)
# add children
if perm.grant_on & MASK_CHILDREN:
children = perm_page.get_children()
for child in children:
restricted_pages[child.pk].append(perm)
# add descendants
elif perm.grant_on & MASK_DESCENDANTS:
descendants = perm_page.get_cached_descendants()
for child in descendants:
restricted_pages[child.pk].append(perm)
# Overwrite cache where we found restrictions
cache.update(restricted_pages)
return restricted_pages
def get_user_sites_queryset(user):
"""
Returns queryset of all sites available for given user.
1. For superuser always returns all sites.
2. For global user returns all sites he haves in global page permissions
together with any sites he is assigned to over an page.
3. For standard user returns just sites he is assigned to over pages.
"""
qs = Site.objects.all()
if not get_cms_setting('PERMISSION') or user.is_superuser:
return qs
global_ids = GlobalPagePermission.objects.with_user(user).filter(
Q(can_add=True) | Q(can_change=True)
).values_list('id', flat=True)
query = Q()
if global_ids:
query = Q(globalpagepermission__id__in=global_ids)
# haves some global permissions assigned
if not qs.filter(query).exists():
# haves global permissions, but none of sites is specified,
# so he haves access to all sites
return qs
# add some pages if he has permission to add / change them
query |= (
Q(Q(djangocms_pages__pagepermission__user=user) |
Q(djangocms_pages__pagepermission__group__user=user)) &
Q(Q(djangocms_pages__pagepermission__can_add=True) | Q(djangocms_pages__pagepermission__can_change=True))
)
return qs.filter(query).distinct()
def has_plugin_permission(user, plugin_type, permission_type):
"""
Checks that a user has permissions for the plugin-type given to perform
the action defined in permission_type
permission_type should be 'add', 'change' or 'delete'.
"""
plugin_class = plugin_pool.get_plugin(plugin_type)
plugin_model = plugin_class.model
plugin_opts = plugin_model._meta
return user.has_perm('%s.%s_%s' % (plugin_opts.app_label, permission_type,
plugin_opts.object_name.lower()))