Skip to content

Commit

Permalink
Mod 910 list allowed roles and users (#25)
Browse files Browse the repository at this point in the history
* Monkeypatch `CatalogTool._listAllowedRolesAndUsers` to add `ram.cache` decorator.
See #MOD-910

* isort, PEP8

* Rebased

* Reverted changes in buildout.cfg

* Use getGroups instead getGroupsForPrincipal that is faster

* Test _listAllowedRolesAndUsers for anonymous user
  • Loading branch information
gbastien committed May 26, 2023
1 parent 73bc226 commit b783fbb
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 8 deletions.
4 changes: 2 additions & 2 deletions CHANGES.rst
Expand Up @@ -4,8 +4,8 @@ Changelog
0.69 (unreleased)
-----------------

- Nothing changed yet.

- Monkeypatch `CatalogTool._listAllowedRolesAndUsers` to add `ram.cache` decorator.
[gbastien]

0.68 (2023-05-12)
-----------------
Expand Down
10 changes: 6 additions & 4 deletions src/imio/helpers/cache.py
Expand Up @@ -220,7 +220,9 @@ def get_plone_groups_for_user_cachekey(method, user_id=None, user=None, the_obje
"""cachekey method for self.get_plone_groups_for_user."""
date = get_cachekey_volatile('_users_groups_value')
return (date,
user and user.id or user_id or get_current_user_id(getRequest()),
# use user.getId() and not user.id because when user is a PloneUser instance
# it does not have a "id" and returns "acl_users" which break the invalidation
user and user.getId() or user_id or get_current_user_id(getRequest()),
the_objects)


Expand All @@ -237,7 +239,7 @@ def get_plone_groups_for_user(user_id=None, user=None, the_objects=False):
return []
if the_objects:
pg = api.portal.get_tool("portal_groups")
user_groups = pg.getGroupsByUserId(user.id)
user_groups = pg.getGroupsByUserId(user.getId())
else:
user_groups = user.getGroups()
return sorted(user_groups)
Expand All @@ -247,7 +249,7 @@ def get_users_in_plone_groups_cachekey(method, group_id=None, group=None, the_ob
"""cachekey method for self.get_users_in_plone_groups."""
date = get_cachekey_volatile('_users_groups_value')
return (date,
group and group.id or group_id,
group and group.getId() or group_id,
the_objects)


Expand All @@ -260,5 +262,5 @@ def get_users_in_plone_groups(group_id=None, group=None, the_objects=False):
return []
members = group.getGroupMembers()
if not the_objects:
members = [m.id for m in members]
members = [m.getId() for m in members]
return sorted(members)
7 changes: 7 additions & 0 deletions src/imio/helpers/configure.zcml
Expand Up @@ -82,6 +82,13 @@
replacement=".patches.removeRoleFromPrincipal"
preserveOriginal="true" />

<monkey:patch
description="Monkeypatch _listAllowedRolesAndUsers to add ram.cache on '_users_groups_value'"
class="Products.CMFPlone.CatalogTool.CatalogTool"
original="_listAllowedRolesAndUsers"
replacement=".patches._listAllowedRolesAndUsers"
preserveOriginal="true" />

<utility component=".vocabularies.SortedUsersFactory"
name="imio.helpers.SortedUsers" />

Expand Down
37 changes: 37 additions & 0 deletions src/imio/helpers/patches.py
@@ -1,5 +1,10 @@
# -*- coding: utf-8 -*-

from imio.helpers.cache import get_cachekey_volatile
from imio.helpers.cache import get_plone_groups_for_user
from imio.helpers.cache import invalidate_cachekey_volatile_for
from plone.api.exc import InvalidParameterError
from plone.memoize import ram
from Products.PlonePAS.plugins.role import GroupAwareRoleManager
from Products.PluggableAuthService.plugins.ZODBRoleManager import ZODBRoleManager
from smtplib import SMTP_SSL
Expand Down Expand Up @@ -49,3 +54,35 @@ def removeRoleFromPrincipal(self, role_id, principal_id):
ZODBRoleManager._old_removeRoleFromPrincipal(self, role_id, principal_id)
# we need to invalidate cachekey
invalidate_cachekey_volatile_for('_users_groups_value')


def _listAllowedRolesAndUsers_cachekey(method, self, user):
'''cachekey method for self._listAllowedRolesAndUsers.'''
date = get_cachekey_volatile('_users_groups_value')
return date, user.getId()


@ram.cache(_listAllowedRolesAndUsers_cachekey)
def _listAllowedRolesAndUsers(self, user):
"""Monkeypatch to use get_plone_groups_for_user instead getGroups.
Moreover store this in the REQUEST."""
# Makes sure the list includes the user's groups.
result = user.getRoles()
if 'Anonymous' in result:
# The anonymous user has no further roles
return ['Anonymous']
result = list(result)
# XXX change, replaced getGroups by get_plone_groups_for_user
# if hasattr(aq_base(user), 'getGroups'):
# groups = ['user:%s' % x for x in user.getGroups()]
try:
groups = get_plone_groups_for_user(user=user)
except InvalidParameterError:
groups = user.getGroups()
if groups:
groups = ['user:%s' % x for x in groups]
result = result + groups
# Order the arguments from small to large sets
result.insert(0, 'user:%s' % user.getId())
result.append('Anonymous')
return result
44 changes: 42 additions & 2 deletions src/imio/helpers/tests/test_cache.py
Expand Up @@ -15,10 +15,14 @@
from imio.helpers.testing import IntegrationTestCase
from persistent.mapping import PersistentMapping
from plone import api
from plone.app.testing import login
from plone.app.testing import logout
from plone.memoize import forever
from plone.memoize import ram
from plone.memoize.instance import Memojito
from plone.memoize.interfaces import ICacheChooser
from Products.CMFCore.utils import _getAuthenticatedUser
from Products.PlonePAS.plugins.ufactory import PloneUser
from zope.component import getUtility
from zope.component import queryUtility
from zope.ramcache.interfaces.ram import IRAMCache
Expand Down Expand Up @@ -421,5 +425,41 @@ def test__users_groups_value_invalidation(self):
value9 = get_cachekey_volatile('_users_groups_value')
self.assertNotEqual(value8, value9)


# TODO Add tests for other cached methods
def test__listAllowedRolesAndUsers(self):
pgr = self.portal['portal_groups']
new_user = api.user.create(username='new_user', email='test@test.be')
self.assertEqual(
self.catalog._listAllowedRolesAndUsers(new_user),
['user:new_user', 'Member', 'Authenticated',
'user:AuthenticatedUsers', 'Anonymous'])
pgr.addPrincipalToGroup(new_user.getId(), 'Administrators')
# get again, as we use "getGroups", it is stored on user instance
new_user = api.user.get(new_user.getId())
self.assertEqual(
self.catalog._listAllowedRolesAndUsers(new_user),
['user:new_user', 'Member', 'Manager', 'Authenticated',
'user:Administrators', 'user:AuthenticatedUsers', 'Anonymous'])
# behaves correctly with a PloneUser because of id/getId
plone_user1 = _getAuthenticatedUser(self.portal)
self.assertTrue(isinstance(plone_user1, PloneUser))
self.assertEqual(plone_user1.id, "acl_users")
self.assertEqual(plone_user1.getId(), "test_user_1_")
self.assertEqual(
self.catalog._listAllowedRolesAndUsers(plone_user1),
['user:test_user_1_', 'Manager', 'Authenticated',
'user:AuthenticatedUsers', 'Anonymous'])
login(self.portal, new_user.getId())
plone_user2 = _getAuthenticatedUser(self.portal)
self.assertTrue(isinstance(plone_user2, PloneUser))
self.assertEqual(plone_user2.id, "acl_users")
self.assertEqual(plone_user2.getId(), "new_user")
self.assertEqual(
self.catalog._listAllowedRolesAndUsers(plone_user2),
['user:new_user', 'Member', 'Manager', 'Authenticated',
'user:Administrators', 'user:AuthenticatedUsers', 'Anonymous'])
# as Anonymous
logout()
anon_user = _getAuthenticatedUser(self.portal)
self.assertEqual(
self.catalog._listAllowedRolesAndUsers(anon_user),
['Anonymous'])

0 comments on commit b783fbb

Please sign in to comment.