Skip to content

Commit

Permalink
Improved legacy tenancy resolution (bug 951933)
Browse files Browse the repository at this point in the history
Change-Id: Ia6fd5eb57e8d7f90328117351f7b814b1b4495dc
  • Loading branch information
dolph committed Mar 13, 2012
1 parent 1e07b98 commit 73af033
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 60 deletions.
33 changes: 23 additions & 10 deletions keystone/middleware/auth_token.py
Expand Up @@ -90,7 +90,6 @@
"""

import datetime
import httplib
import json
import logging
Expand Down Expand Up @@ -392,15 +391,29 @@ def _build_user_headers(self, token_info):
token = token_info['access']['token']
roles = ','.join([role['name'] for role in user.get('roles', [])])

# FIXME(ja): I think we are checking in both places because:
# tenant might not be returned, and there was a pre-release
# that put tenant objects inside the user object?
try:
tenant_id = token['tenant']['id']
tenant_name = token['tenant']['name']
except:
tenant_id = user.get('tenantId')
tenant_name = user.get('tenantName')
def get_tenant_info():
"""Returns a (tenant_id, tenant_name) tuple from context."""
def essex():
"""Essex puts the tenant ID and name on the token."""
return (token['tenant']['id'], token['tenant']['name'])

def pre_diablo():
"""Pre-diablo, Keystone only provided tenantId."""
return (token['tenantId'], token['tenantId'])

def default_tenant():
"""Assume the user's default tenant."""
return (user['tenantId'], user['tenantName'])

for method in [essex, pre_diablo, default_tenant]:
try:
return method()
except KeyError:
pass

raise InvalidUserToken('Unable to determine tenancy.')

tenant_id, tenant_name = get_tenant_info()

user_id = user['id']
user_name = user['username']
Expand Down
201 changes: 151 additions & 50 deletions tests/test_auth_token_middleware.py
Expand Up @@ -23,24 +23,76 @@
from keystone.middleware import auth_token
from keystone import test

