Skip to content

Commit

Permalink
Fixes bug 902402- Integration Testcases for Keystone - users, Roles, …
Browse files Browse the repository at this point in the history
…and tenants

Change-Id: Id191c62aae76375c7f6205f80a52d45d0c645ed7
  • Loading branch information
RajalakshmiGanesan committed Aug 3, 2012
1 parent 8ba945e commit efc8bd7
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 0 deletions.
30 changes: 30 additions & 0 deletions tempest/services/identity/json/admin_client.py
Expand Up @@ -159,6 +159,36 @@ def delete_token(self, token_id):
resp, body = self.delete("tokens/%s" % token_id)
return resp, body

def list_users_for_tenant(self, tenant_id):
"""List users for a Tenant"""
resp, body = self.get('/tenants/%s/users' % tenant_id)
body = json.loads(body)
return resp, body['users']

def create_service(self, name, type, **kwargs):
"""Create a service"""
post_body = {
'name': name,
'type': type,
'description': kwargs.get('description')}
post_body = json.dumps({'OS-KSADM:service': post_body})
resp, body = self.post('/OS-KSADM/services', post_body,
self.headers)
body = json.loads(body)
return resp, body['OS-KSADM:service']

def get_service(self, service_id):
"""Get Service"""
url = '/OS-KSADM/services/%s' % service_id
resp, body = self.get(url)
body = json.loads(body)
return resp, body['OS-KSADM:service']

def delete_service(self, service_id):
"""Delete Service"""
url = '/OS-KSADM/services/%s' % service_id
return self.delete(url)


class TokenClient(RestClient):

Expand Down
66 changes: 66 additions & 0 deletions tempest/tests/identity/admin/test_services.py
@@ -0,0 +1,66 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2012 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from nose.plugins.attrib import attr
import unittest2 as unittest

from tempest import exceptions
from tempest.common.utils.data_utils import rand_name
from tempest.tests.identity.base import BaseIdentityAdminTest


class ServicesTest(BaseIdentityAdminTest):

def test_create_get_delete_service(self):
"""GET Service"""
try:
#Creating a Service
name = rand_name('service-')
type = rand_name('type--')
description = rand_name('description-')
resp, service_data = \
self.client.create_service(name, type, description=description)
self.assertTrue(resp['status'].startswith('2'))
#Verifying response body of create service
self.assertTrue('id' in service_data)
self.assertFalse(service_data['id'] is None)
self.assertTrue('name' in service_data)
self.assertEqual(name, service_data['name'])
self.assertTrue('type' in service_data)
self.assertEqual(type, service_data['type'])
self.assertTrue('description' in service_data)
self.assertEqual(description, service_data['description'])
#Get service
resp, fetched_service = self.client.get_service(service_data['id'])
self.assertTrue(resp['status'].startswith('2'))
#verifying the existence of service created
self.assertTrue('id' in fetched_service)
self.assertEquals(fetched_service['id'], service_data['id'])
self.assertTrue('name' in fetched_service)
self.assertEqual(fetched_service['name'], service_data['name'])
self.assertTrue('type' in fetched_service)
self.assertEqual(fetched_service['type'], service_data['type'])
self.assertTrue('description' in fetched_service)
self.assertEqual(fetched_service['description'],
service_data['description'])
finally:
#Deleting the service created in this method
resp, _ = self.client.delete_service(service_data['id'])
self.assertTrue(resp['status'].startswith('2'))
#Checking whether service is deleted successfully
self.assertRaises(exceptions.NotFound, self.client.get_service,
service_data['id'])
86 changes: 86 additions & 0 deletions tempest/tests/identity/admin/test_users.py
Expand Up @@ -239,9 +239,95 @@ def test_get_users_by_unauthorized_user(self):
self.assertRaises(exceptions.Unauthorized,
self.non_admin_client.get_users)

@attr(type='negative')
def test_get_users_request_without_token(self):
"""Request to get list of users without a valid token should fail"""
token = self.client.get_auth()
self.client.delete_token(token)
self.assertRaises(exceptions.Unauthorized, self.client.get_users)
self.client.clear_auth()

@attr(type='positive')
def test_list_users_for_tenant(self):
"""Return a list of all users for a tenant"""
self.data.setup_test_tenant()
user_ids = list()
fetched_user_ids = list()
resp, user1 = self.client.create_user('tenant_user1', 'password1',
self.data.tenant['id'],
'user1@123')
user_ids.append(user1['id'])
self.data.users.append(user1)
resp, user2 = self.client.create_user('tenant_user2', 'password2',
self.data.tenant['id'],
'user2@123')
user_ids.append(user2['id'])
self.data.users.append(user2)
#List of users for the respective tenant ID
resp, body = self.client.list_users_for_tenant(self.data.tenant['id'])
self.assertTrue(resp['status'].startswith('2'))
for i in body:
fetched_user_ids.append(i['id'])
#verifying the user Id in the list
missing_users =\
[user for user in user_ids if user not in fetched_user_ids]
self.assertEqual(0, len(missing_users),
"Failed to find user %s in fetched list"
% ', '.join(m_user for m_user in missing_users))

@attr(type='positive')
def test_list_users_with_roles_for_tenant(self):
"""Return list of users on tenant when roles are assigned to users"""
self.data.setup_test_user()
self.data.setup_test_role()
user = self.get_user_by_name(self.data.test_user)
tenant = self.get_tenant_by_name(self.data.test_tenant)
role = self.get_role_by_name(self.data.test_role)
#Assigning roles to two users
user_ids = list()
fetched_user_ids = list()
user_ids.append(user['id'])
self.client.assign_user_role(tenant['id'], user['id'], role['id'])
resp, second_user = self.client.create_user('second_user', 'password1',
self.data.tenant['id'],
'user1@123')
user_ids.append(second_user['id'])
self.data.users.append(second_user)
self.client.assign_user_role(tenant['id'], second_user['id'],
role['id'])
#List of users with roles for the respective tenant ID
resp, body = self.client.list_users_for_tenant(self.data.tenant['id'])
self.assertTrue(resp['status'].startswith('2'))
for i in body:
fetched_user_ids.append(i['id'])
#verifying the user Id in the list
missing_users =\
[user for user in user_ids if user not in fetched_user_ids]
self.assertEqual(0, len(missing_users),
"Failed to find user %s in fetched list"
% ', '.join(m_user for m_user in missing_users))

@attr(type='negative')
def test_list_users_with_invalid_tenant(self):
"""
Should not be able to return a list of all
users for a nonexistant tenant
"""
#Assign invalid tenant ids
invalid_id = list()
invalid_id.append(rand_name('999'))
invalid_id.append('alpha')
invalid_id.append(rand_name("dddd@#%%^$"))
invalid_id.append('!@#()$%^&*?<>{}[]')
#List the users with invalid tenant id
fail = list()
for invalid in invalid_id:
try:
resp, body = self.client.list_users_for_tenant(invalid)
except exceptions.NotFound:
pass
else:
fail.append(invalid)
if len(fail) != 0:
self.fail('Should raise Not Found when list users with invalid'
'tenant ids %s' % fail)

0 comments on commit efc8bd7

Please sign in to comment.