Skip to content

Commit

Permalink
Improve swift's keystoneauth ACL support
Browse files Browse the repository at this point in the history
Below three bug reports talk about one thing.
Current keystoneauth ACL supports as:

tenant_name:user_id         ok
tenant_name:user_name       no
tenant_name:*               no
tenant_id:user_id           ok
tenant_id:user_name         no
tenant_id:*                 no
*:user_id                   ok
*:user_name                 no
*:*                         no

This patch will make all of above work fine.
Applying (tenant/user)name could let user put or get their data in a
more readable way. The tenant_name:* and *:user_name is suitable for
many usage.

note: to keep compatibility here add a new keystone.identity just for
authorize() itself and leave env['keystone.identity'] to other
middlerwares.

Fixes: bug #1020709
Fixes: bug #1075362
Fixes: bug #1155389
Change-Id: I9354dedaad875117f6a9072c67e9ecf69bfca77e
  • Loading branch information
pyKun committed Jun 13, 2013
1 parent 5d52d2d commit 58a095b
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 47 deletions.
66 changes: 44 additions & 22 deletions swift/common/middleware/keystoneauth.py
@@ -1,5 +1,4 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2012 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
Expand Down Expand Up @@ -119,6 +118,10 @@ def __call__(self, environ, start_response):

def _keystone_identity(self, environ):
"""Extract the identity from the Keystone auth component."""
# In next release, we would add user id in env['keystone.identity'] by
# using _integral_keystone_identity to replace current
# _keystone_identity. The purpose of keeping it in this release it for
# back compatibility.
if environ.get('HTTP_X_IDENTITY_STATUS') != 'Confirmed':
return
roles = []
Expand All @@ -130,40 +133,56 @@ def _keystone_identity(self, environ):
'roles': roles}
return identity

def _integral_keystone_identity(self, environ):
"""Extract the identity from the Keystone auth component."""
if environ.get('HTTP_X_IDENTITY_STATUS') != 'Confirmed':
return
roles = []
if 'HTTP_X_ROLES' in environ:
roles = environ['HTTP_X_ROLES'].split(',')
identity = {'user': (environ.get('HTTP_X_USER_ID'),
environ.get('HTTP_X_USER_NAME')),
'tenant': (environ.get('HTTP_X_TENANT_ID'),
environ.get('HTTP_X_TENANT_NAME')),
'roles': roles}
return identity

def _get_account_for_tenant(self, tenant_id):
return '%s%s' % (self.reseller_prefix, tenant_id)

def _reseller_check(self, account, tenant_id):
"""Check reseller prefix."""
return account == self._get_account_for_tenant(tenant_id)

def _authorize_cross_tenant(self, user, tenant_id, tenant_name, roles):
def _authorize_cross_tenant(self, user_id, user_name,
tenant_id, tenant_name, roles):
""" Check cross-tenant ACLs
Match tenant_id:user, tenant_name:user, and *:user.
Match tenant:user, tenant and user could be its id, name or '*'
:param user: The user name from the identity token.
:param user_id: The user id from the identity token.
:param user_name: The user name from the identity token.
:param tenant_id: The tenant ID from the identity token.
:param tenant_name: The tenant name from the identity token.
:param roles: The given container ACL.
:returns: True if tenant_id:user, tenant_name:user, or *:user matches
the given ACL. False otherwise.
:returns: matched string if tenant(name/id/*):user(name/id/*) matches
the given ACL.
None otherwise.
"""
wildcard_tenant_match = '*:%s' % (user)
tenant_id_user_match = '%s:%s' % (tenant_id, user)
tenant_name_user_match = '%s:%s' % (tenant_name, user)

return (wildcard_tenant_match in roles
or tenant_id_user_match in roles
or tenant_name_user_match in roles)
for tenant in [tenant_id, tenant_name, '*']:
for user in [user_id, user_name, '*']:
s = '%s:%s' % (tenant, user)
if s in roles:
return s
return None

def authorize(self, req):
env = req.environ
env_identity = env.get('keystone.identity', {})
tenant_id, tenant_name = env_identity.get('tenant')
user = env_identity.get('user', '')
env_identity = self._integral_keystone_identity(env)
tenant_id, tenant_name = env_identity['tenant']
user_id, user_name = env_identity['user']
referrers, roles = swift_acl.parse_acl(getattr(req, 'acl', None))

#allow OPTIONS requests to proceed as normal
Expand All @@ -187,10 +206,12 @@ def authorize(self, req):
return

# cross-tenant authorization
if self._authorize_cross_tenant(user, tenant_id, tenant_name, roles):
log_msg = 'user %s:%s, %s:%s, or *:%s allowed in ACL authorizing'
self.logger.debug(log_msg % (tenant_name, user,
tenant_id, user, user))
matched_acl = self._authorize_cross_tenant(user_id, user_name,
tenant_id, tenant_name,
roles)
if matched_acl is not None:
log_msg = 'user %s allowed in ACL authorizing.' % matched_acl
self.logger.debug(log_msg)
return

