Skip to content

Commit

Permalink
Fix issue with v3 tokens and group membership roles
Browse files Browse the repository at this point in the history
The driver calls used by v3 token controllers to obtain roles
for a user on both project and domain were incorrectly implemented,
leading to roles being missed out of the token. v2 tokens are not
affected, since they don't use the same driver calls.

This fixes these functions and adds additonal tests to cover the
cases (all of which would fail without this patch).  As part of this
fix, the implementation of "get_roles_for_user_and_project() is
pulled up into the driver class (like the domain equivalent is already),
since, for all implementations, it is independant of backend technology.

Fixes bug 1197874

Change-Id: I59b6882d93bdc8372be03fed0b390b002a6d0320
  • Loading branch information
henrynash committed Jul 6, 2013
1 parent 6450f75 commit 22e3fb7
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 68 deletions.
9 changes: 0 additions & 9 deletions keystone/identity/backends/kvs.py
Expand Up @@ -131,15 +131,6 @@ def get_projects_for_user(self, user_id):
user_ref = self._get_user(user_id)
return user_ref.get('tenants', [])

def get_roles_for_user_and_project(self, user_id, tenant_id):
self.get_user(user_id)
self.get_project(tenant_id)
try:
metadata_ref = self.get_metadata(user_id, tenant_id)
except exception.MetadataNotFound:
metadata_ref = {}
return metadata_ref.get('roles', [])

def add_role_to_user_and_project(self, user_id, tenant_id, role_id):
self.get_user(user_id)
self.get_project(tenant_id)
Expand Down
24 changes: 13 additions & 11 deletions keystone/identity/backends/ldap.py
Expand Up @@ -149,12 +149,23 @@ def get_user_by_name(self, user_name, domain_id):

def get_metadata(self, user_id=None, tenant_id=None,
domain_id=None, group_id=None):

def _get_roles_for_just_user_and_project(user_id, tenant_id):
self.get_user(user_id)
self.get_project(tenant_id)
user_dn = self.user._id_to_dn(user_id)
return [self.role._dn_to_id(a.role_dn)
for a in self.role.get_role_assignments
(self.project._id_to_dn(tenant_id))
if a.user_dn == user_dn]

if domain_id is not None:
raise NotImplemented('Domain metadata not supported by LDAP.')
msg = 'Domain metadata not supported by LDAP'
raise exception.NotImplemented(message=msg)
if not self.get_project(tenant_id) or not self.get_user(user_id):
return {}

metadata_ref = self.get_roles_for_user_and_project(user_id, tenant_id)
metadata_ref = _get_roles_for_just_user_and_project(user_id, tenant_id)
if not metadata_ref:
return {}
return {'roles': metadata_ref}
Expand Down Expand Up @@ -182,15 +193,6 @@ def get_project_users(self, tenant_id):
self.project.get_user_dns(tenant_id, rolegrants)]
return self._set_default_domain(users)

def get_roles_for_user_and_project(self, user_id, tenant_id):
self.get_user(user_id)
self.get_project(tenant_id)
user_dn = self.user._id_to_dn(user_id)
return [self.role._dn_to_id(a.role_dn)
for a in self.role.get_role_assignments
(self.project._id_to_dn(tenant_id))
if a.user_dn == user_dn]

def _subrole_id_to_dn(self, role_id, tenant_id):
if tenant_id is None:
return self.role._id_to_dn(role_id)
Expand Down
26 changes: 0 additions & 26 deletions keystone/identity/backends/sql.py
Expand Up @@ -368,32 +368,6 @@ def get_projects_for_user(self, user_id):
membership_refs = query.all()
return [x.project_id for x in membership_refs]

def _get_user_group_project_roles(self, metadata_ref, user_id, project_id):
group_refs = self.list_groups_for_user(user_id=user_id)
for x in group_refs:
try:
metadata_ref.update(
self.get_metadata(group_id=x['id'],
tenant_id=project_id))
except exception.MetadataNotFound:
# no group grant, skip
pass

def _get_user_project_roles(self, metadata_ref, user_id, project_id):
try:
metadata_ref.update(self.get_metadata(user_id, project_id))
except exception.MetadataNotFound:
pass

def get_roles_for_user_and_project(self, user_id, tenant_id):
session = self.get_session()
self._get_user(session, user_id)
self._get_project(session, tenant_id)
metadata_ref = {}
self._get_user_project_roles(metadata_ref, user_id, tenant_id)
self._get_user_group_project_roles(metadata_ref, user_id, tenant_id)
return list(set(metadata_ref.get('roles', [])))

