Skip to content

Commit

Permalink
Improve swift_auth test coverage + Minor fixes
Browse files Browse the repository at this point in the history
 * Isolates authorize() tests from wsgi tests
 * Adds coverage for authorize()
 * Adds support for a blank reseller_prefix
 * Adds swift_auth test dependencies to tools/test-requires
 * Cleans up authorize()'s use of tenant_id/tenant_name
   (addresses bug 963546)

Change-Id: I603b89ab4fe8559b0f5d72528afd659ee0f0bce1
  • Loading branch information
Maru Newby committed Mar 23, 2012
1 parent 79e93be commit f3af691
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 146 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Expand Up @@ -67,6 +67,7 @@ Liem Nguyen <liem.m.nguyen@hp.com>
lzyeval <lzyeval@gmail.com>
Mark Gius <mgius7096@gmail.com>
Mark McLoughlin <markmc@redhat.com>
Maru Newby <mnewby@internap.com>
Michael Basnight <mbasnight@gmail.com>
Michael Still <mikal@stillhq.com>
Monty Taylor <mordred@inaugust.com>
Expand Down
18 changes: 9 additions & 9 deletions keystone/middleware/swift_auth.py
Expand Up @@ -121,12 +121,12 @@ def _keystone_identity(self, environ):

def _reseller_check(self, account, tenant_id):
"""Check reseller prefix."""
return account == '%s_%s' % (self.reseller_prefix, tenant_id)
return account == '%s%s' % (self.reseller_prefix, tenant_id)

def authorize(self, req):
env = req.environ
env_identity = env.get('keystone.identity', {})
tenant = env_identity.get('tenant')
tenant_id, tenant_name = env_identity.get('tenant')

try:
part = swift_utils.split_path(req.path, 1, 4, True)
Expand All @@ -140,14 +140,14 @@ def authorize(self, req):
# role.
if self.reseller_admin_role in user_roles:
msg = 'User %s has reseller admin authorizing'
self.logger.debug(msg % tenant[0])
self.logger.debug(msg % tenant_id)
req.environ['swift_owner'] = True
return

# Check if a user tries to access an account that does not match their
# token
if not self._reseller_check(account, tenant[0]):
log_msg = 'tenant mismatch: %s != %s' % (account, tenant[0])
if not self._reseller_check(account, tenant_id):
log_msg = 'tenant mismatch: %s != %s' % (account, tenant_id)
self.logger.debug(log_msg)
return self.denied_response(req)

Expand All @@ -165,7 +165,7 @@ def authorize(self, req):

# If user is of the same name of the tenant then make owner of it.
user = env_identity.get('user', '')
if self.is_admin and user == tenant[1]:
if self.is_admin and user == tenant_name:
req.environ['swift_owner'] = True
return

Expand All @@ -192,16 +192,16 @@ def authorize(self, req):
return self.denied_response(req)

# Allow ACL at individual user level (tenant:user format)
if '%s:%s' % (tenant[0], user) in roles:
if '%s:%s' % (tenant_name, user) in roles:
log_msg = 'user %s:%s allowed in ACL authorizing'
self.logger.debug(log_msg % (tenant[0], user))
self.logger.debug(log_msg % (tenant_name, user))
return

# Check if we have the role in the userroles and allow it
for user_role in user_roles:
if user_role in roles:
log_msg = 'user %s:%s allowed in ACL: %s authorizing'
self.logger.debug(log_msg % (tenant[0], user, user_role))
self.logger.debug(log_msg % (tenant_name, user, user_role))
return

return self.denied_response(req)
Expand Down
281 changes: 144 additions & 137 deletions tests/test_swift_auth_middleware.py
Expand Up @@ -13,18 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import nose
import unittest2 as unittest
import webob

try:
# NOTE(chmou): We don't want to force to have swift installed for
# unit test so we skip it we have an ImportError.
from keystone.middleware import swift_auth
skip = False
except ImportError:
skip = True
from keystone.middleware import swift_auth


