Skip to content

Commit

Permalink
Fix bulk create operations and make them atomic.
Browse files Browse the repository at this point in the history
Bug 1024844
Bug 1020639

The API layer is now able to issue bulk create requests to the plugin,
assuming that the plugin supports them. Otherwise, the API layer will
emulate atomic behavior.
This patch also implements OVS plugin support for bulk requests.

Change-Id: I515148d870d0dff8371862fe577c477538364929
  • Loading branch information
salv-orlando committed Aug 13, 2012
1 parent 827e470 commit ff83702
Show file tree
Hide file tree
Showing 11 changed files with 453 additions and 62 deletions.
2 changes: 2 additions & 0 deletions etc/quantum.conf
Expand Up @@ -39,6 +39,8 @@ api_paste_config = api-paste.ini
# Maximum amount of retries to generate a unique MAC address
# mac_generation_retries = 16

# Enable or disable bulk create/update/delete operations
# allow_bulk = True
# RPC configuration options. Defined in rpc __init__
# The messaging module to use, defaults to kombu.
# rpc_backend = quantum.openstack.common.notifier.rpc.impl_kombu
Expand Down
85 changes: 65 additions & 20 deletions quantum/api/v2/base.py
Expand Up @@ -117,15 +117,23 @@ def verbose(request):


class Controller(object):
def __init__(self, plugin, collection, resource, attr_info):
def __init__(self, plugin, collection, resource,
attr_info, allow_bulk=False):
self._plugin = plugin
self._collection = collection
self._resource = resource
self._attr_info = attr_info
self._allow_bulk = allow_bulk
self._native_bulk = self._is_native_bulk_supported()
self._policy_attrs = [name for (name, info) in self._attr_info.items()
if info.get('required_by_policy')]
self._publisher_id = notifier_api.publisher_id('network')

def _is_native_bulk_supported(self):
native_bulk_attr_name = ("_%s__native_bulk_support"
% self._plugin.__class__.__name__)
return getattr(self._plugin, native_bulk_attr_name, False)

def _is_visible(self, attr):
attr_val = self._attr_info.get(attr)
return attr_val and attr_val['is_visible']
Expand Down Expand Up @@ -209,17 +217,41 @@ def show(self, request, id):
# doesn't exist
raise webob.exc.HTTPNotFound()

def _emulate_bulk_create(self, obj_creator, request, body):
objs = []
try:
for item in body[self._collection]:
kwargs = {self._resource: item}
objs.append(self._view(obj_creator(request.context,
**kwargs)))
return objs
# Note(salvatore-orlando): broad catch as in theory a plugin
# could raise any kind of exception
except Exception as ex:
for obj in objs:
delete_action = "delete_%s" % self._resource
obj_deleter = getattr(self._plugin, delete_action)
try:
obj_deleter(request.context, obj['id'])
except Exception:
# broad catch as our only purpose is to log the exception
LOG.exception("Unable to undo add for %s %s",
self._resource, obj['id'])
# TODO(salvatore-orlando): The object being processed when the
# plugin raised might have been created or not in the db.
# We need a way for ensuring that if it has been created,
# it is then deleted
raise

def create(self, request, body=None):
"""Creates a new instance of the requested entity"""
notifier_api.notify(request.context,
self._publisher_id,
self._resource + '.create.start',
notifier_api.INFO,
body)
body = self._prepare_request_body(request.context, body, True,
allow_bulk=True)
body = self._prepare_request_body(request.context, body, True)
action = "create_%s" % self._resource

# Check authz
try:
if self._collection in body:
Expand Down Expand Up @@ -256,16 +288,30 @@ def create(self, request, body=None):
LOG.exception("Create operation not authorized")
raise webob.exc.HTTPForbidden()

