Skip to content
Permalink
Browse files

Support aggregate API in cells

Change-Id: I1d687de36787ec70bd14a0ad613b405bcea2938a
  • Loading branch information...
sorrison committed Apr 15, 2014
1 parent 6b1ab7d commit 8ca8828d191bc271460eb80567717fd15ef6167c
@@ -20,7 +20,7 @@
from webob import exc

from nova.api.openstack import extensions
from nova.compute import api as compute_api
from nova import compute
from nova import exception
from nova.i18n import _
from nova import utils
@@ -55,7 +55,7 @@ def wrapped(self, req, id, body, *args, **kwargs):
class AggregateController(object):
"""The Host Aggregates API controller for the OpenStack API."""
def __init__(self):
self.api = compute_api.AggregateAPI()
self.api = compute.AggregateAPI()

def index(self, req):
"""Returns a list a host aggregate's id, name, availability_zone."""
@@ -550,3 +550,57 @@ def rebuild_instance(self, ctxt, instance, image_href, admin_password,

def set_admin_password(self, ctxt, instance, new_pass):
self.msg_runner.set_admin_password(ctxt, instance, new_pass)

def _response_to_aggregate(self, response):
aggregate = response.value_or_raise()
cells_utils.add_cell_to_aggregate(aggregate, response.cell_name)
return aggregate

def create_aggregate(self, ctxt, cell_name,
aggregate_name, availability_zone):
response = self.msg_runner.create_aggregate(ctxt, cell_name,
aggregate_name,
availability_zone)
return self._response_to_aggregate(response)

def get_aggregate(self, ctxt, cell_name, aggregate_id):
response = self.msg_runner.get_aggregate(ctxt, cell_name, aggregate_id)
return self._response_to_aggregate(response)

def get_aggregate_list(self, ctxt):
responses = self.msg_runner.get_aggregate_list(ctxt)
result = []
for response in responses:
aggregates = response.value_or_raise()
for aggregate in aggregates:
cells_utils.add_cell_to_aggregate(
aggregate, response.cell_name)
result.append(aggregate)
return result

def update_aggregate(self, ctxt, cell_name, aggregate_id, values):
response = self.msg_runner.update_aggregate(
ctxt, cell_name, aggregate_id, values)
return self._response_to_aggregate(response)

def update_aggregate_metadata(self, ctxt, cell_name,
aggregate_id, metadata):
response = self.msg_runner.update_aggregate_metadata(
ctxt, cell_name, aggregate_id, metadata)
return self._response_to_aggregate(response)

def delete_aggregate(self, ctxt, cell_name, aggregate_id):
response = self.msg_runner.delete_aggregate(
ctxt, cell_name, aggregate_id)
response.value_or_raise()

def add_host_to_aggregate(self, ctxt, cell_name, aggregate_id, host_name):
response = self.msg_runner.add_host_to_aggregate(
ctxt, cell_name, aggregate_id, host_name)
return self._response_to_aggregate(response)

def remove_host_from_aggregate(self, ctxt, cell_name,
aggregate_id, host_name):
response = self.msg_runner.remove_host_from_aggregate(
ctxt, cell_name, aggregate_id, host_name)
return self._response_to_aggregate(response)
@@ -613,7 +613,8 @@ def __init__(self, msg_runner):
super(_BaseMessageMethods, self).__init__()
self.msg_runner = msg_runner
self.state_manager = msg_runner.state_manager
self.compute_api = compute.API()
self.compute_api = compute.api.API()
self.aggregate_api = compute.api.AggregateAPI()
self.compute_rpcapi = compute_rpcapi.ComputeAPI()
self.consoleauth_rpcapi = consoleauth_rpcapi.ConsoleAuthAPI()
self.host_api = compute.HostAPI()
@@ -956,6 +957,39 @@ def set_admin_password(self, message, instance, new_pass):
self._call_compute_api_with_obj(message.ctxt, instance,
'set_admin_password', new_pass)

def create_aggregate(self, message, aggregate_name, availability_zone):
response = self.aggregate_api.create_aggregate(
message.ctxt, aggregate_name, availability_zone)
return jsonutils.to_primitive(response)

def get_aggregate(self, message, aggregate_id):
response = self.aggregate_api.get_aggregate(
message.ctxt, aggregate_id)
return jsonutils.to_primitive(response)

def update_aggregate(self, message, aggregate_id, values):
response = self.aggregate_api.update_aggregate(
message.ctxt, aggregate_id, values)
return jsonutils.to_primitive(response)

def update_aggregate_metadata(self, message, aggregate_id, metadata):
response = self.aggregate_api.update_aggregate_metadata(
message.ctxt, aggregate_id, metadata)
return jsonutils.to_primitive(response)

def delete_aggregate(self, message, aggregate_id):
self.aggregate_api.delete_aggregate(message.ctxt, aggregate_id)

def add_host_to_aggregate(self, message, aggregate_id, host_name):
response = self.aggregate_api.add_host_to_aggregate(
message.ctxt, aggregate_id, host_name)
return jsonutils.to_primitive(response)

def remove_host_from_aggregate(self, message, aggregate_id, host_name):
response = self.aggregate_api.remove_host_from_aggregate(
message.ctxt, aggregate_id, host_name)
return jsonutils.to_primitive(response)


class _BroadcastMessageMethods(_BaseMessageMethods):
"""These are the methods that can be called as a part of a broadcast
@@ -1235,6 +1269,10 @@ def get_migrations(self, message, filters):
context = message.ctxt
return self.compute_api.get_migrations(context, filters)

def get_aggregate_list(self, message):
response = self.aggregate_api.get_aggregate_list(message.ctxt)
return jsonutils.to_primitive(response)


_CELL_MESSAGE_TYPE_TO_MESSAGE_CLS = {'targeted': _TargetedMessage,
'broadcast': _BroadcastMessage,
@@ -1821,6 +1859,68 @@ def set_admin_password(self, ctxt, instance, new_pass):
self._instance_action(ctxt, instance, 'set_admin_password',
extra_kwargs={'new_pass': new_pass})

def create_aggregate(self, ctxt, cell_name, aggregate_name,
availability_zone):
method_kwargs = dict(aggregate_name=aggregate_name,
availability_zone=availability_zone)
message = _TargetedMessage(self, ctxt, 'create_aggregate',
method_kwargs, 'down',
cell_name, need_response=True)
return message.process()

def get_aggregate(self, ctxt, cell_name, aggregate_id):
method_kwargs = dict(aggregate_id=aggregate_id)
message = _TargetedMessage(self, ctxt, 'get_aggregate',
method_kwargs, 'down',
cell_name, need_response=True)
return message.process()

def get_aggregate_list(self, ctxt):
message = _BroadcastMessage(self, ctxt, 'get_aggregate_list',
{}, 'down', need_response=True)
return message.process()

def update_aggregate(self, ctxt, cell_name, aggregate_id, values):
method_kwargs = dict(aggregate_id=aggregate_id, values=values)
message = _TargetedMessage(self, ctxt, 'update_aggregate',
method_kwargs, 'down',
cell_name, need_response=True)
return message.process()

def update_aggregate_metadata(self, ctxt, cell_name,
aggregate_id, metadata):
method_kwargs = dict(aggregate_id=aggregate_id, metadata=metadata)
message = _TargetedMessage(self, ctxt, 'update_aggregate_metadata',
method_kwargs, 'down',
cell_name, need_response=True)
return message.process()

def delete_aggregate(self, ctxt, cell_name, aggregate_id):
method_kwargs = dict(aggregate_id=aggregate_id)
# Need response in case the operation fails, so an exception will
# return
message = _TargetedMessage(self, ctxt, 'delete_aggregate',
method_kwargs, 'down',
cell_name, need_response=True)
return message.process()

def add_host_to_aggregate(self, ctxt, cell_name, aggregate_id, host_name):
method_kwargs = dict(aggregate_id=aggregate_id,
host_name=host_name)
message = _TargetedMessage(self, ctxt, 'add_host_to_aggregate',
method_kwargs, 'down',
cell_name, need_response=True)
return message.process()

def remove_host_from_aggregate(self, ctxt, cell_name,
aggregate_id, host_name):
method_kwargs = dict(aggregate_id=aggregate_id,
host_name=host_name)
message = _TargetedMessage(self, ctxt, 'remove_host_from_aggregate',
method_kwargs, 'down',
cell_name, need_response=True)
return message.process()

@staticmethod
def get_message_types():
return _CELL_MESSAGE_TYPE_TO_MESSAGE_CLS.keys()
@@ -621,3 +621,55 @@ def set_admin_password(self, ctxt, instance, new_pass):
cctxt = self.client.prepare(version='1.29')
cctxt.cast(ctxt, 'set_admin_password', instance=instance,
new_pass=new_pass)

def create_aggregate(self, ctxt, cell_name,
aggregate_name, availability_zone):
cctxt = self.client.prepare(version='1.24.1')
return cctxt.call(ctxt, 'create_aggregate',
cell_name=cell_name,
aggregate_name=aggregate_name,
availability_zone=availability_zone)

def get_aggregate(self, ctxt, cell_name, aggregate_id):
cctxt = self.client.prepare(version='1.24.1')
return cctxt.call(ctxt, 'get_aggregate',
cell_name=cell_name,
aggregate_id=aggregate_id)

def get_aggregate_list(self, ctxt):
cctxt = self.client.prepare(version='1.24.1')
return cctxt.call(ctxt, 'get_aggregate_list')

def update_aggregate(self, ctxt, cell_name, aggregate_id, values):
cctxt = self.client.prepare(version='1.24.1')
return cctxt.call(ctxt, 'update_aggregate', cell_name=cell_name,
aggregate_id=aggregate_id,
values=values)

def update_aggregate_metadata(self, ctxt, cell_name,
aggregate_id, metadata):
cctxt = self.client.prepare(version='1.24.1')
return cctxt.call(ctxt, 'update_aggregate_metadata',
cell_name=cell_name,
aggregate_id=aggregate_id,
metadata=metadata)

def delete_aggregate(self, ctxt, cell_name, aggregate_id):
cctxt = self.client.prepare(version='1.24.1')
return cctxt.call(ctxt, 'delete_aggregate',
cell_name=cell_name,
aggregate_id=aggregate_id)

def add_host_to_aggregate(self, ctxt, cell_name, aggregate_id, host_name):
cctxt = self.client.prepare(version='1.24.1')
return cctxt.call(ctxt, 'add_host_to_aggregate',
cell_name=cell_name,
aggregate_id=aggregate_id, host_name=host_name)

def remove_host_from_aggregate(self, ctxt, cell_name,
aggregate_id, host_name):
cctxt = self.client.prepare(version='1.24.1')
return cctxt.call(ctxt, 'remove_host_from_aggregate',
cell_name=cell_name,
aggregate_id=aggregate_id,
host_name=host_name)
@@ -97,6 +97,15 @@ def add_cell_to_service(service, cell_name):
add_cell_to_compute_node(compute_node[0], cell_name)


def add_cell_to_aggregate(aggregate, cell_name):
"""Fix aggregate attributes that should be unique.
Changes the aggregate's name and ID to include the cell_name,
making them unique in the context of the api cell
"""
aggregate['id'] = cell_with_item(cell_name, aggregate['id'])
aggregate['name'] = cell_with_item(cell_name, aggregate['name'])


def add_cell_to_task_log(task_log, cell_name):
"""Fix task_log attributes that should be unique. In particular,
the 'id' and 'host' fields should be prepended with cell name.
@@ -50,6 +50,18 @@ def HostAPI(*args, **kwargs):
return importutils.import_object(class_name, *args, **kwargs)


def AggregateAPI(*args, **kwargs):
"""
Returns the 'AggregateAPI' class from the same module as the configured
compute api
"""
importutils = nova.openstack.common.importutils
compute_api_class_name = _get_compute_api_class_name()
compute_api_class = importutils.import_class(compute_api_class_name)
class_name = compute_api_class.__module__ + ".AggregateAPI"
return importutils.import_object(class_name, *args, **kwargs)


def InstanceActionAPI(*args, **kwargs):
"""Returns the 'InstanceActionAPI' class from the same module as the
configured compute api.
@@ -658,3 +658,51 @@ def action_get_by_request_id(self, context, instance, request_id):
def action_events_get(self, context, instance, action_id):
return self.cells_rpcapi.action_events_get(context, instance,
action_id)


class AggregateAPI(compute_api.AggregateAPI):
"""Sub-set of the Compute Manager API for managing host aggregates."""
def __init__(self, **kwargs):
super(AggregateAPI, self).__init__(**kwargs)
self.cells_rpcapi = cells_rpcapi.CellsAPI()

def _call_rpc_api(self, context, method_name, cell_and_item,
*args, **kwargs):
cell, item = cells_utils.split_cell_and_item(cell_and_item)
try:
item = int(item)
except ValueError:
pass
method = getattr(self.cells_rpcapi, method_name)
return method(context, cell, item, *args, **kwargs)

def create_aggregate(self, context, aggregate_name, availability_zone):
return self._call_rpc_api(
context, 'create_aggregate', aggregate_name, availability_zone)

def get_aggregate(self, context, aggregate_id):
return self._call_rpc_api(
context, 'get_aggregate', aggregate_id)

def get_aggregate_list(self, context):
return self.cells_rpcapi.get_aggregate_list(context)

def update_aggregate(self, context, aggregate_id, aggregate_values):
return self._call_rpc_api(
context, 'update_aggregate', aggregate_id, aggregate_values)

def update_aggregate_metadata(self, context, aggregate_id, metadata):
return self._call_rpc_api(
context, 'update_aggregate_metadata', aggregate_id, metadata)

def delete_aggregate(self, context, aggregate_id):
return self._call_rpc_api(
context, 'delete_aggregate', aggregate_id)

def add_host_to_aggregate(self, context, aggregate_id, host_name):
return self._call_rpc_api(
context, 'add_host_to_aggregate', aggregate_id, host_name)

def remove_host_from_aggregate(self, context, aggregate_id, host_name):
return self._call_rpc_api(
context, 'remove_host_from_aggregate', aggregate_id, host_name)

0 comments on commit 8ca8828

Please sign in to comment.
You can’t perform that action at this time.