DATA = {
'access': {
'token': {
'id': 'token1',
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',

# JSON responses keyed by token ID
TOKEN_RESPONSES = {
'valid-token': {
'access': {
'token': {
'id': 'valid-token',
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
},
},
'user': {
'id': 'user_id1',
'username': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
'user': {
'id': 'user_id1',
'username': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
'default-tenant-token': {
'access': {
'token': {
'id': 'default-tenant-token',
},
'user': {
'id': 'user_id1',
'username': 'user_name1',
'tenantId': 'tenant_id1',
'tenantName': 'tenant_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
'valid-diablo-token': {
'access': {
'token': {
'id': 'valid-diablo-token',
'tenantId': 'tenant_id1',
},
'user': {
'id': 'user_id1',
'username': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
'unscoped-token': {
'access': {
'token': {
'id': 'unscoped-token',
},
'user': {
'id': 'user_id1',
'username': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
}
}


Expand All @@ -51,7 +103,7 @@ def __init__(self):
self.token_expiration = None

def get(self, key):
data = DATA.copy()
data = TOKEN_RESPONSES['valid-token'].copy()
if not data or key != "tokens/%s" % (data['access']['token']['id']):
return
if not self.token_expiration:
Expand Down Expand Up @@ -82,7 +134,7 @@ def __init__(self, *args):
pass

def request(self, method, path, **kwargs):
"""Fakes out several http responses
"""Fakes out several http responses.
If a POST request is made, we assume the calling code is trying
to get a new admin token.
Expand All @@ -102,12 +154,12 @@ def request(self, method, path, **kwargs):

else:
token_id = path.rsplit('/', 1)[1]
if token_id == 'token1':
if token_id in TOKEN_RESPONSES.keys():
status = 200
body = json.dumps(DATA)
body = json.dumps(TOKEN_RESPONSES[token_id])
else:
status = 404
body = ''
body = str()

self.resp = FakeHTTPResponse(status, body)

Expand All @@ -117,58 +169,107 @@ def getresponse(self):
def close(self):
pass

class FakeApp(object):
"""This represents a WSGI app protected by the auth_token middleware.
"""
def __init__(self, expected_env=None):
expected_env = expected_env or {}
self.expected_env = {
'HTTP_X_IDENTITY_STATUS': 'Confirmed',
'HTTP_X_TENANT_ID': 'tenant_id1',
'HTTP_X_TENANT_NAME': 'tenant_name1',
'HTTP_X_USER_ID': 'user_id1',
'HTTP_X_USER_NAME': 'user_name1',
'HTTP_X_ROLES': 'role1,role2',
'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat)
'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat)
'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat)
}
self.expected_env.update(expected_env)

def __call__(self, env, start_response):
for k, v in self.expected_env.items():
assert env[k] == v, '%s != %s' % (env[k], v)

resp = webob.Response()
resp.body = 'SUCCESS'
return resp(env, start_response)


class BaseAuthTokenMiddlewareTest(test.TestCase):
def setUp(self, expected_env=None):
expected_env = expected_env or {}

class AuthTokenMiddlewareTest(test.TestCase):
def setUp(self):
super(AuthTokenMiddlewareTest, self).setUp()
conf = {
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
}

# This object represents a wsgi app that would be wrapped with
# the auth_token middleware
def fake_app(env, start_response):
expected_env = {
'HTTP_X_IDENTITY_STATUS': 'Confirmed',
'HTTP_X_TENANT_ID': 'tenant_id1',
'HTTP_X_TENANT_NAME': 'tenant_name1',
'HTTP_X_USER_ID': 'user_id1',
'HTTP_X_USER_NAME': 'user_name1',
'HTTP_X_ROLES': 'role1,role2',
'HTTP_X_USER': 'user_name1',
'HTTP_X_TENANT': 'tenant_name1',
'HTTP_X_ROLE': 'role1,role2',
}
for k, v in expected_env.items():
self.assertEqual(env[k], v)

resp = webob.Response()
resp.body = 'SUCCESS'
return resp(env, start_response)

self.middleware = auth_token.AuthProtocol(fake_app, conf)
self.middleware = auth_token.AuthProtocol(FakeApp(expected_env), conf)
self.middleware.http_client_class = FakeHTTPConnection
self.middleware._iso8601 = iso8601

self.response_status = None
self.response_headers = None
super(BaseAuthTokenMiddlewareTest, self).setUp()

def start_fake_response(self, status, headers):
self.response_status = int(status.split(' ', 1)[0])
self.response_headers = dict(headers)

def test_request(self):

class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
"""Auth Token middleware should understand Diablo keystone responses."""
def setUp(self):
# pre-diablo only had Tenant ID, which was also the Name
expected_env = {
'HTTP_X_TENANT_ID': 'tenant_id1',
'HTTP_X_TENANT_NAME': 'tenant_id1',
'HTTP_X_TENANT': 'tenant_id1', # now deprecated (diablo-compat)
}
super(DiabloAuthTokenMiddlewareTest, self).setUp(expected_env)

def test_diablo_response(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = 'token1'
req.headers['X-Auth-Token'] = 'valid-diablo-token'
body = self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
self.assertEqual(body, ['SUCCESS'])

class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
def test_valid_request(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = 'valid-token'
body = self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
self.assertEqual(body, ['SUCCESS'])

def test_default_tenant_token(self):
"""Unscoped requests with a default tenant should "auto-scope."
The implied scope is the user's tenant ID.
"""
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = 'default-tenant-token'
body = self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
self.assertEqual(body, ['SUCCESS'])

def test_unscoped_token(self):
"""Unscoped requests with no default tenant ID should be rejected."""
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = 'unscoped-token'
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
'Keystone uri=\'https://keystone.example.com:1234\'')

def test_request_invalid_token(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = 'token2'
req.headers['X-Auth-Token'] = 'invalid-token'
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
Expand All @@ -191,21 +292,21 @@ def test_request_blank_token(self):

def test_memcache(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = 'token1'
req.headers['X-Auth-Token'] = 'valid-token'
self.middleware._cache = FakeMemcache()
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.middleware._cache.set_value, None)

def test_memcache_set_invalid(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = 'token2'
req.headers['X-Auth-Token'] = 'invalid-token'
self.middleware._cache = FakeMemcache()
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.middleware._cache.set_value, "invalid")

def test_memcache_set_expired(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = 'token1'
req.headers['X-Auth-Token'] = 'valid-token'
self.middleware._cache = FakeMemcache()
expired = datetime.datetime.now() - datetime.timedelta(minutes=1)
self.middleware._cache.token_expiration = float(expired.strftime("%s"))
Expand Down

0 comments on commit 73af033

Please sign in to comment.