obj_creator = getattr(self._plugin, action)
kwargs = {self._resource: body}
obj = obj_creator(request.context, **kwargs)
result = {self._resource: self._view(obj)}
notifier_api.notify(request.context,
self._publisher_id,
self._resource + '.create.end',
notifier_api.INFO,
result)
return result
def notify(create_result):
notifier_api.notify(request.context,
self._publisher_id,
self._resource + '.create.end',
notifier_api.INFO,
create_result)
return create_result

if self._collection in body and self._native_bulk:
# plugin does atomic bulk create operations
obj_creator = getattr(self._plugin, "%s_bulk" % action)
objs = obj_creator(request.context, body)
return notify({self._collection: [self._view(obj)
for obj in objs]})
else:
obj_creator = getattr(self._plugin, action)
if self._collection in body:
# Emulate atomic bulk behavior
objs = self._emulate_bulk_create(obj_creator, request, body)
return notify({self._collection: objs})
else:
kwargs = {self._resource: body}
obj = obj_creator(request.context, **kwargs)
return notify({self._resource: self._view(obj)})

def delete(self, request, id):
"""Deletes the specified entity"""
Expand Down Expand Up @@ -355,8 +401,7 @@ def _populate_tenant_id(self, context, res_dict, is_create):
" that tenant_id is specified")
raise webob.exc.HTTPBadRequest(msg)

def _prepare_request_body(self, context, body, is_create,
allow_bulk=False):
def _prepare_request_body(self, context, body, is_create):
""" verifies required attributes are in request body, and that
an attribute is only specified if it is allowed for the given
operation (create/update).
Expand All @@ -369,7 +414,7 @@ def _prepare_request_body(self, context, body, is_create,
raise webob.exc.HTTPBadRequest(_("Resource body required"))

body = body or {self._resource: {}}
if self._collection in body and allow_bulk:
if self._collection in body and self._allow_bulk:
bulk_body = [self._prepare_request_body(context,
{self._resource: b},
is_create)
Expand All @@ -382,7 +427,7 @@ def _prepare_request_body(self, context, body, is_create,

return {self._collection: bulk_body}

elif self._collection in body and not allow_bulk:
elif self._collection in body and not self._allow_bulk:
raise webob.exc.HTTPBadRequest("Bulk operation not supported")

res_dict = body.get(self._resource)
Expand Down Expand Up @@ -459,8 +504,8 @@ def _validate_network_tenant_ownership(self, request, resource_item):
})


def create_resource(collection, resource, plugin, params):
controller = Controller(plugin, collection, resource, params)
def create_resource(collection, resource, plugin, params, allow_bulk=False):
controller = Controller(plugin, collection, resource, params, allow_bulk)

# NOTE(jkoelker) To anyone wishing to add "proper" xml support
# this is where you do it
Expand Down
5 changes: 3 additions & 2 deletions quantum/api/v2/router.py
Expand Up @@ -69,7 +69,6 @@ def factory(cls, global_config, **local_config):
def __init__(self, **local_config):
mapper = routes_mapper.Mapper()
plugin = manager.QuantumManager.get_plugin()

ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)

Expand All @@ -81,8 +80,10 @@ def __init__(self, **local_config):
'port': 'ports'}

def _map_resource(collection, resource, params):
allow_bulk = cfg.CONF.allow_bulk
controller = base.create_resource(collection, resource,
plugin, params)
plugin, params,
allow_bulk=allow_bulk)
mapper_kwargs = dict(controller=controller,
requirements=REQUIREMENTS,
**col_kwargs)
Expand Down
3 changes: 2 additions & 1 deletion quantum/common/config.py
Expand Up @@ -43,7 +43,8 @@
cfg.StrOpt('core_plugin',
default='quantum.plugins.sample.SamplePlugin.FakePlugin'),
cfg.StrOpt('base_mac', default="fa:16:3e:00:00:00"),
cfg.IntOpt('mac_generation_retries', default=16)
cfg.IntOpt('mac_generation_retries', default=16),
cfg.BoolOpt('allow_bulk', default=True),
]

# Register the configuration options
Expand Down
41 changes: 37 additions & 4 deletions quantum/db/db_base_plugin_v2.py
Expand Up @@ -41,6 +41,11 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
certain events.
"""

