Skip to content

Commit

Permalink
Improve l3-agent performance and prevent races in it.
Browse files Browse the repository at this point in the history
Fixes bug 1194026
Fixes bug 1200749

Introduce a looping call for performing synchronization with
neutron server.
The sync will be performed only if router changes are notified
via rpc. Only affected routers will be synchronized.

Changes will be implemented by the l3 agent spawning a
distinct greenthread for each router - iptables will
be executed only once using iptables_manager.defer_apply_on.

This patch will prevent the occurence of the following issues:
- Out-of-order rpc message processing
- Long processing time for router changes due to serial execution
- Occasional and expected RPC blocks for long periods
- Unnecessary processing of multiple requests

Change-Id: I0978d1c38ac5c38c4548e5b1877857bb5cac3b81
  • Loading branch information
Nachi Ueno committed Jul 16, 2013
1 parent bd18990 commit 57e1fa3
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 118 deletions.
122 changes: 82 additions & 40 deletions neutron/agent/l3_agent.py
Expand Up @@ -18,7 +18,6 @@
#

import eventlet
from eventlet import semaphore
import netaddr
from oslo.config import cfg

Expand All @@ -36,6 +35,7 @@
from neutron import context
from neutron import manager
from neutron.openstack.common import importutils
from neutron.openstack.common import lockutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import periodic_task
Expand All @@ -49,6 +49,7 @@
NS_PREFIX = 'qrouter-'
INTERNAL_DEV_PREFIX = 'qr-'
EXTERNAL_DEV_PREFIX = 'qg-'
RPC_LOOP_INTERVAL = 1


class L3PluginApi(proxy.RpcProxy):
Expand All @@ -66,9 +67,8 @@ def __init__(self, topic, host):
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.host = host