class FakeApp(object):
Expand Down Expand Up @@ -52,151 +44,166 @@ def __call__(self, env, start_response):
body=body)(env, start_response)


class FakeConn(object):
def __init__(self, status_headers_body_iter=None):
self.calls = 0
self.status_headers_body_iter = status_headers_body_iter
if not self.status_headers_body_iter:
self.status_headers_body_iter = iter([('404 Not Found', {}, '')])

def request(self, method, path, headers):
self.calls += 1
self.request_path = path
self.status, self.headers, self.body = \
self.status_headers_body_iter.next()
self.status, self.reason = self.status.split(' ', 1)
self.status = int(self.status)

def getresponse(self):
return self

def read(self):
body = self.body
self.body = ''
return body


class SwiftAuth(unittest.TestCase):
def setUp(self):
if skip:
raise nose.SkipTest('no swift detected')
self.auth = swift_auth
self.test_auth = self.auth.filter_factory({})(FakeApp())

def _make_request(self, path, **kwargs):
req = webob.Request.blank(path, **kwargs)
return req

def test_identity_status_denied(self):
env = {'HTTP_X_IDENTITY_STATUS': 'Denied'}
req = self._make_request('/v1/AUTH_acct')
req.environ.update(env)
self.test_auth = swift_auth.filter_factory({})(FakeApp())

def _make_request(self, path, headers=None, **kwargs):
return webob.Request.blank(path, headers=headers, **kwargs)

def _get_identity_headers(self, status='Confirmed', tenant_id='1',
tenant_name='acct', user='usr', role=''):
return dict(X_IDENTITY_STATUS=status,
X_TENANT_ID=tenant_id,
X_TENANT_NAME=tenant_name,
X_ROLE=role,
X_USER=user)

def test_confirmed_identity_is_authorized(self):
role = self.test_auth.reseller_admin_role
headers = self._get_identity_headers(role=role)
req = self._make_request('/v1/AUTH_acct/c', headers)
response_iter = iter([('200 OK', {}, '')])
test_auth = swift_auth.filter_factory({})(
FakeApp(response_iter))
resp = req.get_response(test_auth)
self.assertEquals(resp.status_int, 200)

def test_invalid_identity_is_not_authorized(self):
headers = self._get_identity_headers(status='Invalid')
req = self._make_request('/v1/AUTH_acct', headers)
resp = req.get_response(self.test_auth)
self.assertEquals(resp.status_int, 401)

def test_auth_deny_non_reseller_prefix(self):
req = self._make_request('/v1/BLAH_account',
headers={'X-Auth-Token': 'BLAH_t'})
resp = req.get_response(self.test_auth)
self.assertEquals(resp.status_int, 401)
self.assertEquals(resp.environ['swift.authorize'],
self.test_auth.denied_response)

def test_auth_deny_token_not_for_account(self):
env = {'HTTP_X_IDENTITY_STATUS': 'Confirmed',
'HTTP_X_ROLE': 'AUTH_acct',
'HTTP_X_TENANT_ID': '1',
'HTTP_X_TENANT_NAME': 'acct',
'HTTP_X_USER': 'usr'}
req = self._make_request('/v1/AUTH_1')
req.environ.update(env)
headers = self._get_identity_headers(role='AUTH_acct')
req = self._make_request('/v1/AUTH_1', headers)
resp = req.get_response(self.test_auth)
self.assertEquals(resp.status_int, 403)

#NOTE(chmou): This should fail when we are going to add anonymous
#access back.
def test_default_forbidden(self):
env = {'HTTP_X_IDENTITY_STATUS': 'Confirmed',
'HTTP_X_USER': 'usr',
'HTTP_X_TENANT_ID': '1',
'HTTP_X_TENANT_NAME': 'acct',
'HTTP_X_ROLE': ''}
req = self._make_request('/v1/AUTH_acct')
req.environ.update(env)
headers = self._get_identity_headers()
req = self._make_request('/v1/AUTH_acct', headers)
resp = req.get_response(self.test_auth)
self.assertEquals(resp.status_int, 403)