acl_authorized = self._authorize_unconfirmed_identity(req, obj,
Expand Down Expand Up @@ -219,7 +240,7 @@ def authorize(self, req):
return

# If user is of the same name of the tenant then make owner of it.
if self.is_admin and user == tenant_name:
if self.is_admin and user_name == tenant_name:
self.logger.warning("the is_admin feature has been deprecated "
"and will be removed in the future "
"update your config file")
Expand All @@ -233,7 +254,8 @@ def authorize(self, req):
for user_role in user_roles:
if user_role in (r.lower() for r in roles):
log_msg = 'user %s:%s allowed in ACL: %s authorizing'
self.logger.debug(log_msg % (tenant_name, user, user_role))
self.logger.debug(log_msg % (tenant_name, user_name,
user_role))
return

return self.denied_response(req)
Expand Down
79 changes: 54 additions & 25 deletions test/unit/common/middleware/test_keystoneauth.py
Expand Up @@ -164,13 +164,18 @@ def _make_request(self, path, **kwargs):
def _get_account(self, identity=None):
if not identity:
identity = self._get_identity()
return self.test_auth._get_account_for_tenant(identity['tenant'][0])

def _get_identity(self, tenant_id='tenant_id',
tenant_name='tenant_name', user='user', roles=None):
if not roles:
roles = []
return dict(tenant=(tenant_id, tenant_name), user=user, roles=roles)
return self.test_auth._get_account_for_tenant(identity['HTTP_X_TENANT_ID'])

def _get_identity(self, tenant_id='tenant_id', tenant_name='tenant_name',
user_id='user_id', user_name='user_name', roles=[]):
if isinstance(roles, list):
roles = ','.join(roles)
return {'HTTP_X_USER_ID': user_id,
'HTTP_X_USER_NAME': user_name,
'HTTP_X_TENANT_ID': tenant_id,
'HTTP_X_TENANT_NAME': tenant_name,
'HTTP_X_ROLES': roles,
'HTTP_X_IDENTITY_STATUS': 'Confirmed'}

def _check_authenticate(self, account=None, identity=None, headers=None,
exception=None, acl=None, env=None, path=None):
Expand All @@ -180,8 +185,8 @@ def _check_authenticate(self, account=None, identity=None, headers=None,
account = self._get_account(identity)
if not path:
path = '/v1/%s/c' % account
default_env = {'keystone.identity': identity,
'REMOTE_USER': identity['tenant']}
default_env = {'REMOTE_USER': identity['HTTP_X_TENANT_ID']}
default_env.update(identity)
if env:
default_env.update(env)
req = self._make_request(path, headers=headers, environ=default_env)
Expand Down Expand Up @@ -225,8 +230,7 @@ def test_authorize_succeeds_as_owner_for_insensitive_operator_role(self):
self.assertTrue(req.environ.get('swift_owner'))

def _check_authorize_for_tenant_owner_match(self, exception=None):
identity = self._get_identity()
identity['user'] = identity['tenant'][1]
identity = self._get_identity(user_name='same_name', tenant_name='same_name')
req = self._check_authenticate(identity=identity, exception=exception)
expected = bool(exception is None)
self.assertEqual(bool(req.environ.get('swift_owner')), expected)
Expand Down Expand Up @@ -271,30 +275,55 @@ def test_authorize_succeeds_for_user_role_in_roles(self):

def test_authorize_succeeds_for_tenant_name_user_in_roles(self):
identity = self._get_identity()
acl = '%s:%s' % (identity['tenant'][1], identity['user'])
self._check_authenticate(identity=identity, acl=acl)
user_name = identity['HTTP_X_USER_NAME']
user_id = identity['HTTP_X_USER_ID']
tenant_id = identity['HTTP_X_TENANT_ID']
for user in [user_id, user_name, '*']:
acl = '%s:%s' % (tenant_id, user)
self._check_authenticate(identity=identity, acl=acl)

def test_authorize_succeeds_for_tenant_id_user_in_roles(self):
identity = self._get_identity()
acl = '%s:%s' % (identity['tenant'][0], identity['user'])
self._check_authenticate(identity=identity, acl=acl)
user_name = identity['HTTP_X_USER_NAME']
user_id = identity['HTTP_X_USER_ID']
tenant_name = identity['HTTP_X_TENANT_NAME']
for user in [user_id, user_name, '*']:
acl = '%s:%s' % (tenant_name, user)
self._check_authenticate(identity=identity, acl=acl)

def test_authorize_succeeds_for_wildcard_tenant_user_in_roles(self):
identity = self._get_identity()
acl = '*:%s' % (identity['user'])
self._check_authenticate(identity=identity, acl=acl)
user_name = identity['HTTP_X_USER_NAME']
user_id = identity['HTTP_X_USER_ID']
for user in [user_id, user_name, '*']:
acl = '*:%s' % user
self._check_authenticate(identity=identity, acl=acl)

def test_cross_tenant_authorization_success(self):
self.assertTrue(self.test_auth._authorize_cross_tenant('userA',
'tenantID', 'tenantNAME', ['tenantID:userA']))
self.assertTrue(self.test_auth._authorize_cross_tenant('userA',
'tenantID', 'tenantNAME', ['tenantNAME:userA']))
self.assertTrue(self.test_auth._authorize_cross_tenant('userA',
'tenantID', 'tenantNAME', ['*:userA']))
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['tenantID:userA']), 'tenantID:userA')
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['tenantNAME:userA']), 'tenantNAME:userA')
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['*:userA']), '*:userA')

self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['tenantID:userID']), 'tenantID:userID')
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['tenantNAME:userID']), 'tenantNAME:userID')
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['*:userID']), '*:userID')

self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['tenantID:*']), 'tenantID:*')
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['tenantNAME:*']), 'tenantNAME:*')
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['*:*']), '*:*')

def test_cross_tenant_authorization_failure(self):
self.assertFalse(self.test_auth._authorize_cross_tenant('userA',
'tenantID', 'tenantNAME', ['tenantXYZ:userA']))
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['tenantXYZ:userA']), None)


if __name__ == '__main__':
Expand Down

0 comments on commit 58a095b

Please sign in to comment.