def get_routers(self, context, fullsync=True, router_id=None):
def get_routers(self, context, fullsync=True, router_ids=None):
"""Make a remote process call to retrieve the sync data for routers."""
router_ids = [router_id] if router_id else None
return self.call(context,
self.make_msg('sync_routers', host=self.host,
fullsync=fullsync,
Expand Down Expand Up @@ -140,6 +140,17 @@ def perform_snat_action(self, snat_callback, *args):


class L3NATAgent(manager.Manager):
"""Manager for L3NatAgent
API version history:
1.0 initial Version
1.1 changed the type of the routers parameter
to the routers_updated method.
It was previously a list of routers in dict format.
It is now a list of router IDs only.
Per rpc versioning rules, it is backwards compatible.
"""
RPC_API_VERSION = '1.1'

OPTS = [
cfg.StrOpt('external_network_bridge', default='br-ex',
Expand Down Expand Up @@ -196,9 +207,15 @@ def __init__(self, host, conf=None):
self.context = context.get_admin_context_without_session()
self.plugin_rpc = L3PluginApi(topics.PLUGIN, host)
self.fullsync = True
self.sync_sem = semaphore.Semaphore(1)
self.updated_routers = set()
self.removed_routers = set()
self.sync_progress = False
if self.conf.use_namespaces:
self._destroy_router_namespaces(self.conf.router_id)

self.rpc_loop = loopingcall.FixedIntervalLoopingCall(
self._rpc_loop)
self.rpc_loop.start(interval=RPC_LOOP_INTERVAL)
super(L3NATAgent, self).__init__(host=self.conf.host)

def _destroy_router_namespaces(self, only_router_id=None):
Expand Down Expand Up @@ -323,6 +340,7 @@ def _set_subnet_info(self, port):
port['ip_cidr'] = "%s/%s" % (ips[0]['ip_address'], prefixlen)

def process_router(self, ri):
ri.iptables_manager.defer_apply_on()
ex_gw_port = self._get_ex_gw_port(ri)
internal_ports = ri.router.get(l3_constants.INTERFACE_KEY, [])
existing_port_ids = set([p['id'] for p in ri.internal_ports])
Expand Down Expand Up @@ -371,6 +389,7 @@ def process_router(self, ri):
ri.ex_gw_port = ex_gw_port
ri.enable_snat = ri.router.get('enable_snat')
self.routes_updated(ri)
ri.iptables_manager.defer_apply_off()

def _handle_router_snat_rules(self, ri, ex_gw_port, internal_cidrs,
interface_name, action):
Expand Down Expand Up @@ -586,35 +605,28 @@ def floating_forward_rules(self, floating_ip, fixed_ip):

def router_deleted(self, context, router_id):
"""Deal with router deletion RPC message."""
with self.sync_sem:
if router_id in self.router_info:
try:
self._router_removed(router_id)
except Exception:
msg = _("Failed dealing with router "
"'%s' deletion RPC message")
LOG.debug(msg, router_id)
self.fullsync = True
LOG.debug(_('Got router deleted notification for %s'), router_id)
self.removed_routers.add(router_id)

def routers_updated(self, context, routers):
"""Deal with routers modification and creation RPC message."""
if not routers:
return
with self.sync_sem:
try:
self._process_routers(routers)
except Exception:
msg = _("Failed dealing with routers update RPC message")
LOG.debug(msg)
self.fullsync = True
LOG.debug(_('Got routers updated notification :%s'), routers)
if routers:
# This is needed for backward compatiblity
if isinstance(routers[0], dict):
routers = [router['id'] for router in routers]
self.updated_routers.update(routers)

def router_removed_from_agent(self, context, payload):
self.router_deleted(context, payload['router_id'])
LOG.debug(_('Got router removed from agent :%r'), payload)
self.removed_routers.add(payload['router_id'])

def router_added_to_agent(self, context, payload):
LOG.debug(_('Got router added to agent :%r'), payload)
self.routers_updated(context, payload)

def _process_routers(self, routers, all_routers=False):
pool = eventlet.GreenPool()
if (self.conf.external_network_bridge and
not ip_lib.device_exists(self.conf.external_network_bridge)):
LOG.error(_("The external network bridge '%s' does not exist"),
Expand Down Expand Up @@ -652,28 +664,58 @@ def _process_routers(self, routers, all_routers=False):
self._router_added(r['id'], r)
ri = self.router_info[r['id']]
ri.router = r
self.process_router(ri)
pool.spawn_n(self.process_router, ri)
# identify and remove routers that no longer exist
for router_id in prev_router_ids - cur_router_ids:
self._router_removed(router_id)
pool.spawn_n(self._router_removed, router_id)
pool.waitall()

@lockutils.synchronized('l3-agent', 'neutron-')
def _rpc_loop(self):
# _rpc_loop and _sync_routers_task will not be
# executed in the same time because of lock.
# so we can clear the value of updated_routers
# and removed_routers
try:
if self.updated_routers:
router_ids = list(self.updated_routers)
self.updated_routers.clear()
routers = self.plugin_rpc.get_routers(
self.context, router_ids)
self._process_routers(routers)
self._process_router_delete()
except Exception:
LOG.exception(_("Failed synchronizing routers"))
self.fullsync = True

def _process_router_delete(self):
current_removed_routers = list(self.removed_routers)
for router_id in current_removed_routers:
self._router_removed(context, router_id)
self.removed_routers.remove(router_id)

def _router_ids(self):
if not self.conf.use_namespaces:
return [self.conf.router_id]

@periodic_task.periodic_task
@lockutils.synchronized('l3-agent', 'neutron-')
def _sync_routers_task(self, context):
# we need to sync with router deletion RPC message
with self.sync_sem:
if self.fullsync:
try:
if not self.conf.use_namespaces:
router_id = self.conf.router_id
else:
router_id = None
routers = self.plugin_rpc.get_routers(
context, router_id)
self._process_routers(routers, all_routers=True)
self.fullsync = False
except Exception:
LOG.exception(_("Failed synchronizing routers"))
self.fullsync = True
if not self.fullsync:
return
try:
router_ids = self._router_ids()
self.updated_routers.clear()
self.removed_routers.clear()
routers = self.plugin_rpc.get_routers(
context, router_ids)

LOG.debug(_('Processing :%r'), routers)
self._process_routers(routers, all_routers=True)
self.fullsync = False
except Exception:
LOG.exception(_("Failed synchronizing routers"))
self.fullsync = True

def after_start(self):
LOG.info(_("L3 agent started"))
Expand Down
29 changes: 15 additions & 14 deletions neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py
Expand Up @@ -42,14 +42,14 @@ def _notification_host(self, context, method, payload, host):
payload=payload),
topic='%s.%s' % (topics.L3_AGENT, host))

def _agent_notification(self, context, method, routers,
def _agent_notification(self, context, method, router_ids,
operation, data):
"""Notify changed routers to hosting l3 agents."""
adminContext = context.is_admin and context or context.elevated()
plugin = manager.NeutronManager.get_plugin()
for router in routers:
for router_id in router_ids:
l3_agents = plugin.get_l3_agents_hosting_routers(
adminContext, [router['id']],
adminContext, [router_id],
admin_state_up=True,
active=True)
for l3_agent in l3_agents:
Expand All @@ -60,23 +60,24 @@ def _agent_notification(self, context, method, routers,
'method': method})
self.cast(
context, self.make_msg(method,
routers=[router]),
topic='%s.%s' % (l3_agent.topic, l3_agent.host))
routers=[router_id]),
topic='%s.%s' % (l3_agent.topic, l3_agent.host),
version='1.1')

def _notification(self, context, method, routers, operation, data):
def _notification(self, context, method, router_ids, operation, data):
"""Notify all the agents that are hosting the routers."""
plugin = manager.NeutronManager.get_plugin()
if utils.is_extension_supported(
plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
adminContext = (context.is_admin and
context or context.elevated())
plugin.schedule_routers(adminContext, routers)
plugin.schedule_routers(adminContext, router_ids)
self._agent_notification(
context, method, routers, operation, data)
context, method, router_ids, operation, data)
else:
self.fanout_cast(
context, self.make_msg(method,
routers=routers),
routers=router_ids),
topic=topics.L3_AGENT)

