Skip to content

Commit

Permalink
Add policy enforcment for v2 api.
Browse files Browse the repository at this point in the history
This patch adds policy checks in the images controller for all actions that need
to be enforced. It also fixed an issue with unit tests where the v1 unitests
were testing less than they should be. This was due to the default policy being
overridden in the test and causing a forbidden to be raised whenever policys
were changed, regardless of the policy rule added.

Fixes bug 1036846

Change-Id: Ib351d8c1e13164d02b02685c808b30045f7850ea
  • Loading branch information
ameade authored and bcwaldon committed Aug 15, 2012
1 parent 2580d3b commit e7073d0
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 2 deletions.
22 changes: 21 additions & 1 deletion glance/api/v2/images.py
Expand Up @@ -20,6 +20,7 @@

import webob.exc

from glance.api import policy
import glance.api.v2 as v2
from glance.common import exception
from glance.common import utils
Expand All @@ -37,9 +38,17 @@


class ImagesController(object):
def __init__(self, db_api=None):
def __init__(self, db_api=None, policy_enforcer=None):
self.db_api = db_api or glance.db.get_api()
self.db_api.configure_db()
self.policy = policy_enforcer or policy.Enforcer()

def _enforce(self, req, action):
"""Authorize an action against our policies"""
try:
self.policy.enforce(req.context, action, {})
except exception.Forbidden:
raise webob.exc.HTTPForbidden()

def _normalize_properties(self, image):
"""Convert the properties from the stored format to a dict
Expand Down Expand Up @@ -68,6 +77,10 @@ def _append_tags(self, context, image):

@utils.mutating
def create(self, req, image):
self._enforce(req, 'add_image')
is_public = image.get('is_public')
if is_public:
self._enforce(req, 'publicize_image')
image['owner'] = req.context.owner
image['status'] = 'queued'

Expand All @@ -87,6 +100,7 @@ def create(self, req, image):

def index(self, req, marker=None, limit=None, sort_key='created_at',
sort_dir='desc', filters={}):
self._enforce(req, 'get_images')
filters['deleted'] = False
#NOTE(bcwaldon): is_public=True gets public images and those
# owned by the authenticated tenant
Expand Down Expand Up @@ -121,12 +135,17 @@ def _get_image(self, context, image_id):
raise webob.exc.HTTPNotFound()

def show(self, req, image_id):
self._enforce(req, 'get_image')
image = self._get_image(req.context, image_id)
image = self._normalize_properties(dict(image))
return self._append_tags(req.context, image)

@utils.mutating
def update(self, req, image_id, image):
self._enforce(req, 'modify_image')
is_public = image.get('is_public')
if is_public:
self._enforce(req, 'publicize_image')
tags = self._extract_tags(image)

try:
Expand All @@ -148,6 +167,7 @@ def update(self, req, image_id, image):

@utils.mutating
def delete(self, req, image_id):
self._enforce(req, 'delete_image')
image = self._get_image(req.context, image_id)

if image['protected']:
Expand Down
13 changes: 13 additions & 0 deletions glance/tests/unit/utils.py
Expand Up @@ -108,3 +108,16 @@ def add_to_backend(self, context, scheme, image_id, data, size):
self.data[image_id] = (data, size or len(data))
checksum = 'Z'
return (image_id, size, checksum)


class FakePolicyEnforcer(object):
def __init__(self, *_args, **kwargs):
self.rules = {}

def enforce(self, _ctxt, action, _target, **kwargs):
"""Raise Forbidden if a rule for given action is set to false."""
if self.rules.get(action) is False:
raise exception.Forbidden()

def set_rules(self, rules):
self.rules = rules
89 changes: 88 additions & 1 deletion glance/tests/unit/v2/test_images_resource.py
Expand Up @@ -22,6 +22,7 @@
from glance.common import utils
from glance.openstack.common import cfg
import glance.schema
from glance.tests.unit import base
import glance.tests.unit.utils as unit_test_utils
import glance.tests.utils as test_utils
import glance.store
Expand Down Expand Up @@ -73,8 +74,10 @@ class TestImagesController(test_utils.BaseTestCase):
def setUp(self):
super(TestImagesController, self).setUp()
self.db = unit_test_utils.FakeDB()
self.policy = unit_test_utils.FakePolicyEnforcer()
self._create_images()
self.controller = glance.api.v2.images.ImagesController(self.db)
self.controller = glance.api.v2.images.ImagesController(self.db,
self.policy)
glance.store.create_stores()

def _create_images(self):
Expand Down Expand Up @@ -337,13 +340,97 @@ def test_update_duplicate_tags(self):
output = self.controller.update(request, UUID1, image)
self.assertEqual(['ping'], output['tags'])

def test_delete(self):
request = unit_test_utils.get_fake_request()
try:
self.controller.delete(request, UUID1)
except Exception as e:
self.fail("Delete raised exception: %s" % e)

def test_delete_non_existant(self):
request = unit_test_utils.get_fake_request()
self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
request, utils.generate_uuid())

def test_index_with_invalid_marker(self):
fake_uuid = utils.generate_uuid()
request = unit_test_utils.get_fake_request()
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index, request, marker=fake_uuid)


class TestImagesControllerPolicies(base.IsolatedUnitTest):

def setUp(self):
super(TestImagesControllerPolicies, self).setUp()
self.db = unit_test_utils.FakeDB()
self.policy = unit_test_utils.FakePolicyEnforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
self.policy)

def test_index_unauthorized(self):
rules = {"get_images": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
self.assertRaises(webob.exc.HTTPForbidden, self.controller.index,
request)

def test_show_unauthorized(self):
rules = {"get_image": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
self.assertRaises(webob.exc.HTTPForbidden, self.controller.show,
request, image_id=UUID2)

def test_create_public_image_unauthorized(self):
rules = {"publicize_image": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
image = {'name': 'image-1', 'is_public': True}
self.assertRaises(webob.exc.HTTPForbidden, self.controller.create,
request, image)

def test_update_unauthorized(self):
rules = {"modify_image": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
image = {'name': 'image-2'}
self.assertRaises(webob.exc.HTTPForbidden, self.controller.update,
request, UUID1, image)

def test_update_publicize_image_unauthorized(self):
rules = {"publicize_image": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
image = {'name': 'image-1', 'is_public': True}
self.assertRaises(webob.exc.HTTPForbidden, self.controller.update,
request, UUID1, image)

def test_update_public_image_unauthorized(self):
rules = {"modify_image": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
image = {'name': 'image-1', 'is_public': True}
self.assertRaises(webob.exc.HTTPForbidden, self.controller.update,
request, UUID1, image)

def test_update_public_image_unauthorized_but_not_publicizing(self):
rules = {"publicize_image": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
image = {'name': 'image-2', 'is_public': False}
output = self.controller.update(request, UUID1, image)
self.assertEqual(UUID1, output['id'])
self.assertEqual('image-2', output['name'])

def test_delete_unauthorized(self):
rules = {"delete_image": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
self.assertRaises(webob.exc.HTTPForbidden, self.controller.delete,
request, UUID1)


class TestImagesDeserializer(test_utils.BaseTestCase):

def setUp(self):
Expand Down

0 comments on commit e7073d0

Please sign in to comment.