def test_operator_roles(self):
env = {'HTTP_X_IDENTITY_STATUS': 'Confirmed',
'HTTP_X_USER': 'usr',
'HTTP_X_TENANT_ID': '1',
'HTTP_X_TENANT_NAME': 'acct',
'HTTP_X_ROLE': 'owner'}
filter_factory = self.auth.filter_factory({'operator_roles': 'owner'})
self.test_auth = filter_factory(FakeApp())
req = self._make_request('/v1/AUTH_1')
req.environ.update(env)
resp = req.get_response(self.test_auth)
self.assertEquals(resp.status_int, 404)
self.assertTrue('swift.authorize' in resp.environ)

def test_authorize_acl_referrer_access(self):
env = {'keystone.identity': {'roles': ['acct'],
'tenant': ('1', 'acct'),
'user': 'usr'}}

# 401 without referrer
req = self._make_request('/v1/AUTH_1/c')
req.environ.update(env)
resp = self.test_auth.authorize(req)
self.assertEquals(resp.status_int, 401)
def test_blank_reseller_prefix(self):
conf = {'reseller_prefix': ''}
test_auth = swift_auth.filter_factory(conf)(FakeApp())
account = tenant_id = 'foo'
self.assertTrue(test_auth._reseller_check(account, tenant_id))

# NOTE(chmou) This should be rewritten when we get proper anonymous
# container access support.
# Authorize when the ACL allow reading and container listings.
req = self._make_request('/v1/AUTH_1/c')
req.acl = '.r:*,.rlistings'
req.environ.update(env)
self.assertEquals(self.test_auth.authorize(req), None)

# 401 when container listing is not allowed.
req = self._make_request('/v1/AUTH_1/c')
req.environ.update(env)
req.acl = '.r:*'
resp = self.test_auth.authorize(req)
self.assertEquals(resp.status_int, 401)

# 401 with a url acl when not coming from there.
req = self._make_request('/v1/AUTH_1/c')
req.environ.update(env)
req.acl = '.r:.example.com,.rlistings'
resp = self.test_auth.authorize(req)
self.assertEquals(resp.status_int, 401)
class TestAuthorize(unittest.TestCase):
def setUp(self):
self.test_auth = swift_auth.filter_factory({})(FakeApp())

def _make_request(self, path, **kwargs):
return webob.Request.blank(path, **kwargs)

def _get_account(self, identity=None):
if not identity:
identity = self._get_identity()
return '%s%s' % (self.test_auth.reseller_prefix,
identity['tenant'][0])

def _get_identity(self, tenant_id='tenant_id',
tenant_name='tenant_name', user='user', roles=None):
if not roles:
roles = []
return dict(tenant=(tenant_id, tenant_name), user=user, roles=roles)

def _check_authenticate(self, account=None, identity=None, headers=None,
exception=None, acl=None, env=None, path=None):
if not identity:
identity = self._get_identity()
if not account:
account = self._get_account(identity)
if not path:
path = '/v1/%s/c' % account
default_env = {'keystone.identity': identity,
'REMOTE_USER': identity['tenant']}
if env:
default_env.update(env)
req = self._make_request(path, headers=headers, environ=default_env)
req.acl = acl
result = self.test_auth.authorize(req)
if exception:
self.assertIsInstance(result, exception)
else:
self.assertIsNone(result)
return req

# Authorize with the right referrer acl and the right url referrer.
req = self._make_request('/v1/AUTH_1/c')
req.environ.update(env)
req.referer = 'http://www.example.com/index.html'
req.acl = '.r:.example.com,.rlistings'
self.assertEquals(self.test_auth.authorize(req), None)

def test_acl_tenant(self):
env = {'keystone.identity': {'roles': ['allowme'],
'tenant': ('1', 'acct'),
'user': 'usr'}}
req = self._make_request('/v1/AUTH_1/c')
req.environ.update(env)
req.acl = 'allowme'
self.assertEquals(self.test_auth.authorize(req), None)