def _notification_fanout(self, context, method, router_id):
Expand All @@ -99,17 +100,17 @@ def agent_updated(self, context, admin_state_up, host):
def router_deleted(self, context, router_id):
self._notification_fanout(context, 'router_deleted', router_id)

def routers_updated(self, context, routers, operation=None, data=None):
if routers:
self._notification(context, 'routers_updated', routers,
def routers_updated(self, context, router_ids, operation=None, data=None):
if router_ids:
self._notification(context, 'routers_updated', router_ids,
operation, data)

def router_removed_from_agent(self, context, router_id, host):
self._notification_host(context, 'router_removed_from_agent',
{'router_id': router_id}, host)

def router_added_to_agent(self, context, routers, host):
def router_added_to_agent(self, context, router_ids, host):
self._notification_host(context, 'router_added_to_agent',
routers, host)
router_ids, host)

L3AgentNotify = L3AgentNotifyAPI()
3 changes: 1 addition & 2 deletions neutron/db/agentschedulers_db.py
Expand Up @@ -129,9 +129,8 @@ def add_router_to_l3_agent(self, context, id, router_id):
router_id=router_id, agent_id=id)

if self.l3_agent_notifier:
routers = self.get_sync_data(context, [router_id])
self.l3_agent_notifier.router_added_to_agent(
context, routers, agent_db.host)
context, [router_id], agent_db.host)

def remove_router_from_l3_agent(self, context, id, router_id):
"""Remove the router from l3 agent.
Expand Down

0 comments on commit 57e1fa3

Please sign in to comment.