diff --git a/keystone/service.py b/keystone/service.py index c08898621c..9799e3a75c 100644 --- a/keystone/service.py +++ b/keystone/service.py @@ -490,20 +490,13 @@ def _get_token_ref(self, context, token_id, belongs_to=None): """ # TODO(termie): this stuff should probably be moved to middleware self.assert_admin(context) + data = self.token_api.get_token(context=context, token_id=token_id) + if belongs_to: + if (not data.get('tenant') or data['tenant'].get('id') != + belongs_to): + raise exception.Unauthorized() - if cms.is_ans1_token(token_id): - data = json.loads(cms.cms_verify(cms.token_to_cms(token_id), - config.CONF.signing.certfile, - config.CONF.signing.ca_certs)) - data['access']['token']['user'] = data['access']['user'] - data['access']['token']['metadata'] = data['access']['metadata'] - if belongs_to: - assert data['access']['token']['tenant']['id'] == belongs_to - token_ref = data['access']['token'] - else: - token_ref = self.token_api.get_token(context=context, - token_id=token_id) - return token_ref + return data # admin only def validate_token_head(self, context, token_id): diff --git a/tests/test_service.py b/tests/test_service.py index f48bd9a998..487e5ac130 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -150,3 +150,54 @@ def test_authenticate_password_too_large(self): body_dict = _build_user_auth(username='FOO', password='0' * 8193) self.assertRaises(exception.ValidationSizeError, self.api.authenticate, {}, body_dict) + + +class AuthWithToken(AuthTest): + def setUp(self): + super(AuthWithToken, self).setUp() + + def test_belongs_to_no_tenant(self): + r = self.api.authenticate( + {}, + auth={ + 'passwordCredentials': { + 'username': self.user_foo['name'], + 'password': self.user_foo['password'] + } + }) + unscoped_token_id = r['access']['token']['id'] + self.assertRaises( + exception.Unauthorized, + self.api.validate_token, + dict(is_admin=True, query_string={'belongsTo': 'BAR'}), + token_id=unscoped_token_id) + + def test_belongs_to_wrong_tenant(self): + body_dict = _build_user_auth( + username='FOO', + password='foo2', + tenant_name="BAR") + + scoped_token = self.api.authenticate({}, body_dict) + scoped_token_id = scoped_token['access']['token']['id'] + + self.assertRaises( + exception.Unauthorized, + self.api.validate_token, + dict(is_admin=True, query_string={'belongsTo': 'me'}), + token_id=scoped_token_id) + + def test_belongs_to(self): + body_dict = _build_user_auth( + username='FOO', + password='foo2', + tenant_name="BAR") + + scoped_token = self.api.authenticate({}, body_dict) + scoped_token_id = scoped_token['access']['token']['id'] + + self.assertRaises( + exception.Unauthorized, + self.api.validate_token, + dict(is_admin=True, query_string={'belongsTo': 'BAR'}), + token_id=scoped_token_id)