def add_role_to_user_and_project(self, user_id, tenant_id, role_id):
session = self.get_session()
self._get_user(session, user_id)
Expand Down
78 changes: 58 additions & 20 deletions keystone/identity/core.py
Expand Up @@ -186,50 +186,88 @@ def get_projects_for_user(self, user_id):
def get_roles_for_user_and_project(self, user_id, tenant_id):
"""Get the roles associated with a user within given tenant.
This includes roles directly assigned to the user on the
project, as well as those by virtue of group membership.
:returns: a list of role ids.
:raises: keystone.exception.UserNotFound,
keystone.exception.ProjectNotFound
"""
raise exception.NotImplemented()
def _get_group_project_roles(user_id, tenant_id):
role_list = []
group_refs = self.list_groups_for_user(user_id=user_id)
for x in group_refs:
try:
metadata_ref = self.get_metadata(group_id=x['id'],
tenant_id=tenant_id)
role_list += metadata_ref.get('roles', [])
except exception.MetadataNotFound:
# no group grant, skip
pass
return role_list

def _get_user_project_roles(user_id, tenant_id):
metadata_ref = {}
try:
metadata_ref = self.get_metadata(user_id=user_id,
tenant_id=tenant_id)
except exception.MetadataNotFound:
pass
return metadata_ref.get('roles', [])

self.get_user(user_id)
self.get_project(tenant_id)
user_role_list = _get_user_project_roles(user_id, tenant_id)
group_role_list = _get_group_project_roles(user_id, tenant_id)
# Use set() to process the list to remove any duplicates
return list(set(user_role_list + group_role_list))

def get_roles_for_user_and_domain(self, user_id, domain_id):
"""Get the roles associated with a user within given domain.
This includes roles directly assigned to the user on the
domain, as well as those by virtue of group membership.
:returns: a list of role ids.
:raises: keystone.exception.UserNotFound,
keystone.exception.ProjectNotFound
keystone.exception.DomainNotFound
"""

def update_metadata_for_group_domain_roles(self, metadata_ref,
user_id, domain_id):
def _get_group_domain_roles(user_id, domain_id):
role_list = []
group_refs = self.list_groups_for_user(user_id=user_id)
for x in group_refs:
try:
metadata_ref.update(
self.get_metadata(group_id=x['id'],
domain_id=domain_id))
except exception.MetadataNotFound:
# no group grant, skip
metadata_ref = self.get_metadata(group_id=x['id'],
domain_id=domain_id)
role_list += metadata_ref.get('roles', [])
except (exception.MetadataNotFound, exception.NotImplemented):
# MetadataNotFound implies no group grant, so skip.
# Ignore NotImplemented since not all backends support
# domains.
pass
return role_list

def update_metadata_for_user_domain_roles(self, metadata_ref,
user_id, domain_id):
def _get_user_domain_roles(user_id, domain_id):
metadata_ref = {}
try:
metadata_ref.update(self.get_metadata(user_id=user_id,
domain_id=domain_id))
except exception.MetadataNotFound:
metadata_ref = self.get_metadata(user_id=user_id,
domain_id=domain_id)
except (exception.MetadataNotFound, exception.NotImplemented):
# MetadataNotFound implies no user grants.
# Ignore NotImplemented since not all backends support
# domains.
pass
return metadata_ref.get('roles', [])

self.get_user(user_id)
self.get_domain(domain_id)
metadata_ref = {}
update_metadata_for_user_domain_roles(self, metadata_ref,
user_id, domain_id)
update_metadata_for_group_domain_roles(self, metadata_ref,
user_id, domain_id)
return list(set(metadata_ref.get('roles', [])))
user_role_list = _get_user_domain_roles(user_id, domain_id)
group_role_list = _get_group_domain_roles(user_id, domain_id)
# Use set() to process the list to remove any duplicates
return list(set(user_role_list + group_role_list))