# This attribute specifies whether the plugin supports or not
# bulk operations. Name mangling is used in order to ensure it
# is qualified by class
__native_bulk_support = True

def __init__(self):
# NOTE(jkoelker) This is an incomlete implementation. Subclasses
# must override __init__ and setup the database
Expand Down Expand Up @@ -673,12 +678,34 @@ def _make_port_dict(self, port, fields=None):
"device_id": port["device_id"]}
return self._fields(res, fields)

def _create_bulk(self, resource, context, request_items):
objects = []
collection = "%ss" % resource
items = request_items[collection]
context.session.begin(subtransactions=True)
try:
for item in items:
obj_creator = getattr(self, 'create_%s' % resource)
objects.append(obj_creator(context, item))
context.session.commit()
except Exception:
LOG.exception("An exception occured while creating "
"the port:%s", item)
context.session.rollback()
raise
return objects

def create_network_bulk(self, context, networks):
return self._create_bulk('network', context, networks)

def create_network(self, context, network):
""" handle creation of a single network """
# single request processing
n = network['network']
# NOTE(jkoelker) Get the tenant_id outside of the session to avoid
# unneeded db action if the operation raises
tenant_id = self._get_tenant_id_for_create(context, n)
with context.session.begin():
with context.session.begin(subtransactions=True):
network = models_v2.Network(tenant_id=tenant_id,
id=n.get('id') or utils.str_uuid(),
name=n['name'],
Expand Down Expand Up @@ -721,14 +748,17 @@ def get_networks(self, context, filters=None, fields=None, verbose=None):
filters=filters, fields=fields,
verbose=verbose)

def create_subnet_bulk(self, context, subnets):
return self._create_bulk('subnet', context, subnets)

def create_subnet(self, context, subnet):
s = subnet['subnet']
net = netaddr.IPNetwork(s['cidr'])
if s['gateway_ip'] == attributes.ATTR_NOT_SPECIFIED:
s['gateway_ip'] = str(netaddr.IPAddress(net.first + 1))

tenant_id = self._get_tenant_id_for_create(context, s)
with context.session.begin():
with context.session.begin(subtransactions=True):
network = self._get_network(context, s["network_id"])
self._validate_subnet_cidr(network, s['cidr'])
subnet = models_v2.Subnet(tenant_id=tenant_id,
Expand Down Expand Up @@ -780,13 +810,16 @@ def get_subnets(self, context, filters=None, fields=None, verbose=None):
filters=filters, fields=fields,
verbose=verbose)

def create_port_bulk(self, context, ports):
return self._create_bulk('port', context, ports)

def create_port(self, context, port):
p = port['port']
# NOTE(jkoelker) Get the tenant_id outside of the session to avoid
# unneeded db action if the operation raises
tenant_id = self._get_tenant_id_for_create(context, p)

with context.session.begin():
with context.session.begin(subtransactions=True):
network = self._get_network(context, p["network_id"])

# Ensure that a MAC address is defined and it is unique on the
Expand Down Expand Up @@ -817,7 +850,7 @@ def create_port(self, context, port):

# Update the allocated IP's
if ips:
with context.session.begin():
with context.session.begin(subtransactions=True):
for ip in ips:
LOG.debug("Allocated IP %s (%s/%s/%s)", ip['ip_address'],
port['network_id'], ip['subnet_id'], port.id)
Expand Down
1 change: 0 additions & 1 deletion quantum/plugins/linuxbridge/lb_quantum_plugin.py
Expand Up @@ -196,7 +196,6 @@ def create_network(self, context, network):
super(LinuxBridgePluginV2, self).delete_network(context,
net['id'])
raise

return net