def test_acl_tenant_user(self):
env = {'keystone.identity': {'roles': [''],
'tenant': ('1', 'acct'),
'user': 'usr'}}
req = self._make_request('/v1/AUTH_1/c')
req.environ.update(env)
req.acl = '1:usr'
self.assertEquals(self.test_auth.authorize(req), None)
def test_authorize_fails_for_unauthorized_user(self):
self._check_authenticate(exception=webob.exc.HTTPForbidden)

def test_authorize_fails_for_invalid_reseller_prefix(self):
self._check_authenticate(account='BLAN_a',
exception=webob.exc.HTTPForbidden)

def test_authorize_succeeds_for_reseller_admin(self):
roles =[self.test_auth.reseller_admin_role]
identity = self._get_identity(roles=roles)
req = self._check_authenticate(identity=identity)
self.assertTrue(req.environ.get('swift_owner'))

def test_authorize_succeeds_as_owner_for_operator_role(self):
roles = self.test_auth.operator_roles.split(',')[0]
identity = self._get_identity(roles=roles)
req = self._check_authenticate(identity=identity)
self.assertTrue(req.environ.get('swift_owner'))

def _check_authorize_for_tenant_owner_match(self, exception=None):
identity = self._get_identity()
identity['user'] = identity['tenant'][1]
req = self._check_authenticate(identity=identity, exception=exception)
expected = bool(exception is None)
self.assertEqual(bool(req.environ.get('swift_owner')), expected)

def test_authorize_succeeds_as_owner_for_tenant_owner_match(self):
self.test_auth.is_admin = True
self._check_authorize_for_tenant_owner_match()

def test_authorize_fails_as_owner_for_tenant_owner_match(self):
self.test_auth.is_admin = False
self._check_authorize_for_tenant_owner_match(
exception=webob.exc.HTTPForbidden)

def test_authorize_succeeds_for_container_sync(self):
env = {'swift_sync_key': 'foo', 'REMOTE_ADDR': '127.0.0.1'}
headers = {'x-container-sync-key': 'foo', 'x-timestamp': None}
self._check_authenticate(env=env, headers=headers)

def test_authorize_fails_for_invalid_referrer(self):
env = {'HTTP_REFERER': 'http://invalid.com/index.html'}
self._check_authenticate(acl='.r:example.com', env=env,
exception=webob.exc.HTTPForbidden)

def test_authorize_fails_for_referrer_without_rlistings(self):
env = {'HTTP_REFERER': 'http://example.com/index.html'}
self._check_authenticate(acl='.r:example.com', env=env,
exception=webob.exc.HTTPForbidden)

def test_authorize_succeeds_for_referrer_with_rlistings(self):
env = {'HTTP_REFERER': 'http://example.com/index.html'}
self._check_authenticate(acl='.r:example.com,.rlistings', env=env)

def test_authorize_succeeds_for_referrer_with_obj(self):
path = '/v1/%s/c/o' % self._get_account()
env = {'HTTP_REFERER': 'http://example.com/index.html'}
self._check_authenticate(acl='.r:example.com', env=env, path=path)

def test_authorize_succeeds_for_user_role_in_roles(self):
acl = 'allowme'
identity = self._get_identity(roles=[acl])
self._check_authenticate(identity=identity, acl=acl)

def test_authorize_succeeds_for_tenant_user_in_roles(self):
identity = self._get_identity()
acl = '%s:%s' % (identity['tenant'][1], identity['user'])
self._check_authenticate(identity=identity, acl=acl)


if __name__ == '__main__':
Expand Down
4 changes: 4 additions & 0 deletions tools/test-requires
Expand Up @@ -23,3 +23,7 @@ httplib2
# for python-novaclient
prettytable
iso8601>=0.1.4

# swift_auth test dependencies
-e git+https://github.com/openstack/swift.git#egg=swift
netifaces

0 comments on commit f3af691

Please sign in to comment.