Skip to content

Commit

Permalink
api: allow alarm creation for others project by admins
Browse files Browse the repository at this point in the history
Change-Id: I8da7e81da799b5cbd6b1ce158ac10e12567aa3d7
Fixes-Bug: #1226470
  • Loading branch information
jd committed Oct 1, 2013
1 parent 70c4256 commit f7cb494
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 7 deletions.
23 changes: 20 additions & 3 deletions ceilometer/api/acl.py
Expand Up @@ -44,12 +44,29 @@ def install(app, conf):
conf=dict(conf.get(OPT_GROUP_NAME)))


def get_limited_to_project(headers):
"""Return the tenant the request should be limited to."""
def get_limited_to(headers):
"""Return the user and project the request should be limited to.
:param headers: HTTP headers dictionary
:return: A tuple of (user, project), set to None if there's no limit on
one of these.
"""
global _ENFORCER
if not _ENFORCER:
_ENFORCER = policy.Enforcer()
if not _ENFORCER.enforce('context_is_admin',
{},
{'roles': headers.get('X-Roles', "").split(",")}):
return headers.get('X-Project-Id')
return headers.get('X-User-Id'), headers.get('X-Project-Id')
return None, None


def get_limited_to_project(headers):
"""Return the project the request should be limited to.
:param headers: HTTP headers dictionary
:return: A project, or None if there's no limit on it.
"""
return get_limited_to(headers)[1]
12 changes: 8 additions & 4 deletions ceilometer/api/controllers/v2.py
Expand Up @@ -1272,8 +1272,9 @@ def put(self, data):
now = timeutils.utcnow()

data.alarm_id = self._id
data.user_id = alarm_in.user_id
data.project_id = alarm_in.project_id
user, project = acl.get_limited_to(pecan.request.headers)
data.user_id = user or data.user_id or alarm_in.user_id
data.project_id = project or data.project_id or alarm_in.project_id
data.timestamp = now
if alarm_in.state != data.state:
data.state_timestamp = now
Expand Down Expand Up @@ -1391,8 +1392,11 @@ def post(self, data):
now = timeutils.utcnow()

data.alarm_id = str(uuid.uuid4())
data.user_id = pecan.request.headers.get('X-User-Id')
data.project_id = pecan.request.headers.get('X-Project-Id')
user, project = acl.get_limited_to(pecan.request.headers)
data.user_id = (user or data.user_id or
pecan.request.headers.get('X-User-Id'))
data.project_id = (project or data.project_id or
pecan.request.headers.get('X-Project-Id'))
data.timestamp = now
data.state_timestamp = now

Expand Down
187 changes: 187 additions & 0 deletions tests/api/v2/test_alarm_scenarios.py
Expand Up @@ -455,6 +455,141 @@ def test_post_alarm(self):
else:
self.fail("Alarm not found")

def test_post_alarm_as_admin(self):
"""Test the creation of an alarm as admin for another project."""
json = {
'enabled': False,
'name': 'added_alarm',
'state': 'ok',
'type': 'threshold',
'user_id': 'auseridthatisnotmine',
'project_id': 'aprojectidthatisnotmine',
'threshold_rule': {
'meter_name': 'ameter',
'query': [{'field': 'metadata.field',
'op': 'eq',
'value': '5',
'type': 'string'},
{'field': 'project_id', 'op': 'eq',
'value': 'aprojectidthatisnotmine'}],
'comparison_operator': 'le',
'statistic': 'count',
'threshold': 50,
'evaluation_periods': 3,
'period': 180,
}
}
headers = {}
headers.update(self.auth_headers)
headers['X-Roles'] = 'admin'
self.post_json('/alarms', params=json, status=201,
headers=headers)
alarms = list(self.conn.get_alarms(enabled=False))
self.assertEqual(1, len(alarms))
self.assertEqual(alarms[0].user_id, 'auseridthatisnotmine')
self.assertEqual(alarms[0].project_id, 'aprojectidthatisnotmine')
if alarms[0].name == 'added_alarm':
for key in json:
if key.endswith('_rule'):
storage_key = 'rule'
else:
storage_key = key
self.assertEqual(getattr(alarms[0], storage_key),
json[key])
else:
self.fail("Alarm not found")