def update_network(self, context, id, network):
Expand Down
19 changes: 8 additions & 11 deletions quantum/plugins/openvswitch/ovs_db_v2.py
Expand Up @@ -40,8 +40,8 @@ def get_vlans():
return [(binding.vlan_id, binding.network_id) for binding in bindings]


def get_vlan(net_id):
session = db.get_session()
def get_vlan(net_id, session=None):
session = session or db.get_session()
try:
binding = (session.query(ovs_models_v2.VlanBinding).
filter_by(network_id=net_id).
Expand All @@ -51,11 +51,10 @@ def get_vlan(net_id):
return binding.vlan_id


def add_vlan_binding(vlan_id, net_id):
session = db.get_session()
binding = ovs_models_v2.VlanBinding(vlan_id, net_id)
session.add(binding)
session.flush()
def add_vlan_binding(vlan_id, net_id, session):
with session.begin(subtransactions=True):
binding = ovs_models_v2.VlanBinding(vlan_id, net_id)
session.add(binding)
return binding


Expand Down Expand Up @@ -114,10 +113,9 @@ def get_vlan_id(vlan_id):
return None


def reserve_vlan_id():
def reserve_vlan_id(session):
"""Reserve an unused vlan_id"""

session = db.get_session()
with session.begin(subtransactions=True):
record = (session.query(ovs_models_v2.VlanID).
filter_by(vlan_used=False).
Expand All @@ -129,14 +127,13 @@ def reserve_vlan_id():
return record.vlan_id


def reserve_specific_vlan_id(vlan_id):
def reserve_specific_vlan_id(vlan_id, session):
"""Reserve a specific vlan_id"""

if vlan_id < 1 or vlan_id > 4094:
msg = _("Specified VLAN %s outside legal range (1-4094)") % vlan_id
raise q_exc.InvalidInput(error_message=msg)

session = db.get_session()
with session.begin(subtransactions=True):
try:
record = (session.query(ovs_models_v2.VlanID).
Expand Down
13 changes: 9 additions & 4 deletions quantum/plugins/openvswitch/ovs_quantum_plugin.py
Expand Up @@ -177,6 +177,10 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
be updated to take advantage of it.
"""

# This attribute specifies whether the plugin supports or not
# bulk operations. Name mangling is used in order to ensure it
# is qualified by class
__native_bulk_support = True
supported_extension_aliases = ["provider"]

def __init__(self, configfile=None):
Expand Down Expand Up @@ -227,23 +231,24 @@ def _enforce_provider_set_auth(self, context, network):
def _extend_network_dict(self, context, network):
if self._check_provider_view_auth(context, network):
if not self.enable_tunneling:
network['provider:vlan_id'] = ovs_db_v2.get_vlan(network['id'])
network['provider:vlan_id'] = ovs_db_v2.get_vlan(
network['id'], context.session)

def create_network(self, context, network):
net = super(OVSQuantumPluginV2, self).create_network(context, network)
try:
vlan_id = network['network'].get('provider:vlan_id')
if vlan_id not in (None, attributes.ATTR_NOT_SPECIFIED):
self._enforce_provider_set_auth(context, net)
ovs_db_v2.reserve_specific_vlan_id(vlan_id)
ovs_db_v2.reserve_specific_vlan_id(vlan_id, context.session)
else:
vlan_id = ovs_db_v2.reserve_vlan_id()
vlan_id = ovs_db_v2.reserve_vlan_id(context.session)
except Exception:
super(OVSQuantumPluginV2, self).delete_network(context, net['id'])
raise

LOG.debug("Created network: %s" % net['id'])
ovs_db_v2.add_vlan_binding(vlan_id, str(net['id']))
ovs_db_v2.add_vlan_binding(vlan_id, str(net['id']), context.session)
self._extend_network_dict(context, net)
return net

Expand Down

0 comments on commit ff83702

Please sign in to comment.