Skip to content

Commit

Permalink
Do global CIDR check if overlapping IPs disabled.
Browse files Browse the repository at this point in the history
Fix bug 1055822

This patch adds a global configuration option for enabling or disabling
overlapping IPs for subnets in different networks.
If they are disabled, the validation of the CIDR against overlapping
ones should be performed globally and not just among subnets defined for
the current network.

Change-Id: If6a562324f0a5c3982591be8030c4628ec9007b6
  • Loading branch information
salv-orlando committed Sep 25, 2012
1 parent 6194ab2 commit 95153e4
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 22 deletions.
3 changes: 3 additions & 0 deletions etc/quantum.conf
Expand Up @@ -44,6 +44,9 @@ api_paste_config = api-paste.ini

# Enable or disable bulk create/update/delete operations
# allow_bulk = True
# Enable or disable overlapping IPs for subnets
# allow_overlapping_ips = False

# RPC configuration options. Defined in rpc __init__
# The messaging module to use, defaults to kombu.
# rpc_backend = quantum.openstack.common.rpc.impl_kombu
Expand Down
1 change: 1 addition & 0 deletions quantum/common/config.py
Expand Up @@ -50,6 +50,7 @@
cfg.IntOpt('max_subnet_host_routes', default=20),
cfg.StrOpt('state_path', default='.'),
cfg.IntOpt('dhcp_lease_duration', default=120),
cfg.BoolOpt('allow_overlapping_ips', default=False)
]

# Register the configuration options
Expand Down
48 changes: 26 additions & 22 deletions quantum/db/db_base_plugin_v2.py
Expand Up @@ -181,18 +181,17 @@ def _get_dns_by_subnet(self, context, subnet_id):
return []

def _get_route_by_subnet(self, context, subnet_id):
try:
route_qry = context.session.query(models_v2.Route)
return route_qry.filter_by(subnet_id=subnet_id).all()
except exc.NoResultFound:
return []
route_qry = context.session.query(models_v2.Route)
return route_qry.filter_by(subnet_id=subnet_id).all()

def _get_subnets_by_network(self, context, network_id):
try:
subnet_qry = context.session.query(models_v2.Subnet)
return subnet_qry.filter_by(network_id=network_id).all()
except exc.NoResultFound:
return []
subnet_qry = context.session.query(models_v2.Subnet)
return subnet_qry.filter_by(network_id=network_id).all()

def _get_all_subnets(self, context):
# NOTE(salvatore-orlando): This query might end up putting
# a lot of stress on the db. Consider adding a cache layer
return context.session.query(models_v2.Subnet).all()

def _fields(self, resource, fields):
if fields:
Expand Down Expand Up @@ -634,22 +633,27 @@ def _allocate_ips_for_port(self, context, network, port):
'subnet_id': result['subnet_id']})
return ips

def _validate_subnet_cidr(self, network, new_subnet_cidr):
def _validate_subnet_cidr(self, context, network, new_subnet_cidr):
"""Validate the CIDR for a subnet.
Verifies the specified CIDR does not overlap with the ones defined
for the other subnets specified for this network.
for the other subnets specified for this network, or with any other
CIDR if overlapping IPs are disabled.
"""
for subnet in network.subnets:
if (netaddr.IPSet([subnet.cidr]) &
netaddr.IPSet([new_subnet_cidr])):
err_msg = ("Requested subnet with cidr: %s "
"for network: %s "
"overlaps with subnet: %s)" % (new_subnet_cidr,
network.id,
subnet.cidr))
LOG.error(err_msg)
new_subnet_ipset = netaddr.IPSet([new_subnet_cidr])
subnet_list = network.subnets
if not cfg.CONF.allow_overlapping_ips:
subnet_list = self._get_all_subnets(context)
for subnet in subnet_list:
if (netaddr.IPSet([subnet.cidr]) & new_subnet_ipset):
# don't give out details of the overlapping subnet
err_msg = _("Requested subnet with cidr: %s "
"for network: %s overlaps with another subnet" %
(new_subnet_cidr, network.id))
LOG.error("Validation for CIDR:%s failed - overlaps with "
"subnet %s (CIDR:%s)",
new_subnet_cidr, subnet.id, subnet.cidr)
raise q_exc.InvalidInput(error_message=err_msg)