def add_role_to_user_and_project(self, user_id, tenant_id, role_id):
"""Add a role to a user within given tenant.
Expand Down
101 changes: 100 additions & 1 deletion tests/test_backend.py
Expand Up @@ -1081,7 +1081,7 @@ def test_role_grant_by_user_and_cross_domain_project(self):

def test_multi_role_grant_by_user_group_on_project_domain(self):
role_list = []
for _ in range(8):
for _ in range(10):
role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_role(role['id'], role)
role_list.append(role)
Expand Down Expand Up @@ -1150,6 +1150,105 @@ def test_multi_role_grant_by_user_group_on_project_domain(self):
self.assertIn(role_list[6], roles_ref)
self.assertIn(role_list[7], roles_ref)

# Now test the alternate way of getting back lists of grants,
# where user and group roles are combined. These should match
# the above results.
combined_role_list = self.identity_api.get_roles_for_user_and_project(
user1['id'], project1['id'])
self.assertEquals(len(combined_role_list), 4)
self.assertIn(role_list[4]['id'], combined_role_list)
self.assertIn(role_list[5]['id'], combined_role_list)
self.assertIn(role_list[6]['id'], combined_role_list)
self.assertIn(role_list[7]['id'], combined_role_list)

combined_role_list = self.identity_api.get_roles_for_user_and_domain(
user1['id'], domain1['id'])
self.assertEquals(len(combined_role_list), 4)
self.assertIn(role_list[0]['id'], combined_role_list)
self.assertIn(role_list[1]['id'], combined_role_list)
self.assertIn(role_list[2]['id'], combined_role_list)
self.assertIn(role_list[3]['id'], combined_role_list)

def test_multi_group_grants_on_project_domain(self):
"""Test multiple group roles for user on project and domain.
Test Plan:
- Create 6 roles
- Create a domain, with a project, user and two groups
- Make the user a member of both groups
- Check no roles yet exit
- Assign a role to each user and both groups on both the
project and domain
- Get a list of effective roles for the user on both the
project and domain, checking we get back the correct three
roles
"""
role_list = []
for _ in range(6):
role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_role(role['id'], role)
role_list.append(role)
domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_domain(domain1['id'], domain1)
user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id'], 'password': uuid.uuid4().hex,
'enabled': True}
self.identity_api.create_user(user1['id'], user1)
group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id'], 'enabled': True}
self.identity_api.create_group(group1['id'], group1)
group2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id'], 'enabled': True}
self.identity_api.create_group(group2['id'], group2)
project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id']}
self.identity_api.create_project(project1['id'], project1)

self.identity_api.add_user_to_group(user1['id'],
group1['id'])
self.identity_api.add_user_to_group(user1['id'],
group2['id'])

roles_ref = self.identity_api.list_grants(
user_id=user1['id'],
project_id=project1['id'])
self.assertEquals(len(roles_ref), 0)
self.identity_api.create_grant(user_id=user1['id'],
domain_id=domain1['id'],
role_id=role_list[0]['id'])
self.identity_api.create_grant(group_id=group1['id'],
domain_id=domain1['id'],
role_id=role_list[1]['id'])
self.identity_api.create_grant(group_id=group2['id'],
domain_id=domain1['id'],
role_id=role_list[2]['id'])
self.identity_api.create_grant(user_id=user1['id'],
project_id=project1['id'],
role_id=role_list[3]['id'])
self.identity_api.create_grant(group_id=group1['id'],
project_id=project1['id'],
role_id=role_list[4]['id'])
self.identity_api.create_grant(group_id=group2['id'],
project_id=project1['id'],
role_id=role_list[5]['id'])

# Read by the roles, ensuring we get the correct 3 roles for
# both project and domain
combined_role_list = self.identity_api.get_roles_for_user_and_project(
user1['id'], project1['id'])
self.assertEquals(len(combined_role_list), 3)
self.assertIn(role_list[3]['id'], combined_role_list)
self.assertIn(role_list[4]['id'], combined_role_list)
self.assertIn(role_list[5]['id'], combined_role_list)

combined_role_list = self.identity_api.get_roles_for_user_and_domain(
user1['id'], domain1['id'])
self.assertEquals(len(combined_role_list), 3)
self.assertIn(role_list[0]['id'], combined_role_list)
self.assertIn(role_list[1]['id'], combined_role_list)
self.assertIn(role_list[2]['id'], combined_role_list)

def test_delete_role_with_user_and_group_grants(self):
role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_role(role1['id'], role1)
Expand Down
51 changes: 50 additions & 1 deletion tests/test_backend_ldap.py
Expand Up @@ -498,7 +498,56 @@ def test_role_grant_by_user_and_cross_domain_project(self):
raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')

def test_multi_role_grant_by_user_group_on_project_domain(self):
raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
# This is a partial implementation of the standard test that
# is defined in test_backend.py. It omits both domain and
# group grants. since neither of these are yet supported by
# the ldap backend.

role_list = []
for _ in range(2):
role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_role(role['id'], role)
role_list.append(role)

user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id,
'password': uuid.uuid4().hex,
'enabled': True}
self.identity_api.create_user(user1['id'], user1)
project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id}
self.identity_api.create_project(project1['id'], project1)

self.identity_api.add_role_to_user_and_project(
user_id=user1['id'],
tenant_id=project1['id'],
role_id=role_list[0]['id'])
self.identity_api.add_role_to_user_and_project(
user_id=user1['id'],
tenant_id=project1['id'],
role_id=role_list[1]['id'])

# Although list_grants are not yet supported, we can test the
# alternate way of getting back lists of grants, where user
# and group roles are combined. Only directly assigned user
# roles are available, since group grants are not yet supported

combined_role_list = self.identity_api.get_roles_for_user_and_project(
user1['id'], project1['id'])
self.assertEquals(len(combined_role_list), 2)
self.assertIn(role_list[0]['id'], combined_role_list)
self.assertIn(role_list[1]['id'], combined_role_list)

# Finally, although domain roles are not implemented, check we can
# issue the combined get roles call with benign results, since thus is
# used in token generation

combined_role_list = self.identity_api.get_roles_for_user_and_domain(
user1['id'], CONF.identity.default_domain_id)
self.assertEquals(len(combined_role_list), 0)

def test_multi_group_grants_on_project_domain(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')

def test_delete_role_with_user_and_group_grants(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
Expand Down

0 comments on commit 22e3fb7

Please sign in to comment.