Skip to content

Commit

Permalink
Fix l2 pop doesn't propagate ip address updates
Browse files Browse the repository at this point in the history
Propagates ip address changes when an ip address is :
added, removed, or changed.
Add a new rpc call for the updates of forwarding informations.

Fixes: Bug #1234137
Change-Id: Ib5b971bd02f20a0ea73f88ce9685e944226bb5a2
(cherry picked from commit e6a368a)
  • Loading branch information
Sylvain Afchain authored and openstack-gerrit committed Oct 8, 2013
1 parent 74c60ab commit 3375f66
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 45 deletions.
9 changes: 9 additions & 0 deletions neutron/agent/l2population_rpc.py
Expand Up @@ -37,10 +37,19 @@ def remove_fdb_entries(self, context, fdb_entries, host=None):
if not host or host == cfg.CONF.host:
self.fdb_remove(context, fdb_entries)

@log.log
def update_fdb_entries(self, context, fdb_entries, host=None):
if not host or host == cfg.CONF.host:
self.fdb_update(context, fdb_entries)

@abc.abstractmethod
def fdb_add(self, context, fdb_entries):
pass

@abc.abstractmethod
def fdb_remove(self, context, fdb_entries):
pass

@abc.abstractmethod
def fdb_update(self, context, fdb_entries):
pass
34 changes: 34 additions & 0 deletions neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py
Expand Up @@ -717,6 +717,40 @@ def fdb_remove(self, context, fdb_entries):
ports,
interface)

def _fdb_chg_ip(self, context, fdb_entries):
LOG.debug(_("update chg_ip received"))
for network_id, agent_ports in fdb_entries.items():
segment = self.agent.br_mgr.network_map.get(network_id)
if not segment:
return

if segment.network_type != lconst.TYPE_VXLAN:
return

interface = self.agent.br_mgr.get_vxlan_device_name(
segment.segmentation_id)

for agent_ip, state in agent_ports.items():
if agent_ip == self.agent.br_mgr.local_ip:
continue

after = state.get('after')
for mac, ip in after:
self.agent.br_mgr.add_fdb_ip_entry(mac, ip, interface)

before = state.get('before')
for mac, ip in before:
self.agent.br_mgr.remove_fdb_ip_entry(mac, ip, interface)

def fdb_update(self, context, fdb_entries):
LOG.debug(_("fdb_update received"))
for action, values in fdb_entries.items():
method = '_fdb_' + action
if not hasattr(self, method):
raise NotImplementedError()

getattr(self, method)(context, values)

def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
Expand Down
117 changes: 72 additions & 45 deletions neutron/plugins/ml2/drivers/l2pop/mech_driver.py
Expand Up @@ -36,6 +36,7 @@ class L2populationMechanismDriver(api.MechanismDriver,

def initialize(self):
LOG.debug(_("Experimental L2 population driver"))
self.rpc_ctx = n_context.get_admin_context_without_session()

def _get_port_fdb_entries(self, port):
return [[port['mac_address'],
Expand All @@ -45,31 +46,64 @@ def delete_port_precommit(self, context):
self.remove_fdb_entries = self._update_port_down(context)

def delete_port_postcommit(self, context):
self._notify_remove_fdb_entries(context,
self.remove_fdb_entries)

def _notify_remove_fdb_entries(self, context, fdb_entries):
rpc_ctx = n_context.get_admin_context_without_session()
l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
rpc_ctx, fdb_entries)
self.rpc_ctx, self.remove_fdb_entries)

def _get_diff_ips(self, orig, port):
orig_ips = set([ip['ip_address'] for ip in orig['fixed_ips']])
port_ips = set([ip['ip_address'] for ip in port['fixed_ips']])

# check if an ip has been added or removed
orig_chg_ips = orig_ips.difference(port_ips)
port_chg_ips = port_ips.difference(orig_ips)

if orig_chg_ips or port_chg_ips:
return orig_chg_ips, port_chg_ips

def _fixed_ips_changed(self, context, orig, port):
diff_ips = self._get_diff_ips(orig, port)
if not diff_ips:
return
orig_ips, port_ips = diff_ips

port_infos = self._get_port_infos(context, orig)
if not port_infos:
return
agent, agent_ip, segment, port_fdb_entries = port_infos

orig_mac_ip = [[port['mac_address'], ip] for ip in orig_ips]
port_mac_ip = [[port['mac_address'], ip] for ip in port_ips]

upd_fdb_entries = {port['network_id']: {agent_ip: {}}}

ports = upd_fdb_entries[port['network_id']][agent_ip]
if orig_mac_ip:
ports['before'] = orig_mac_ip

if port_mac_ip:
ports['after'] = port_mac_ip

l2pop_rpc.L2populationAgentNotify.update_fdb_entries(
self.rpc_ctx, {'chg_ip': upd_fdb_entries})

return True

def update_port_postcommit(self, context):
port = context.current
orig = context.original

if port['status'] == orig['status']:
return
self._fixed_ips_changed(context, orig, port)

if port['status'] == const.PORT_STATUS_ACTIVE:
self._update_port_up(context)
elif port['status'] == const.PORT_STATUS_DOWN:
fdb_entries = self._update_port_down(context)
self._notify_remove_fdb_entries(context, fdb_entries)
l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
self.rpc_ctx, fdb_entries)