def _validate_allocation_pools(self, ip_pools, gateway_ip, subnet_cidr):
Expand Down Expand Up @@ -987,7 +991,7 @@ def create_subnet(self, context, subnet):
tenant_id = self._get_tenant_id_for_create(context, s)
with context.session.begin(subtransactions=True):
network = self._get_network(context, s["network_id"])
self._validate_subnet_cidr(network, s['cidr'])
self._validate_subnet_cidr(context, network, s['cidr'])
# The 'shared' attribute for subnets is for internal plugin
# use only. It is not exposed through the API
subnet = models_v2.Subnet(tenant_id=tenant_id,
Expand Down
2 changes: 2 additions & 0 deletions quantum/tests/unit/metaplugin/test_metaplugin.py
Expand Up @@ -227,6 +227,8 @@ def test_create_delete_port(self):
self.plugin.delete_network(self.context, network_ret3['id'])

def test_create_delete_subnet(self):
# for this test we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
network1 = self._fake_network('fake1')
network_ret1 = self.plugin.create_network(self.context, network1)
network2 = self._fake_network('fake2')
Expand Down
24 changes: 24 additions & 0 deletions quantum/tests/unit/test_db_plugin.py
Expand Up @@ -712,6 +712,8 @@ def side_effect(*args, **kwargs):
self._validate_behavior_on_bulk_failure(res, 'ports')

def test_list_ports(self):
# for this test we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(), self.port()) as (port1, port2):
req = self.new_list_request('ports', 'json')
port_list = self.deserialize('json', req.get_response(self.api))
Expand All @@ -721,6 +723,8 @@ def test_list_ports(self):
self.assertTrue(port2['port']['id'] in ids)

def test_list_ports_filtered_by_fixed_ip(self):
# for this test we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(), self.port()) as (port1, port2):
fixed_ips = port1['port']['fixed_ips'][0]
query_params = """
Expand Down Expand Up @@ -1749,6 +1753,26 @@ def test_create_two_subnets_same_cidr_returns_400(self):
pass
self.assertEquals(ctx_manager.exception.code, 400)

def test_create_2_subnets_overlapping_cidr_allowed_returns_200(self):
cidr_1 = '10.0.0.0/23'
cidr_2 = '10.0.0.0/24'
cfg.CONF.set_override('allow_overlapping_ips', True)

with contextlib.nested(self.subnet(cidr=cidr_1),
self.subnet(cidr=cidr_2)):
pass

def test_create_2_subnets_overlapping_cidr_not_allowed_returns_400(self):
cidr_1 = '10.0.0.0/23'
cidr_2 = '10.0.0.0/24'
cfg.CONF.set_override('allow_overlapping_ips', False)
with self.assertRaises(
webob.exc.HTTPClientError) as ctx_manager:
with contextlib.nested(self.subnet(cidr=cidr_1),
self.subnet(cidr=cidr_2)):
pass
self.assertEquals(ctx_manager.exception.code, 400)

def test_create_subnets_bulk_native(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk subnet create")
Expand Down
2 changes: 2 additions & 0 deletions quantum/tests/unit/test_l3_plugin.py
Expand Up @@ -287,6 +287,8 @@ def _create_network(self, fmt, name, admin_status_up, **kwargs):
def setUp(self):
test_config['plugin_name_v2'] = (
'quantum.tests.unit.test_l3_plugin.TestL3NatPlugin')
# for these tests we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
ext_mgr = L3TestExtensionManager()
test_config['extension_manager'] = ext_mgr
super(L3NatDBTestCase, self).setUp()
Expand Down

0 comments on commit 95153e4

Please sign in to comment.