From e7073d028aa4c5221dba16b3d48a43189067fed9 Mon Sep 17 00:00:00 2001 From: Alex Meade Date: Wed, 15 Aug 2012 12:27:00 -0400 Subject: [PATCH] Add policy enforcment for v2 api. This patch adds policy checks in the images controller for all actions that need to be enforced. It also fixed an issue with unit tests where the v1 unitests were testing less than they should be. This was due to the default policy being overridden in the test and causing a forbidden to be raised whenever policys were changed, regardless of the policy rule added. Fixes bug 1036846 Change-Id: Ib351d8c1e13164d02b02685c808b30045f7850ea --- glance/api/v2/images.py | 22 ++++- glance/tests/unit/utils.py | 13 +++ glance/tests/unit/v2/test_images_resource.py | 89 +++++++++++++++++++- 3 files changed, 122 insertions(+), 2 deletions(-) diff --git a/glance/api/v2/images.py b/glance/api/v2/images.py index 4b67db412d..cc629be97b 100644 --- a/glance/api/v2/images.py +++ b/glance/api/v2/images.py @@ -20,6 +20,7 @@ import webob.exc +from glance.api import policy import glance.api.v2 as v2 from glance.common import exception from glance.common import utils @@ -37,9 +38,17 @@ class ImagesController(object): - def __init__(self, db_api=None): + def __init__(self, db_api=None, policy_enforcer=None): self.db_api = db_api or glance.db.get_api() self.db_api.configure_db() + self.policy = policy_enforcer or policy.Enforcer() + + def _enforce(self, req, action): + """Authorize an action against our policies""" + try: + self.policy.enforce(req.context, action, {}) + except exception.Forbidden: + raise webob.exc.HTTPForbidden() def _normalize_properties(self, image): """Convert the properties from the stored format to a dict @@ -68,6 +77,10 @@ def _append_tags(self, context, image): @utils.mutating def create(self, req, image): + self._enforce(req, 'add_image') + is_public = image.get('is_public') + if is_public: + self._enforce(req, 'publicize_image') image['owner'] = req.context.owner image['status'] = 'queued' @@ -87,6 +100,7 @@ def create(self, req, image): def index(self, req, marker=None, limit=None, sort_key='created_at', sort_dir='desc', filters={}): + self._enforce(req, 'get_images') filters['deleted'] = False #NOTE(bcwaldon): is_public=True gets public images and those # owned by the authenticated tenant @@ -121,12 +135,17 @@ def _get_image(self, context, image_id): raise webob.exc.HTTPNotFound() def show(self, req, image_id): + self._enforce(req, 'get_image') image = self._get_image(req.context, image_id) image = self._normalize_properties(dict(image)) return self._append_tags(req.context, image) @utils.mutating def update(self, req, image_id, image): + self._enforce(req, 'modify_image') + is_public = image.get('is_public') + if is_public: + self._enforce(req, 'publicize_image') tags = self._extract_tags(image) try: @@ -148,6 +167,7 @@ def update(self, req, image_id, image): @utils.mutating def delete(self, req, image_id): + self._enforce(req, 'delete_image') image = self._get_image(req.context, image_id) if image['protected']: diff --git a/glance/tests/unit/utils.py b/glance/tests/unit/utils.py index 4f5b5d225c..5367531a87 100644 --- a/glance/tests/unit/utils.py +++ b/glance/tests/unit/utils.py @@ -108,3 +108,16 @@ def add_to_backend(self, context, scheme, image_id, data, size): self.data[image_id] = (data, size or len(data)) checksum = 'Z' return (image_id, size, checksum) + + +class FakePolicyEnforcer(object): + def __init__(self, *_args, **kwargs): + self.rules = {} + + def enforce(self, _ctxt, action, _target, **kwargs): + """Raise Forbidden if a rule for given action is set to false.""" + if self.rules.get(action) is False: + raise exception.Forbidden() + + def set_rules(self, rules): + self.rules = rules diff --git a/glance/tests/unit/v2/test_images_resource.py b/glance/tests/unit/v2/test_images_resource.py index 4f9018bc7e..8c226d8a10 100644 --- a/glance/tests/unit/v2/test_images_resource.py +++ b/glance/tests/unit/v2/test_images_resource.py @@ -22,6 +22,7 @@ from glance.common import utils from glance.openstack.common import cfg import glance.schema +from glance.tests.unit import base import glance.tests.unit.utils as unit_test_utils import glance.tests.utils as test_utils import glance.store @@ -73,8 +74,10 @@ class TestImagesController(test_utils.BaseTestCase): def setUp(self): super(TestImagesController, self).setUp() self.db = unit_test_utils.FakeDB() + self.policy = unit_test_utils.FakePolicyEnforcer() self._create_images() - self.controller = glance.api.v2.images.ImagesController(self.db) + self.controller = glance.api.v2.images.ImagesController(self.db, + self.policy) glance.store.create_stores() def _create_images(self): @@ -337,6 +340,18 @@ def test_update_duplicate_tags(self): output = self.controller.update(request, UUID1, image) self.assertEqual(['ping'], output['tags']) + def test_delete(self): + request = unit_test_utils.get_fake_request() + try: + self.controller.delete(request, UUID1) + except Exception as e: + self.fail("Delete raised exception: %s" % e) + + def test_delete_non_existant(self): + request = unit_test_utils.get_fake_request() + self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete, + request, utils.generate_uuid()) + def test_index_with_invalid_marker(self): fake_uuid = utils.generate_uuid() request = unit_test_utils.get_fake_request() @@ -344,6 +359,78 @@ def test_index_with_invalid_marker(self): self.controller.index, request, marker=fake_uuid) +class TestImagesControllerPolicies(base.IsolatedUnitTest): + + def setUp(self): + super(TestImagesControllerPolicies, self).setUp() + self.db = unit_test_utils.FakeDB() + self.policy = unit_test_utils.FakePolicyEnforcer() + self.controller = glance.api.v2.images.ImagesController(self.db, + self.policy) + + def test_index_unauthorized(self): + rules = {"get_images": False} + self.policy.set_rules(rules) + request = unit_test_utils.get_fake_request() + self.assertRaises(webob.exc.HTTPForbidden, self.controller.index, + request) + + def test_show_unauthorized(self): + rules = {"get_image": False} + self.policy.set_rules(rules) + request = unit_test_utils.get_fake_request() + self.assertRaises(webob.exc.HTTPForbidden, self.controller.show, + request, image_id=UUID2) + + def test_create_public_image_unauthorized(self): + rules = {"publicize_image": False} + self.policy.set_rules(rules) + request = unit_test_utils.get_fake_request() + image = {'name': 'image-1', 'is_public': True} + self.assertRaises(webob.exc.HTTPForbidden, self.controller.create, + request, image) + + def test_update_unauthorized(self): + rules = {"modify_image": False} + self.policy.set_rules(rules) + request = unit_test_utils.get_fake_request() + image = {'name': 'image-2'} + self.assertRaises(webob.exc.HTTPForbidden, self.controller.update, + request, UUID1, image) + + def test_update_publicize_image_unauthorized(self): + rules = {"publicize_image": False} + self.policy.set_rules(rules) + request = unit_test_utils.get_fake_request() + image = {'name': 'image-1', 'is_public': True} + self.assertRaises(webob.exc.HTTPForbidden, self.controller.update, + request, UUID1, image) + + def test_update_public_image_unauthorized(self): + rules = {"modify_image": False} + self.policy.set_rules(rules) + request = unit_test_utils.get_fake_request() + image = {'name': 'image-1', 'is_public': True} + self.assertRaises(webob.exc.HTTPForbidden, self.controller.update, + request, UUID1, image) + + def test_update_public_image_unauthorized_but_not_publicizing(self): + rules = {"publicize_image": False} + self.policy.set_rules(rules) + request = unit_test_utils.get_fake_request() + image = {'name': 'image-2', 'is_public': False} + output = self.controller.update(request, UUID1, image) + self.assertEqual(UUID1, output['id']) + self.assertEqual('image-2', output['name']) + + def test_delete_unauthorized(self): + rules = {"delete_image": False} + self.policy.set_rules(rules) + request = unit_test_utils.get_fake_request() + self.assertRaises(webob.exc.HTTPForbidden, self.controller.delete, + request, UUID1) + + class TestImagesDeserializer(test_utils.BaseTestCase): def setUp(self):