diff --git a/keystone/identity/backends/kvs.py b/keystone/identity/backends/kvs.py index 2eea08cf10..565a558771 100644 --- a/keystone/identity/backends/kvs.py +++ b/keystone/identity/backends/kvs.py @@ -131,15 +131,6 @@ def get_projects_for_user(self, user_id): user_ref = self._get_user(user_id) return user_ref.get('tenants', []) - def get_roles_for_user_and_project(self, user_id, tenant_id): - self.get_user(user_id) - self.get_project(tenant_id) - try: - metadata_ref = self.get_metadata(user_id, tenant_id) - except exception.MetadataNotFound: - metadata_ref = {} - return metadata_ref.get('roles', []) - def add_role_to_user_and_project(self, user_id, tenant_id, role_id): self.get_user(user_id) self.get_project(tenant_id) diff --git a/keystone/identity/backends/ldap.py b/keystone/identity/backends/ldap.py index dffbf8353b..2387c84edb 100644 --- a/keystone/identity/backends/ldap.py +++ b/keystone/identity/backends/ldap.py @@ -149,12 +149,23 @@ def get_user_by_name(self, user_name, domain_id): def get_metadata(self, user_id=None, tenant_id=None, domain_id=None, group_id=None): + + def _get_roles_for_just_user_and_project(user_id, tenant_id): + self.get_user(user_id) + self.get_project(tenant_id) + user_dn = self.user._id_to_dn(user_id) + return [self.role._dn_to_id(a.role_dn) + for a in self.role.get_role_assignments + (self.project._id_to_dn(tenant_id)) + if a.user_dn == user_dn] + if domain_id is not None: - raise NotImplemented('Domain metadata not supported by LDAP.') + msg = 'Domain metadata not supported by LDAP' + raise exception.NotImplemented(message=msg) if not self.get_project(tenant_id) or not self.get_user(user_id): return {} - metadata_ref = self.get_roles_for_user_and_project(user_id, tenant_id) + metadata_ref = _get_roles_for_just_user_and_project(user_id, tenant_id) if not metadata_ref: return {} return {'roles': metadata_ref} @@ -182,15 +193,6 @@ def get_project_users(self, tenant_id): self.project.get_user_dns(tenant_id, rolegrants)] return self._set_default_domain(users) - def get_roles_for_user_and_project(self, user_id, tenant_id): - self.get_user(user_id) - self.get_project(tenant_id) - user_dn = self.user._id_to_dn(user_id) - return [self.role._dn_to_id(a.role_dn) - for a in self.role.get_role_assignments - (self.project._id_to_dn(tenant_id)) - if a.user_dn == user_dn] - def _subrole_id_to_dn(self, role_id, tenant_id): if tenant_id is None: return self.role._id_to_dn(role_id) diff --git a/keystone/identity/backends/sql.py b/keystone/identity/backends/sql.py index f81feb1d1a..ba318c7389 100644 --- a/keystone/identity/backends/sql.py +++ b/keystone/identity/backends/sql.py @@ -368,32 +368,6 @@ def get_projects_for_user(self, user_id): membership_refs = query.all() return [x.project_id for x in membership_refs] - def _get_user_group_project_roles(self, metadata_ref, user_id, project_id): - group_refs = self.list_groups_for_user(user_id=user_id) - for x in group_refs: - try: - metadata_ref.update( - self.get_metadata(group_id=x['id'], - tenant_id=project_id)) - except exception.MetadataNotFound: - # no group grant, skip - pass - - def _get_user_project_roles(self, metadata_ref, user_id, project_id): - try: - metadata_ref.update(self.get_metadata(user_id, project_id)) - except exception.MetadataNotFound: - pass - - def get_roles_for_user_and_project(self, user_id, tenant_id): - session = self.get_session() - self._get_user(session, user_id) - self._get_project(session, tenant_id) - metadata_ref = {} - self._get_user_project_roles(metadata_ref, user_id, tenant_id) - self._get_user_group_project_roles(metadata_ref, user_id, tenant_id) - return list(set(metadata_ref.get('roles', []))) - def add_role_to_user_and_project(self, user_id, tenant_id, role_id): session = self.get_session() self._get_user(session, user_id) diff --git a/keystone/identity/core.py b/keystone/identity/core.py index a254470e9c..09324ea3ff 100644 --- a/keystone/identity/core.py +++ b/keystone/identity/core.py @@ -186,50 +186,88 @@ def get_projects_for_user(self, user_id): def get_roles_for_user_and_project(self, user_id, tenant_id): """Get the roles associated with a user within given tenant. + This includes roles directly assigned to the user on the + project, as well as those by virtue of group membership. + :returns: a list of role ids. :raises: keystone.exception.UserNotFound, keystone.exception.ProjectNotFound """ - raise exception.NotImplemented() + def _get_group_project_roles(user_id, tenant_id): + role_list = [] + group_refs = self.list_groups_for_user(user_id=user_id) + for x in group_refs: + try: + metadata_ref = self.get_metadata(group_id=x['id'], + tenant_id=tenant_id) + role_list += metadata_ref.get('roles', []) + except exception.MetadataNotFound: + # no group grant, skip + pass + return role_list + + def _get_user_project_roles(user_id, tenant_id): + metadata_ref = {} + try: + metadata_ref = self.get_metadata(user_id=user_id, + tenant_id=tenant_id) + except exception.MetadataNotFound: + pass + return metadata_ref.get('roles', []) + + self.get_user(user_id) + self.get_project(tenant_id) + user_role_list = _get_user_project_roles(user_id, tenant_id) + group_role_list = _get_group_project_roles(user_id, tenant_id) + # Use set() to process the list to remove any duplicates + return list(set(user_role_list + group_role_list)) def get_roles_for_user_and_domain(self, user_id, domain_id): """Get the roles associated with a user within given domain. + This includes roles directly assigned to the user on the + domain, as well as those by virtue of group membership. + :returns: a list of role ids. :raises: keystone.exception.UserNotFound, - keystone.exception.ProjectNotFound + keystone.exception.DomainNotFound """ - def update_metadata_for_group_domain_roles(self, metadata_ref, - user_id, domain_id): + def _get_group_domain_roles(user_id, domain_id): + role_list = [] group_refs = self.list_groups_for_user(user_id=user_id) for x in group_refs: try: - metadata_ref.update( - self.get_metadata(group_id=x['id'], - domain_id=domain_id)) - except exception.MetadataNotFound: - # no group grant, skip + metadata_ref = self.get_metadata(group_id=x['id'], + domain_id=domain_id) + role_list += metadata_ref.get('roles', []) + except (exception.MetadataNotFound, exception.NotImplemented): + # MetadataNotFound implies no group grant, so skip. + # Ignore NotImplemented since not all backends support + # domains. pass + return role_list - def update_metadata_for_user_domain_roles(self, metadata_ref, - user_id, domain_id): + def _get_user_domain_roles(user_id, domain_id): + metadata_ref = {} try: - metadata_ref.update(self.get_metadata(user_id=user_id, - domain_id=domain_id)) - except exception.MetadataNotFound: + metadata_ref = self.get_metadata(user_id=user_id, + domain_id=domain_id) + except (exception.MetadataNotFound, exception.NotImplemented): + # MetadataNotFound implies no user grants. + # Ignore NotImplemented since not all backends support + # domains. pass + return metadata_ref.get('roles', []) self.get_user(user_id) self.get_domain(domain_id) - metadata_ref = {} - update_metadata_for_user_domain_roles(self, metadata_ref, - user_id, domain_id) - update_metadata_for_group_domain_roles(self, metadata_ref, - user_id, domain_id) - return list(set(metadata_ref.get('roles', []))) + user_role_list = _get_user_domain_roles(user_id, domain_id) + group_role_list = _get_group_domain_roles(user_id, domain_id) + # Use set() to process the list to remove any duplicates + return list(set(user_role_list + group_role_list)) def add_role_to_user_and_project(self, user_id, tenant_id, role_id): """Add a role to a user within given tenant. diff --git a/tests/test_backend.py b/tests/test_backend.py index ea40cd8b1a..ebf949240e 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -1081,7 +1081,7 @@ def test_role_grant_by_user_and_cross_domain_project(self): def test_multi_role_grant_by_user_group_on_project_domain(self): role_list = [] - for _ in range(8): + for _ in range(10): role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(role['id'], role) role_list.append(role) @@ -1150,6 +1150,105 @@ def test_multi_role_grant_by_user_group_on_project_domain(self): self.assertIn(role_list[6], roles_ref) self.assertIn(role_list[7], roles_ref) + # Now test the alternate way of getting back lists of grants, + # where user and group roles are combined. These should match + # the above results. + combined_role_list = self.identity_api.get_roles_for_user_and_project( + user1['id'], project1['id']) + self.assertEquals(len(combined_role_list), 4) + self.assertIn(role_list[4]['id'], combined_role_list) + self.assertIn(role_list[5]['id'], combined_role_list) + self.assertIn(role_list[6]['id'], combined_role_list) + self.assertIn(role_list[7]['id'], combined_role_list) + + combined_role_list = self.identity_api.get_roles_for_user_and_domain( + user1['id'], domain1['id']) + self.assertEquals(len(combined_role_list), 4) + self.assertIn(role_list[0]['id'], combined_role_list) + self.assertIn(role_list[1]['id'], combined_role_list) + self.assertIn(role_list[2]['id'], combined_role_list) + self.assertIn(role_list[3]['id'], combined_role_list) + + def test_multi_group_grants_on_project_domain(self): + """Test multiple group roles for user on project and domain. + + Test Plan: + - Create 6 roles + - Create a domain, with a project, user and two groups + - Make the user a member of both groups + - Check no roles yet exit + - Assign a role to each user and both groups on both the + project and domain + - Get a list of effective roles for the user on both the + project and domain, checking we get back the correct three + roles + + """ + role_list = [] + for _ in range(6): + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_role(role['id'], role) + role_list.append(role) + domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(domain1['id'], domain1) + user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': domain1['id'], 'password': uuid.uuid4().hex, + 'enabled': True} + self.identity_api.create_user(user1['id'], user1) + group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': domain1['id'], 'enabled': True} + self.identity_api.create_group(group1['id'], group1) + group2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': domain1['id'], 'enabled': True} + self.identity_api.create_group(group2['id'], group2) + project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': domain1['id']} + self.identity_api.create_project(project1['id'], project1) + + self.identity_api.add_user_to_group(user1['id'], + group1['id']) + self.identity_api.add_user_to_group(user1['id'], + group2['id']) + + roles_ref = self.identity_api.list_grants( + user_id=user1['id'], + project_id=project1['id']) + self.assertEquals(len(roles_ref), 0) + self.identity_api.create_grant(user_id=user1['id'], + domain_id=domain1['id'], + role_id=role_list[0]['id']) + self.identity_api.create_grant(group_id=group1['id'], + domain_id=domain1['id'], + role_id=role_list[1]['id']) + self.identity_api.create_grant(group_id=group2['id'], + domain_id=domain1['id'], + role_id=role_list[2]['id']) + self.identity_api.create_grant(user_id=user1['id'], + project_id=project1['id'], + role_id=role_list[3]['id']) + self.identity_api.create_grant(group_id=group1['id'], + project_id=project1['id'], + role_id=role_list[4]['id']) + self.identity_api.create_grant(group_id=group2['id'], + project_id=project1['id'], + role_id=role_list[5]['id']) + + # Read by the roles, ensuring we get the correct 3 roles for + # both project and domain + combined_role_list = self.identity_api.get_roles_for_user_and_project( + user1['id'], project1['id']) + self.assertEquals(len(combined_role_list), 3) + self.assertIn(role_list[3]['id'], combined_role_list) + self.assertIn(role_list[4]['id'], combined_role_list) + self.assertIn(role_list[5]['id'], combined_role_list) + + combined_role_list = self.identity_api.get_roles_for_user_and_domain( + user1['id'], domain1['id']) + self.assertEquals(len(combined_role_list), 3) + self.assertIn(role_list[0]['id'], combined_role_list) + self.assertIn(role_list[1]['id'], combined_role_list) + self.assertIn(role_list[2]['id'], combined_role_list) + def test_delete_role_with_user_and_group_grants(self): role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(role1['id'], role1) diff --git a/tests/test_backend_ldap.py b/tests/test_backend_ldap.py index 577a6ef04a..7d44ebe494 100644 --- a/tests/test_backend_ldap.py +++ b/tests/test_backend_ldap.py @@ -498,7 +498,56 @@ def test_role_grant_by_user_and_cross_domain_project(self): raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') def test_multi_role_grant_by_user_group_on_project_domain(self): - raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') + # This is a partial implementation of the standard test that + # is defined in test_backend.py. It omits both domain and + # group grants. since neither of these are yet supported by + # the ldap backend. + + role_list = [] + for _ in range(2): + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_role(role['id'], role) + role_list.append(role) + + user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': CONF.identity.default_domain_id, + 'password': uuid.uuid4().hex, + 'enabled': True} + self.identity_api.create_user(user1['id'], user1) + project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': CONF.identity.default_domain_id} + self.identity_api.create_project(project1['id'], project1) + + self.identity_api.add_role_to_user_and_project( + user_id=user1['id'], + tenant_id=project1['id'], + role_id=role_list[0]['id']) + self.identity_api.add_role_to_user_and_project( + user_id=user1['id'], + tenant_id=project1['id'], + role_id=role_list[1]['id']) + + # Although list_grants are not yet supported, we can test the + # alternate way of getting back lists of grants, where user + # and group roles are combined. Only directly assigned user + # roles are available, since group grants are not yet supported + + combined_role_list = self.identity_api.get_roles_for_user_and_project( + user1['id'], project1['id']) + self.assertEquals(len(combined_role_list), 2) + self.assertIn(role_list[0]['id'], combined_role_list) + self.assertIn(role_list[1]['id'], combined_role_list) + + # Finally, although domain roles are not implemented, check we can + # issue the combined get roles call with benign results, since thus is + # used in token generation + + combined_role_list = self.identity_api.get_roles_for_user_and_domain( + user1['id'], CONF.identity.default_domain_id) + self.assertEquals(len(combined_role_list), 0) + + def test_multi_group_grants_on_project_domain(self): + raise nose.exc.SkipTest('Blocked by bug 1101287') def test_delete_role_with_user_and_group_grants(self): raise nose.exc.SkipTest('Blocked by bug 1101287') diff --git a/tests/test_v3_auth.py b/tests/test_v3_auth.py index a2bee8b866..c38d13c98b 100644 --- a/tests/test_v3_auth.py +++ b/tests/test_v3_auth.py @@ -888,6 +888,137 @@ def test_project_id_scoped_token_with_user_id_401(self): project_id=project['id']) self.post('/auth/tokens', body=auth_data, expected_status=401) + def test_user_and_group_roles_scoped_token(self): + """Test correct roles are returned in scoped token. + + Test Plan: + - Create a domain, with 1 project, 2 users (user1 and user2) + and 2 groups (group1 and group2) + - Make user1 a member of group1, user2 a member of group2 + - Create 8 roles, assigning them to each of the 8 combinations + of users/groups on domain/project + - Get a project scoped token for user1, checking that the right + two roles are returned (one directly assigned, one by virtue + of group membership) + - Repeat this for a domain scoped token + - Make user1 also a member of group2 + - Get another scoped token making sure the additional role + shows up + - User2 is just here as a spoiler, to make sure we don't get + any roles uniquely assigned to it returned in any of our + tokens + + """ + + domainA = self.new_domain_ref() + self.identity_api.create_domain(domainA['id'], domainA) + projectA = self.new_project_ref(domain_id=domainA['id']) + self.identity_api.create_project(projectA['id'], projectA) + + user1 = self.new_user_ref( + domain_id=domainA['id']) + user1['password'] = uuid.uuid4().hex + self.identity_api.create_user(user1['id'], user1) + + user2 = self.new_user_ref( + domain_id=domainA['id']) + user2['password'] = uuid.uuid4().hex + self.identity_api.create_user(user2['id'], user2) + + group1 = self.new_group_ref( + domain_id=domainA['id']) + self.identity_api.create_group(group1['id'], group1) + + group2 = self.new_group_ref( + domain_id=domainA['id']) + self.identity_api.create_group(group2['id'], group2) + + self.identity_api.add_user_to_group(user1['id'], + group1['id']) + self.identity_api.add_user_to_group(user2['id'], + group2['id']) + + # Now create all the roles and assign them + role_list = [] + for _ in range(8): + role = self.new_role_ref() + self.identity_api.create_role(role['id'], role) + role_list.append(role) + + self.identity_api.create_grant(role_list[0]['id'], + user_id=user1['id'], + domain_id=domainA['id']) + self.identity_api.create_grant(role_list[1]['id'], + user_id=user1['id'], + project_id=projectA['id']) + self.identity_api.create_grant(role_list[2]['id'], + user_id=user2['id'], + domain_id=domainA['id']) + self.identity_api.create_grant(role_list[3]['id'], + user_id=user2['id'], + project_id=projectA['id']) + self.identity_api.create_grant(role_list[4]['id'], + group_id=group1['id'], + domain_id=domainA['id']) + self.identity_api.create_grant(role_list[5]['id'], + group_id=group1['id'], + project_id=projectA['id']) + self.identity_api.create_grant(role_list[6]['id'], + group_id=group2['id'], + domain_id=domainA['id']) + self.identity_api.create_grant(role_list[7]['id'], + group_id=group2['id'], + project_id=projectA['id']) + + # First, get a project scoped token - which should + # contain the direct user role and the one by virtue + # of group membership + auth_data = self.build_authentication_request( + user_id=user1['id'], + password=user1['password'], + project_id=projectA['id']) + r = self.post('/auth/tokens', body=auth_data) + token = self.assertValidScopedTokenResponse(r) + roles_ids = [] + for i, ref in enumerate(token['roles']): + roles_ids.append(ref['id']) + self.assertEqual(len(token['roles']), 2) + self.assertIn(role_list[1]['id'], roles_ids) + self.assertIn(role_list[5]['id'], roles_ids) + + # Now the same thing for a domain scoped token + auth_data = self.build_authentication_request( + user_id=user1['id'], + password=user1['password'], + domain_id=domainA['id']) + r = self.post('/auth/tokens', body=auth_data) + token = self.assertValidScopedTokenResponse(r) + roles_ids = [] + for i, ref in enumerate(token['roles']): + roles_ids.append(ref['id']) + self.assertEqual(len(token['roles']), 2) + self.assertIn(role_list[0]['id'], roles_ids) + self.assertIn(role_list[4]['id'], roles_ids) + + # Finally, add user1 to the 2nd group, and get a new + # scoped token - the extra role should now be included + # by virtue of the 2nd group + self.identity_api.add_user_to_group(user1['id'], + group2['id']) + auth_data = self.build_authentication_request( + user_id=user1['id'], + password=user1['password'], + project_id=projectA['id']) + r = self.post('/auth/tokens', body=auth_data) + token = self.assertValidScopedTokenResponse(r) + roles_ids = [] + for i, ref in enumerate(token['roles']): + roles_ids.append(ref['id']) + self.assertEqual(len(token['roles']), 3) + self.assertIn(role_list[1]['id'], roles_ids) + self.assertIn(role_list[5]['id'], roles_ids) + self.assertIn(role_list[7]['id'], roles_ids) + def test_project_id_scoped_token_with_user_domain_id(self): auth_data = self.build_authentication_request( username=self.user['name'],