def _update_port_up(self, context):
port_context = context.current
network_id = port_context['network_id']
agent_host = port_context['binding:host_id']
def _get_port_infos(self, context, port):
agent_host = port['binding:host_id']
if not agent_host:
return

Expand All @@ -80,26 +114,39 @@ def _update_port_up(self, context):

agent_ip = self.get_agent_ip(agent)
if not agent_ip:
LOG.warning(_("Unable to retrieve the tunelling ip of agent %s"),
agent_host)
LOG.warning(_("Unable to retrieve the agent ip, check the agent "
"configuration."))
return

segment = context.bound_segment
if not segment:
LOG.warning(_("Port %(port)s updated by agent %(agent)s "
"isn't bound to any segment"),
{'port': port_context['id'], 'agent': agent.host})
{'port': port['id'], 'agent': agent})
return

tunnel_types = self.get_agent_tunnel_types(agent)
if segment['network_type'] not in tunnel_types:
return

fdb_entries = self._get_port_fdb_entries(port)

return agent, agent_ip, segment, fdb_entries

def _update_port_up(self, context):
port_context = context.current
port_infos = self._get_port_infos(context, port_context)
if not port_infos:
return
agent, agent_ip, segment, port_fdb_entries = port_infos

agent_host = port_context['binding:host_id']
network_id = port_context['network_id']

session = db_api.get_session()
agent_ports = self.get_agent_network_port_count(session, agent_host,
network_id)

rpc_ctx = n_context.get_admin_context_without_session()

other_fdb_entries = {network_id:
{'segment_id': segment['segmentation_id'],
'network_type': segment['network_type'],
Expand Down Expand Up @@ -138,45 +185,25 @@ def _update_port_up(self, context):

if ports.keys():
l2pop_rpc.L2populationAgentNotify.add_fdb_entries(
rpc_ctx, agent_fdb_entries, agent_host)
self.rpc_ctx, agent_fdb_entries, agent_host)

# Notify other agents to add fdb rule for current port
fdb_entries = self._get_port_fdb_entries(port_context)
other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries
other_fdb_entries[network_id]['ports'][agent_ip] += port_fdb_entries

l2pop_rpc.L2populationAgentNotify.add_fdb_entries(rpc_ctx,
l2pop_rpc.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx,
other_fdb_entries)

def _update_port_down(self, context):
port_context = context.current
network_id = port_context['network_id']
port_infos = self._get_port_infos(context, port_context)
if not port_infos:
return
agent, agent_ip, segment, port_fdb_entries = port_infos

agent_host = port_context['binding:host_id']
if not agent_host:
return
network_id = port_context['network_id']

session = db_api.get_session()
agent = self.get_agent_by_host(session, agent_host)
if not agent:
return

agent_ip = self.get_agent_ip(agent)
if not agent_ip:
LOG.warning(_("Unable to retrieve the agent ip, check the agent "
"configuration."))
return

segment = context.bound_segment
if not segment:
LOG.warning(_("Port %(port)s updated by agent %(agent)s "
"isn't bound to any segment"),
{'port': port_context['id'], 'agent': agent})
return

tunnel_types = self.get_agent_tunnel_types(agent)
if segment['network_type'] not in tunnel_types:
return

agent_ports = self.get_agent_network_port_count(session, agent_host,
network_id)

Expand Down
9 changes: 9 additions & 0 deletions neutron/plugins/ml2/drivers/l2pop/rpc.py
Expand Up @@ -76,4 +76,13 @@ def remove_fdb_entries(self, context, fdb_entries, host=None):
self._notification_fanout(context, 'remove_fdb_entries',
fdb_entries)

def update_fdb_entries(self, context, fdb_entries, host=None):
if fdb_entries:
if host:
self._notification_host(context, 'update_fdb_entries',
fdb_entries, host)
else:
self._notification_fanout(context, 'update_fdb_entries',
fdb_entries)

L2populationAgentNotify = L2populationAgentNotifyAPI()
9 changes: 9 additions & 0 deletions neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
Expand Up @@ -415,6 +415,15 @@ def _del_fdb_flow(self, port_info, agent_ip, lvm, ofport):
dl_vlan=lvm.vlan,
dl_dst=port_info[0])