def test_post_alarm_as_admin_no_user(self):
"""Test the creation of an alarm as admin for another project but
forgetting to set the values.
"""
json = {
'enabled': False,
'name': 'added_alarm',
'state': 'ok',
'type': 'threshold',
'project_id': 'aprojectidthatisnotmine',
'threshold_rule': {
'meter_name': 'ameter',
'query': [{'field': 'metadata.field',
'op': 'eq',
'value': '5',
'type': 'string'},
{'field': 'project_id', 'op': 'eq',
'value': 'aprojectidthatisnotmine'}],
'comparison_operator': 'le',
'statistic': 'count',
'threshold': 50,
'evaluation_periods': 3,
'period': 180,
}
}
headers = {}
headers.update(self.auth_headers)
headers['X-Roles'] = 'admin'
self.post_json('/alarms', params=json, status=201,
headers=headers)
alarms = list(self.conn.get_alarms(enabled=False))
self.assertEqual(1, len(alarms))
self.assertEqual(alarms[0].user_id, self.auth_headers['X-User-Id'])
self.assertEqual(alarms[0].project_id, 'aprojectidthatisnotmine')
if alarms[0].name == 'added_alarm':
for key in json:
if key.endswith('_rule'):
storage_key = 'rule'
else:
storage_key = key
self.assertEqual(getattr(alarms[0], storage_key),
json[key])
else:
self.fail("Alarm not found")

def test_post_alarm_as_admin_no_project(self):
"""Test the creation of an alarm as admin for another project but
forgetting to set the values.
"""
json = {
'enabled': False,
'name': 'added_alarm',
'state': 'ok',
'type': 'threshold',
'user_id': 'auseridthatisnotmine',
'threshold_rule': {
'meter_name': 'ameter',
'query': [{'field': 'metadata.field',
'op': 'eq',
'value': '5',
'type': 'string'},
{'field': 'project_id', 'op': 'eq',
'value': 'aprojectidthatisnotmine'}],
'comparison_operator': 'le',
'statistic': 'count',
'threshold': 50,
'evaluation_periods': 3,
'period': 180,
}
}
headers = {}
headers.update(self.auth_headers)
headers['X-Roles'] = 'admin'
self.post_json('/alarms', params=json, status=201,
headers=headers)
alarms = list(self.conn.get_alarms(enabled=False))
self.assertEqual(1, len(alarms))
self.assertEqual(alarms[0].user_id, 'auseridthatisnotmine')
self.assertEqual(alarms[0].project_id,
self.auth_headers['X-Project-Id'])
if alarms[0].name == 'added_alarm':
for key in json:
if key.endswith('_rule'):
storage_key = 'rule'
else:
storage_key = key
self.assertEqual(getattr(alarms[0], storage_key),
json[key])
else:
self.fail("Alarm not found")

def test_post_alarm_combination(self):
json = {
'enabled': False,
Expand Down Expand Up @@ -551,6 +686,58 @@ def test_put_alarm(self):
storage_key = key
self.assertEqual(getattr(alarm, storage_key), json[key])

def test_put_alarm_as_admin(self):
json = {
'user_id': 'myuserid',
'project_id': 'myprojectid',
'enabled': False,
'name': 'name_put',
'state': 'ok',
'type': 'threshold',
'ok_actions': ['http://something/ok'],
'alarm_actions': ['http://something/alarm'],
'insufficient_data_actions': ['http://something/no'],
'repeat_actions': True,
'threshold_rule': {
'meter_name': 'ameter',
'query': [{'field': 'metadata.field',
'op': 'eq',
'value': '5',
'type': 'string'},
{'field': 'project_id', 'op': 'eq',
'value': 'myprojectid'}],
'comparison_operator': 'le',
'statistic': 'count',
'threshold': 50,
'evaluation_periods': 3,
'period': 180,
}
}
headers = {}
headers.update(self.auth_headers)
headers['X-Roles'] = 'admin'

data = self.get_json('/alarms',
headers=headers,
q=[{'field': 'name',
'value': 'name1',
}])
self.assertEqual(1, len(data))
alarm_id = data[0]['alarm_id']

self.put_json('/alarms/%s' % alarm_id,
params=json,
headers=headers)
alarm = list(self.conn.get_alarms(alarm_id=alarm_id, enabled=False))[0]
self.assertEqual(alarm.user_id, 'myuserid')
self.assertEqual(alarm.project_id, 'myprojectid')
for key in json:
if key.endswith('_rule'):
storage_key = 'rule'
else:
storage_key = key
self.assertEqual(getattr(alarm, storage_key), json[key])

def test_put_alarm_wrong_field(self):
# Note: wsme will ignore unknown fields so will just not appear in
# the Alarm.
Expand Down

0 comments on commit f7cb494

Please sign in to comment.