Skip to content

Commit

Permalink
Avoid leaking admin-ness into combination alarms
Browse files Browse the repository at this point in the history
Previously when an admin created a combination alarm on
behalf of an non-admin identity, this had the effect of leaking
visibility onto alarms that would not normally
be visible to the non-admin tenant.

Now we validate all alarm ids with the project ID of the non-admin
identity that will ultimately own the alarm instead of the project ID
of the API caller.

Fixes bug #1237632

Change-Id: I5d1cf41c9182f09bc37b93deb14dda58f1d6dcd6
(cherry picked from commit a8e93dd)
  • Loading branch information
Mehdi Abaakouk authored and Eoghan Glynn committed Oct 12, 2013
1 parent 58c85d7 commit 3004a0f
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 18 deletions.
48 changes: 30 additions & 18 deletions ceilometer/api/controllers/v2.py
Expand Up @@ -276,24 +276,34 @@ def __init__(self, id):
_("Not Authorized to access project %s") % id)


def _get_auth_project(on_behalf_of=None):
# when an alarm is created by an admin on behalf of another tenant
# we must ensure for:
# - threshold alarm, that an implicit query constraint on project_id is
# added so that admin-level visibility on statistics is not leaked
# - combination alarm, that alarm ids verification is scoped to
# alarms owned by the alarm project.
# hence for null auth_project (indicating admin-ness) we check if
# the creating tenant differs from the tenant on whose behalf the
# alarm is being created
auth_project = acl.get_limited_to_project(pecan.request.headers)
created_by = pecan.request.headers.get('X-Project-Id')
is_admin = auth_project is None

if is_admin and on_behalf_of != created_by:
auth_project = on_behalf_of
return auth_project


def _sanitize_query(query, db_func, on_behalf_of=None):
'''Check the query to see if:
1) the request is coming from admin - then allow full visibility
2) non-admin - make sure that the query includes the requester's
project.
'''
q = copy.copy(query)
auth_project = acl.get_limited_to_project(pecan.request.headers)
created_by = pecan.request.headers.get('X-Project-Id')

# when an alarm is created by an admin on behalf of another tenant
# we must ensure that an implicit query constraint on project_id is
# added so that admin-level visibility on statistics is not leaked,
# hence for null auth_project (indicating admin-ness) we check if
# the creating tenant differs from the tenant on whose behalf the
# alarm is being created
auth_project = (auth_project or
(on_behalf_of if on_behalf_of != created_by else None))
auth_project = _get_auth_project(on_behalf_of)
if auth_project:
_verify_query_segregation(q, auth_project)

Expand Down Expand Up @@ -1073,14 +1083,6 @@ def validate(combination_rule):
pecan.response.translatable_error = error
raise wsme.exc.ClientSideError(unicode(error))

for id in combination_rule.alarm_ids:
auth_project = acl.get_limited_to_project(pecan.request.headers)
alarms = list(pecan.request.storage_conn.get_alarms(
alarm_id=id, project=auth_project))
if len(alarms) < 1:
error = _("Alarm %s doesn't exists") % id
pecan.response.translatable_error = error
raise wsme.exc.ClientSideError(unicode(error))
return combination_rule


Expand Down Expand Up @@ -1170,6 +1172,7 @@ def validate(alarm):
error = _("%s is mandatory") % field
pecan.response.translatable_error = error
raise wsme.exc.ClientSideError(unicode(error))

if alarm.threshold_rule:
# ensure an implicit constraint on project_id is added to
# the query if not already present
Expand All @@ -1178,6 +1181,15 @@ def validate(alarm):
storage.SampleFilter.__init__,
on_behalf_of=alarm.project_id
)
elif alarm.combination_rule:
auth_project = _get_auth_project(alarm.project_id)
for id in alarm.combination_rule.alarm_ids:
alarms = list(pecan.request.storage_conn.get_alarms(
alarm_id=id, project=auth_project))
if not alarms:
error = _("Alarm %s doesn't exist") % id
pecan.response.translatable_error = error
raise wsme.exc.ClientSideError(unicode(error))

if alarm.threshold_rule and alarm.combination_rule:
error = _("threshold_rule and combination_rule "
Expand Down
97 changes: 97 additions & 0 deletions tests/api/v2/test_alarm_scenarios.py
Expand Up @@ -645,7 +645,104 @@ def test_post_alarm_combination(self):
else:
self.fail("Alarm not found")

def test_post_combination_alarm_as_user_with_unauthorized_alarm(self):
"""Test that post a combination alarm as normal user/project
with a alarm_id unauthorized for this project/user
"""
json = {
'enabled': False,
'name': 'added_alarm',
'state': 'ok',
'type': 'combination',
'ok_actions': ['http://something/ok'],
'alarm_actions': ['http://something/alarm'],
'insufficient_data_actions': ['http://something/no'],
'repeat_actions': True,
'combination_rule': {
'alarm_ids': ['a',
'b'],
'operator': 'and',
}
}
an_other_user_auth = {'X-User-Id': str(uuid.uuid4()),
'X-Project-Id': str(uuid.uuid4())}
resp = self.post_json('/alarms', params=json, status=400,
headers=an_other_user_auth)
self.assertEqual(jsonutils.loads(resp.body)['error_message']
['faultstring'],
"Alarm a doesn't exist")

def test_post_combination_alarm_as_admin_1(self):
"""Test that post a combination alarm as admin on behalf of a other
user/project with a alarm_id unauthorized for this project/user
"""
json = {
'enabled': False,
'name': 'added_alarm',
'state': 'ok',
'user_id': 'auseridthatisnotmine',
'project_id': 'aprojectidthatisnotmine',
'type': 'combination',
'ok_actions': ['http://something/ok'],
'alarm_actions': ['http://something/alarm'],
'insufficient_data_actions': ['http://something/no'],
'repeat_actions': True,
'combination_rule': {
'alarm_ids': ['a',
'b'],
'operator': 'and',
}
}

headers = {}
headers.update(self.auth_headers)
headers['X-Roles'] = 'admin'
resp = self.post_json('/alarms', params=json, status=400,
headers=headers)
self.assertEqual(jsonutils.loads(resp.body)['error_message']
['faultstring'],
"Alarm a doesn't exist")

def test_post_combination_alarm_as_admin_2(self):
"""Test that post a combination alarm as admin on behalf of nobody
with a alarm_id of someone else
"""
json = {
'enabled': False,
'name': 'added_alarm',
'state': 'ok',
'type': 'combination',
'ok_actions': ['http://something/ok'],
'alarm_actions': ['http://something/alarm'],
'insufficient_data_actions': ['http://something/no'],
'repeat_actions': True,
'combination_rule': {
'alarm_ids': ['a',
'b'],
'operator': 'and',
}
}

headers = {}
headers.update(self.auth_headers)
headers['X-Roles'] = 'admin'
self.post_json('/alarms', params=json, status=201,
headers=headers)
alarms = list(self.conn.get_alarms(enabled=False))
if alarms[0].name == 'added_alarm':
for key in json:
if key.endswith('_rule'):
storage_key = 'rule'
else:
storage_key = key
self.assertEqual(getattr(alarms[0], storage_key),
json[key])
else:
self.fail("Alarm not found")

def test_post_invalid_alarm_combination(self):
"""Test that post a combination alarm with a not existing alarm id
"""
json = {
'enabled': False,
'name': 'added_alarm',
Expand Down

0 comments on commit 3004a0f

Please sign in to comment.