def fdb_update(self, context, fdb_entries):
LOG.debug(_("fdb_update received"))
for action, values in fdb_entries.items():
method = '_fdb_' + action
if not hasattr(self, method):
raise NotImplementedError()

getattr(self, method)(context, values)

def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
Expand Down
23 changes: 23 additions & 0 deletions neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py
Expand Up @@ -879,3 +879,26 @@ def test_fdb_remove(self):
check_exit_code=False),
]
execute_fn.assert_has_calls(expected)

def test_fdb_update_chg_ip(self):
fdb_entries = {'chg_ip':
{'net_id':
{'agent_ip':
{'before': [['port_mac', 'port_ip_1']],
'after': [['port_mac', 'port_ip_2']]}}}}

with mock.patch.object(utils, 'execute',
return_value='') as execute_fn:
self.lb_rpc.fdb_update(None, fdb_entries)

expected = [
mock.call(['ip', 'neigh', 'add', 'port_ip_2', 'lladdr',
'port_mac', 'dev', 'vxlan-1', 'nud', 'permanent'],
root_helper=self.root_helper,
check_exit_code=False),
mock.call(['ip', 'neigh', 'del', 'port_ip_1', 'lladdr',
'port_mac', 'dev', 'vxlan-1'],
root_helper=self.root_helper,
check_exit_code=False)
]
execute_fn.assert_has_calls(expected)
77 changes: 77 additions & 0 deletions neutron/tests/unit/ml2/drivers/test_l2population.py
Expand Up @@ -406,3 +406,80 @@ def test_fdb_remove_called_last_port(self):

self.mock_fanout.assert_any_call(
mock.ANY, expected, topic=self.fanout_topic)

def test_fixed_ips_changed(self):
self._register_ml2_agents()

with self.subnet(network=self._network) as subnet:
host_arg = {portbindings.HOST_ID: HOST}
with self.port(subnet=subnet, cidr='10.0.0.0/24',
arg_list=(portbindings.HOST_ID,),
**host_arg) as port1:
p1 = port1['port']

self.mock_fanout.reset_mock()

data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.2'},
{'ip_address': '10.0.0.10'}]}}
req = self.new_update_request('ports', data, p1['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 2)

add_expected = {'args':
{'fdb_entries':
{'chg_ip':
{p1['network_id']:
{'20.0.0.1':
{'after': [[p1['mac_address'],
'10.0.0.10']]}}}}},
'namespace': None,
'method': 'update_fdb_entries'}

self.mock_fanout.assert_any_call(
mock.ANY, add_expected, topic=self.fanout_topic)

self.mock_fanout.reset_mock()

data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.2'},
{'ip_address': '10.0.0.16'}]}}
req = self.new_update_request('ports', data, p1['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 2)

upd_expected = {'args':
{'fdb_entries':
{'chg_ip':
{p1['network_id']:
{'20.0.0.1':
{'before': [[p1['mac_address'],
'10.0.0.10']],
'after': [[p1['mac_address'],
'10.0.0.16']]}}}}},
'namespace': None,
'method': 'update_fdb_entries'}

self.mock_fanout.assert_any_call(
mock.ANY, upd_expected, topic=self.fanout_topic)

self.mock_fanout.reset_mock()

data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.16'}]}}
req = self.new_update_request('ports', data, p1['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 1)

del_expected = {'args':
{'fdb_entries':
{'chg_ip':
{p1['network_id']:
{'20.0.0.1':
{'before': [[p1['mac_address'],
'10.0.0.2']]}}}}},
'namespace': None,
'method': 'update_fdb_entries'}

self.mock_fanout.assert_any_call(
mock.ANY, del_expected, topic=self.fanout_topic)

0 comments on commit 3375f66

Please sign in to comment.