Skip to content

Commit

Permalink
Use function registration for policy checks
Browse files Browse the repository at this point in the history
The original policy framework allowed new policy checks to be created
through inheritance.  This is somewhat clunky and unnecessary in
Python.  This change refactors policy.py to allow new policy checks
to be registered using an @register() decorator.  One consequence is
that HttpBrain is deprecated.

Care has been taken to ensure backwards compatibility; deprecation
warnings will be emitted for uses of HttpBrain or the inheritance-
based checks.

(Pull-up from openstack-common, with deprecation of HttpBrain.)

Change-Id: Ia9a6039a82fe4ebfa9b18e5eb93c21fffee90f09
  • Loading branch information
Kevin L. Mitchell committed Aug 2, 2012
1 parent 8583ce6 commit 751270f
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 47 deletions.
152 changes: 106 additions & 46 deletions nova/openstack/common/policy.py
Expand Up @@ -131,13 +131,25 @@ def enforce(match_list, target_dict, credentials_dict, exc=None,

class Brain(object):
"""Implements policy checking."""

_checks = {}

@classmethod
def _register(cls, name, func):
cls._checks[name] = func

@classmethod
def load_json(cls, data, default_rule=None):
"""Init a brain using json instead of a rules dictionary."""
rules_dict = jsonutils.loads(data)
return cls(rules=rules_dict, default_rule=default_rule)

def __init__(self, rules=None, default_rule=None):
if self.__class__ != Brain:
LOG.warning(_("Inheritance-based rules are deprecated; use "
"the default brain instead of %s.") %
self.__class__.__name__)

self.rules = rules or {}
self.default_rule = default_rule

Expand All @@ -151,15 +163,24 @@ def _check(self, match, target_dict, cred_dict):
LOG.exception(_("Failed to understand rule %(match)r") % locals())
# If the rule is invalid, fail closed
return False

func = None
try:
f = getattr(self, '_check_%s' % match_kind)
old_func = getattr(self, '_check_%s' % match_kind)
except AttributeError:
if not self._check_generic(match, target_dict, cred_dict):
return False
func = self._checks.get(match_kind, self._checks.get(None, None))
else:
if not f(match_value, target_dict, cred_dict):
return False
return True
LOG.warning(_("Inheritance-based rules are deprecated; update "
"_check_%s") % match_kind)
func = (lambda brain, kind, value, target, cred:
old_func(value, target, cred))

if not func:
LOG.error(_("No handler for matches of kind %s") % match_kind)
# Fail closed
return False

return func(self, match_kind, match_value, target_dict, cred_dict)

def check(self, match_list, target_dict, cred_dict):
"""Checks authorization of some rules against credentials.
Expand All @@ -183,58 +204,97 @@ def check(self, match_list, target_dict, cred_dict):
return True
return False

def _check_rule(self, match, target_dict, cred_dict):
"""Recursively checks credentials based on the brains rules."""
try:
new_match_list = self.rules[match]
except KeyError:
if self.default_rule and match != self.default_rule:
new_match_list = ('rule:%s' % self.default_rule,)
else:
return False

return self.check(new_match_list, target_dict, cred_dict)
class HttpBrain(Brain):
"""A brain that can check external urls for policy.
def _check_role(self, match, target_dict, cred_dict):
"""Check that there is a matching role in the cred dict."""
return match.lower() in [x.lower() for x in cred_dict['roles']]
Posts json blobs for target and credentials.
def _check_generic(self, match, target_dict, cred_dict):
"""Check an individual match.
Note that this brain is deprecated; the http check is registered
by default.
"""

Matches look like:
pass

tenant:%(tenant_id)s
role:compute:admin

"""
def register(name, func=None):
"""
Register a function as a policy check.
:param name: Gives the name of the check type, e.g., 'rule',
'role', etc. If name is None, a default function
will be registered.
:param func: If given, provides the function to register. If not
given, returns a function taking one argument to
specify the function to register, allowing use as a
decorator.
"""

# TODO(termie): do dict inspection via dot syntax
match = match % target_dict
key, value = match.split(':', 1)
if key in cred_dict:
return value == cred_dict[key]
return False
# Perform the actual decoration by registering the function.
# Returns the function for compliance with the decorator
# interface.
def decorator(func):
# Register the function
Brain._register(name, func)
return func

# If the function is given, do the registration
if func:
return decorator(func)

return decorator


@register("rule")
def _check_rule(brain, match_kind, match, target_dict, cred_dict):
"""Recursively checks credentials based on the brains rules."""
try:
new_match_list = brain.rules[match]
except KeyError:
if brain.default_rule and match != brain.default_rule:
new_match_list = ('rule:%s' % brain.default_rule,)
else:
return False

return brain.check(new_match_list, target_dict, cred_dict)

class HttpBrain(Brain):
"""A brain that can check external urls for policy.

Posts json blobs for target and credentials.
@register("role")
def _check_role(brain, match_kind, match, target_dict, cred_dict):
"""Check that there is a matching role in the cred dict."""
return match.lower() in [x.lower() for x in cred_dict['roles']]


@register('http')
def _check_http(brain, match_kind, match, target_dict, cred_dict):
"""Check http: rules by calling to a remote server.
This example implementation simply verifies that the response is
exactly 'True'. A custom brain using response codes could easily
be implemented.
"""
url = 'http:' + (match % target_dict)
data = {'target': jsonutils.dumps(target_dict),
'credentials': jsonutils.dumps(cred_dict)}
post_data = urllib.urlencode(data)
f = urllib2.urlopen(url, post_data)
return f.read() == "True"

def _check_http(self, match, target_dict, cred_dict):
"""Check http: rules by calling to a remote server.

This example implementation simply verifies that the response is
exactly 'True'. A custom brain using response codes could easily
be implemented.
@register(None)
def _check_generic(brain, match_kind, match, target_dict, cred_dict):
"""Check an individual match.
"""
url = match % target_dict
data = {'target': jsonutils.dumps(target_dict),
'credentials': jsonutils.dumps(cred_dict)}
post_data = urllib.urlencode(data)
f = urllib2.urlopen(url, post_data)
return f.read() == "True"
Matches look like:
tenant:%(tenant_id)s
role:compute:admin
"""

# TODO(termie): do dict inspection via dot syntax
match = match % target_dict
if match_kind in cred_dict:
return match == cred_dict[match_kind]
return False
2 changes: 1 addition & 1 deletion nova/policy.py
Expand Up @@ -65,7 +65,7 @@ def init():

def _set_brain(data):
default_rule = FLAGS.policy_default_rule
policy.set_brain(policy.HttpBrain.load_json(data, default_rule))
policy.set_brain(policy.Brain.load_json(data, default_rule))


def enforce(context, action, target):
Expand Down

0 comments on commit 751270f

Please sign in to comment.