Adds tests, fixes Radware LBaaS driver as a result
Adding more tests for the Radware LBaaS driver.
Adding a new exceptions module for the Radware LBaaS driver:
the base exception, RadwareLBaasException, plus several
specific exceptions for different failures.
The driver was changed to use the new exceptions as well.
Changing the way OperationCompletionHandler obtains its context.
The handler now always waits 1 second before handling an operation
the next time, to prevent busy-wait requests on vDirect.
Several code optimizations were done as well.

Change-Id: I15f7845fc2575eedb62c47d15ee6c1cea08e22f5
Closes-Bug: #1236741
evgenyfedoruk committed Nov 20, 2013
1 parent a8c3c0c commit 3299eb3
Showing 3 changed files with 394 additions and 173 deletions.
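
The throttling described in the commit message can be sketched as follows. This is a simplified illustration rather than the driver code itself: queue, handle_one and stop_event stand in for the driver's operation queue, per-operation completion check and shutdown event. The handler takes the current queue size as a batch, works through that many operations back to back, and only then rests for one second, so operations pushed back as "not completed yet" are retried after the pause instead of being polled in a tight loop against vDirect.

import Queue  # Python 2 standard library module, as used by the driver
import time


def completion_loop(queue, handle_one, stop_event):
    """Sketch of the batch-then-rest handling loop (illustrative only)."""
    opers_before_rest = 0
    while not stop_event.isSet():
        try:
            oper = queue.get(timeout=1)

            # Take the current queue size as the batch size and handle
            # that many operations with no intermission, then rest.
            if opers_before_rest <= 0:
                opers_before_rest = queue.qsize() + 1

            if not handle_one(oper):
                # Not completed yet - push back; it is retried only after
                # the one-second rest below, so vDirect is not busy-polled.
                queue.put_nowait(oper)
            queue.task_done()

            opers_before_rest -= 1
            if opers_before_rest <= 0:
                time.sleep(1)
        except Queue.Empty:
            continue
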
171 changes: 93 additions & 78 deletions neutron/services/loadbalancer/drivers/radware/driver.py
@@ -27,15 +27,15 @@
import eventlet
from oslo.config import cfg

from neutron.common import exceptions as q_exc
from neutron.common import log as call_log
from neutron import context as qcontext
import neutron.db.loadbalancer.loadbalancer_db as lb_db
from neutron import context
from neutron.db.loadbalancer import loadbalancer_db as lb_db
from neutron.extensions import loadbalancer
from neutron.openstack.common import jsonutils as json
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers import abstract_driver
from neutron.services.loadbalancer.drivers.radware import exceptions as r_exc

eventlet.monkey_patch(thread=True)

@@ -172,9 +172,9 @@ def __init__(self, plugin):
user=rad.vdirect_user,
password=rad.vdirect_password)
self.queue = Queue.Queue()
self.completion_handler = OperationCompletionHander(self.queue,
self.rest_client,
plugin)
self.completion_handler = OperationCompletionHandler(self.queue,
self.rest_client,
plugin)
self.workflow_templates_exists = False
self.completion_handler.setDaemon(True)
self.completion_handler.start()
@@ -205,21 +205,27 @@ def delete_vip(self, context, vip):
First delete it from the device. If deletion ended OK
- remove data from DB as well.
If the deletion failed - mark elements with error status in DB
If the deletion failed - mark vip with error status in DB
"""

extended_vip = self.plugin.populate_vip_graph(context, vip)
params = _translate_vip_object_graph(extended_vip,
self.plugin, context)
ids = params.pop('__ids__')

try:
# removing the WF will cause deletion of the configuration from the
# device
self._remove_workflow(extended_vip, context)
except Exception:
self._remove_workflow(ids, context)
except r_exc.RESTRequestFailure:
pool_id = extended_vip['pool_id']
LOG.exception(_('Failed to remove workflow %s'), pool_id)
_update_vip_graph_status(
self.plugin, context, extended_vip, constants.ERROR
)
LOG.exception(_('Failed to remove workflow %s. '
'Going to set vip to ERROR status'),
pool_id)

self.plugin.update_status(context, lb_db.Vip, ids['vip'],
constants.ERROR)

def create_pool(self, context, pool):
# nothing to do
@@ -306,8 +312,8 @@ def _handle_pool_health_monitor(self, context, health_monitor,
debug_params = {"hm_id": health_monitor['id'], "pool_id": pool_id,
"delete": delete, "vip_id": vip_id}
LOG.debug(_('_handle_pool_health_monitor. health_monitor = %(hm_id)s '
'pool_id = %(pool_id)s delete = %(delete)s '
'vip_id = %(vip_id)s'),
'pool_id = %(pool_id)s delete = %(delete)s '
'vip_id = %(vip_id)s'),
debug_params)

if vip_id:
@@ -359,11 +365,6 @@ def _update_workflow(self, wf_name, action,

if action not in self.actions_to_skip:
ids = params.pop('__ids__', None)
if not ids:
raise q_exc.NeutronException(
_('params must contain __ids__ field!')
)

oper = OperationAttributes(response['uri'],
ids,
lbaas_entity,
@@ -372,13 +373,7 @@
LOG.debug(_('Pushing operation %s to the queue'), oper)
self.queue.put_nowait(oper)

def _remove_workflow(self, wf_params, context):
params = _translate_vip_object_graph(wf_params, self.plugin, context)
ids = params.pop('__ids__', None)
if not ids:
raise q_exc.NeutronException(
_('params must contain __ids__ field!')
)
def _remove_workflow(self, ids, context):

wf_name = ids['pool']
LOG.debug(_('Remove the workflow %s') % wf_name)
@@ -504,8 +499,7 @@ def _verify_workflow_templates(self):
break
for wf, found in workflows.items():
if not found:
msg = _('The workflow %s does not exist on vDirect.') % wf
raise q_exc.NeutronException(msg)
raise r_exc.WorkflowMissing(workflow=wf)
self.workflow_templates_exists = True


@@ -529,8 +523,8 @@ def __init__(self,
self.auth = base64.encodestring('%s:%s' % (user, password))
self.auth = self.auth.replace('\n', '')
else:
msg = _('User and password must be specified')
raise q_exc.NeutronException(msg)
raise r_exc.AuthenticationMissing()

debug_params = {'server': self.server,
'port': self.port,
'ssl': self.ssl}
@@ -613,17 +607,17 @@ def __repr__(self):
return "<%s: {%s}>" % (self.__class__.__name__, ', '.join(items))


class OperationCompletionHander(threading.Thread):
class OperationCompletionHandler(threading.Thread):

"""Update DB with operation status or delete the entity from DB."""

def __init__(self, queue, rest_client, plugin):
threading.Thread.__init__(self)
self.queue = queue
self.rest_client = rest_client
self.admin_ctx = qcontext.get_admin_context()
self.plugin = plugin
self.stoprequest = threading.Event()
self.opers_to_handle_before_rest = 0

def _get_db_status(self, operation, success, messages=None):
"""Get the db_status based on the status of the vdirect operation."""
@@ -641,13 +635,20 @@ def _get_db_status(self, operation, success, messages=None):

def join(self, timeout=None):
self.stoprequest.set()
super(OperationCompletionHander, self).join(timeout)
super(OperationCompletionHandler, self).join(timeout)

def run(self):
oper = None
while not self.stoprequest.isSet():
try:
oper = self.queue.get(timeout=1)

# Get the current queue size (N) and set the counter with it.
# Handle N operations with no intermission.
# Once N operations are handled, get the size again and repeat.
if self.opers_to_handle_before_rest <= 0:
self.opers_to_handle_before_rest = self.queue.qsize() + 1

LOG.debug('Operation consumed from the queue: ' +
str(oper))
# check the status - if oper is done: update the db ,
@@ -672,96 +673,110 @@ def run(self):
db_status = self._get_db_status(oper, success)
if db_status:
_update_vip_graph_status(
self.plugin, self.admin_ctx,
oper, db_status)
self.plugin, oper, db_status)
else:
_remove_object_from_db(
self.plugin, self.admin_ctx, oper)
self.plugin, oper)
else:
# not completed - push to the queue again
LOG.debug(_('Operation %s is not completed yet..') % oper)
# queue is empty - lets take a short rest
if self.queue.empty():
time.sleep(1)
# Not completed - push to the queue again
self.queue.put_nowait(oper)
# send a signal to the queue that the job is done

self.queue.task_done()
self.opers_to_handle_before_rest -= 1

# Take a one second rest before starting to handle
# new operations or operations handled before
if self.opers_to_handle_before_rest <= 0:
time.sleep(1)

except Queue.Empty:
continue
except Exception:
m = _("Exception was thrown inside OperationCompletionHander")
m = _("Exception was thrown inside OperationCompletionHandler")
LOG.exception(m)


def _rest_wrapper(response, success_codes=[202]):
"""Wrap a REST call and make sure a valid status is returned."""
if response[RESP_STATUS] not in success_codes:
raise q_exc.NeutronException(str(response[RESP_STATUS]) + ':' +
response[RESP_REASON] +
'. Error description: ' +
response[RESP_STR])
raise r_exc.RESTRequestFailure(
status=response[RESP_STATUS],
reason=response[RESP_REASON],
description=response[RESP_STR],
success_codes=success_codes
)
else:
return response[RESP_DATA]


def _update_vip_graph_status(plugin, context, oper, status):
def _update_vip_graph_status(plugin, oper, status):
"""Update the status
Of all the Vip object graph
or a specific entity in the graph.
"""

ctx = context.get_admin_context(load_admin_roles=False)

LOG.debug(_('_update: %s '), oper)
if oper.lbaas_entity == lb_db.PoolMonitorAssociation:
plugin.update_pool_health_monitor(context,
plugin.update_pool_health_monitor(ctx,
oper.entity_id,
oper.object_graph['pool'],
status)
elif oper.entity_id:
plugin.update_status(context,
plugin.update_status(ctx,
oper.lbaas_entity,
oper.entity_id,
status)
else:
# update the whole vip graph status
plugin.update_status(context,
lb_db.Vip,
oper.object_graph['vip'],
_update_vip_graph_status_cascade(plugin,
oper.object_graph,
ctx, status)


def _update_vip_graph_status_cascade(plugin, ids, ctx, status):
plugin.update_status(ctx,
lb_db.Vip,
ids['vip'],
status)
plugin.update_status(ctx,
lb_db.Pool,
ids['pool'],
status)
for member_id in ids['members']:
plugin.update_status(ctx,
lb_db.Member,
member_id,
status)
plugin.update_status(context,
lb_db.Pool,
oper.object_graph['pool'],
status)
for member_id in oper.object_graph['members']:
plugin.update_status(context,
lb_db.Member,
member_id,
status)
for hm_id in oper.object_graph['health_monitors']:
plugin.update_pool_health_monitor(context,
hm_id,
oper.object_graph['pool'],
status)


def _remove_object_from_db(plugin, context, oper):
for hm_id in ids['health_monitors']:
plugin.update_pool_health_monitor(ctx,
hm_id,
ids['pool'],
status)


def _remove_object_from_db(plugin, oper):
"""Remove a specific entity from db."""
LOG.debug(_('_remove_object_from_db %s'), str(oper))

ctx = context.get_admin_context(load_admin_roles=False)

if oper.lbaas_entity == lb_db.PoolMonitorAssociation:
plugin._delete_db_pool_health_monitor(context,
plugin._delete_db_pool_health_monitor(ctx,
oper.entity_id,
oper.object_graph['pool'])
elif oper.lbaas_entity == lb_db.Member:
plugin._delete_db_member(context, oper.entity_id)
plugin._delete_db_member(ctx, oper.entity_id)
elif oper.lbaas_entity == lb_db.Vip:
plugin._delete_db_vip(context, oper.entity_id)
plugin._delete_db_vip(ctx, oper.entity_id)
elif oper.lbaas_entity == lb_db.Pool:
plugin._delete_db_pool(context, oper.entity_id)
plugin._delete_db_pool(ctx, oper.entity_id)
else:
raise q_exc.NeutronException(
_('Tried to remove unsupported lbaas entity %s!'),
str(oper.lbaas_entity)
raise r_exc.UnsupportedEntityOperation(
operation='Remove from DB', entity=oper.lbaas_entity
)

TRANSLATION_DEFAULTS = {'session_persistence_type': 'SOURCE_IP',
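On the context change mentioned in the commit message: the module-level helpers above no longer receive a context from the completion handler. A minimal sketch of the pattern, with a simplified name that is not the driver's own helper: each helper builds a fresh admin context at call time instead of using one cached on the handler thread in __init__.

from neutron import context


def _set_entity_status(plugin, lbaas_entity, entity_id, status):
    # Sketch only: obtain an admin context per call rather than caching
    # one when the completion-handler thread is created.
    ctx = context.get_admin_context(load_admin_roles=False)
    plugin.update_status(ctx, lbaas_entity, entity_id, status)
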
44 changes: 44 additions & 0 deletions neutron/services/loadbalancer/drivers/radware/exceptions.py
@@ -0,0 +1,44 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 Radware LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Evgeny Fedoruk, Radware


from neutron.common import exceptions


class RadwareLBaasException(exceptions.NeutronException):
message = _('An unknown exception occurred in Radware LBaaS provider.')


class AuthenticationMissing(RadwareLBaasException):
message = _('vDirect user/password missing. '
'Specify in configuration file, under [radware] section')


class WorkflowMissing(RadwareLBaasException):
message = _('Workflow %(workflow)s is missing on vDirect server. '
'Upload missing workflow')


class RESTRequestFailure(RadwareLBaasException):
message = _('REST request failed with status %(status)s. '
'Reason: %(reason)s, Description: %(description)s. '
'Success status codes are %(success_codes)s')


class UnsupportedEntityOperation(RadwareLBaasException):
message = _('%(operation)s operation is not supported for %(entity)s.')
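
These classes rely on NeutronException's keyword-based message formatting, so callers pass the parameters named in each message template. A short illustrative example (the workflow name is a made-up value):

from neutron.services.loadbalancer.drivers.radware import exceptions as r_exc

try:
    raise r_exc.WorkflowMissing(workflow='my_workflow')
except r_exc.RadwareLBaasException as exc:
    # NeutronException interpolates the kwargs into the message template,
    # so this prints: "Workflow my_workflow is missing on vDirect server.
    # Upload missing workflow"
    print(exc)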
