Skip to content

Commit

Permalink
validate and recommend the cidr
Browse files Browse the repository at this point in the history
Bug #1195974

It is hard to validate the CIDR typed in by user,
the simple way is to recognize only one and recommend it
if user's input is not the one.

Change-Id: Ic8defe30a43a5ae69c3f737094f866b36bb68f59
  • Loading branch information
gongysh committed Jul 7, 2013
1 parent 31c7bda commit 53a66b2
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 36 deletions.
17 changes: 11 additions & 6 deletions neutron/api/v2/attributes.py
Expand Up @@ -244,15 +244,20 @@ def _validate_ip_address_or_none(data, valid_values=None):


def _validate_subnet(data, valid_values=None):
msg = None
try:
netaddr.IPNetwork(_validate_no_whitespace(data))
if len(data.split('/')) == 2:
net = netaddr.IPNetwork(_validate_no_whitespace(data))
cidr = str(net.cidr)
if (cidr != data):
msg = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": data,
"cidr": cidr}
else:
return
except Exception:
pass

msg = _("'%s' is not a valid IP subnet") % data
LOG.debug(msg)
msg = _("'%s' is not a valid IP subnet") % data
if msg:
LOG.debug(msg)
return msg


Expand Down
25 changes: 20 additions & 5 deletions neutron/tests/unit/test_attributes.py
Expand Up @@ -266,7 +266,7 @@ def test_validate_hostroutes(self):
[{'nexthop': '10.0.2.20',
'destination': '100.0.0.0/8'},
{'nexthop': '10.0.2.20',
'destination': '100.0.0.1/8'}]]
'destination': '101.0.0.0/8'}]]
for host_routes in hostroute_pools:
msg = attributes._validate_hostroutes(host_routes, None)
self.assertIsNone(msg)
Expand Down Expand Up @@ -371,7 +371,7 @@ def test_validate_subnet(self):
self.assertIsNone(msg)

# Valid - IPv6 with final octets
cidr = "fe80::0/24"
cidr = "fe80::/24"
msg = attributes._validate_subnet(cidr,
None)
self.assertIsNone(msg)
Expand All @@ -380,21 +380,36 @@ def test_validate_subnet(self):
cidr = "10.0.2.0"
msg = attributes._validate_subnet(cidr,
None)
error = "'%s' is not a valid IP subnet" % cidr
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "10.0.2.0/32"}
self.assertEqual(msg, error)

# Invalid - IPv4 with final octets
cidr = "192.168.1.1/24"
msg = attributes._validate_subnet(cidr,
None)
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "192.168.1.0/24"}
self.assertEqual(msg, error)

# Invalid - IPv6 without final octets, missing mask
cidr = "fe80::"
msg = attributes._validate_subnet(cidr,
None)
error = "'%s' is not a valid IP subnet" % cidr
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "fe80::/128"}
self.assertEqual(msg, error)

# Invalid - IPv6 with final octets, missing mask
cidr = "fe80::0"
msg = attributes._validate_subnet(cidr,
None)
error = "'%s' is not a valid IP subnet" % cidr
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "fe80::/128"}
self.assertEqual(msg, error)

# Invalid - Address format error
Expand Down
35 changes: 13 additions & 22 deletions neutron/tests/unit/test_db_plugin.py
Expand Up @@ -1347,7 +1347,7 @@ def test_requested_subnet_id_v4_and_v6(self):
res = self._create_subnet(self.fmt,
tenant_id=tenant_id,
net_id=net_id,
cidr='2607:f0d0:1002:51::0/124',
cidr='2607:f0d0:1002:51::/124',
ip_version=6,
gateway_ip=ATTR_NOT_SPECIFIED)
subnet2 = self.deserialize(self.fmt, res)
Expand Down Expand Up @@ -1543,26 +1543,6 @@ def test_fixed_ip_invalid_ip(self):
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, 400)

def test_fixed_ip_valid_ip_non_root_cidr(self):
with self.subnet(cidr='10.0.0.254/24') as subnet:
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.2'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, 201)
port = self.deserialize(self.fmt, res)
self._delete('ports', port['port']['id'])

def test_fixed_ip_invalid_ip_non_root_cidr(self):
with self.subnet(cidr='10.0.0.254/24') as subnet:
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.1.2'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, 400)

def test_requested_ips_only(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
Expand Down Expand Up @@ -2555,6 +2535,17 @@ def test_create_subnet_bad_tenant(self):
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, 403)

def test_create_subnet_bad_cidr(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.5/24',
'ip_version': 4,
'tenant_id': network['network']['tenant_id']}}

subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

def test_create_subnet_bad_ip_version(self):
with self.network() as network:
# Check bad IP version
Expand Down Expand Up @@ -2800,7 +2791,7 @@ def test_create_subnet_with_none_gateway_allocation_pool(self):

def test_create_subnet_with_v6_allocation_pool(self):
gateway_ip = 'fe80::1'
cidr = 'fe80::0/80'
cidr = 'fe80::/80'
allocation_pools = [{'start': 'fe80::2',
'end': 'fe80::ffff:fffa:ffff'}]
self._test_create_subnet(gateway_ip=gateway_ip,
Expand Down
2 changes: 1 addition & 1 deletion neutron/tests/unit/test_iptables_firewall.py
Expand Up @@ -27,7 +27,7 @@

_uuid = test_api_v2._uuid
FAKE_PREFIX = {'IPv4': '10.0.0.0/24',
'IPv6': 'fe80::0/48'}
'IPv6': 'fe80::/48'}
FAKE_IP = {'IPv4': '10.0.0.1',
'IPv6': 'fe80::1'}

Expand Down
4 changes: 2 additions & 2 deletions neutron/tests/unit/test_l3_plugin.py
Expand Up @@ -1149,7 +1149,7 @@ def test_floatingip_crd_ops(self):
expected_code=exc.HTTPNotFound.code)

def _test_floatingip_with_assoc_fails(self, plugin_class):
with self.subnet(cidr='200.0.0.1/24') as public_sub:
with self.subnet(cidr='200.0.0.0/24') as public_sub:
self._set_net_external(public_sub['subnet']['network_id'])
with self.port() as private_port:
with self.router() as r:
Expand Down Expand Up @@ -1187,7 +1187,7 @@ def test_floatingip_with_assoc_fails(self):
'neutron.db.l3_db.L3_NAT_db_mixin')

def _test_floatingip_with_ip_generation_failure(self, plugin_class):
with self.subnet(cidr='200.0.0.1/24') as public_sub:
with self.subnet(cidr='200.0.0.0/24') as public_sub:
self._set_net_external(public_sub['subnet']['network_id'])
with self.port() as private_port:
with self.router() as r:
Expand Down

0 comments on commit 53a66b2

Please sign in to comment.