Skip to content

Commit

Permalink
dev: Using ipaddress instead of ipcalc in Python3
Browse files Browse the repository at this point in the history
  • Loading branch information
liangxin1300 committed Jun 28, 2018
1 parent a6a1848 commit 0d645c3
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 171 deletions.
6 changes: 3 additions & 3 deletions crmsh/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ def valid_adminIP(addr, prev_value=None):
warn(" Address already in use: {}".format(addr))
return False
for net in all_:
if addr in utils.Network(net):
if utils.ip_in_network(addr, net):
return True
warn(" Address '{}' invalid, expected one of {}".format(addr, all_))
return False
Expand Down Expand Up @@ -919,7 +919,7 @@ def pick_default_value(vlist, ilist):
all_ = utils.network_v6_all()
for item in all_.values():
network_list.extend(item)
default_networks = map(utils.get_ipv6_network, network_list)
default_networks = [utils.get_ipv6_network(x) for x in network_list]
else:
default_networks = utils.network_all()
if not default_networks:
Expand Down Expand Up @@ -1721,7 +1721,7 @@ def update_nodeid(nodeid, node=None):
tmp = re.findall(r' {}/[0-9]+ '.format(ringXaddr), outp, re.M)[0].strip()
peer_ip = corosync.get_value("nodelist.node.ring{}_addr".format(i))
# peer ring0_addr and local ring0_addr must be configured in the same network
if peer_ip not in utils.Network(tmp):
if not utils.ip_in_network(peer_ip, tmp):
print(term.render(clidisplay.error(" Peer IP {} is not in the same network: {}".format(peer_ip, tmp))))
continue

Expand Down
171 changes: 9 additions & 162 deletions crmsh/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import bz2
import fnmatch
import gc
import ipaddress
from contextlib import contextmanager
from . import config
from . import userdir
Expand Down Expand Up @@ -2037,177 +2038,23 @@ def valid_ip_addr(addr, version=4):


def get_ipv6_network(addr_with_mask):
return Network(addr_with_mask).network().to_compressed()
return str(ipaddress.ip_interface(addr_with_mask).network.network_address)


def gen_nodeid_from_ipv6(addr):
return IP(addr).ip_long() % 1000000000
return int(ipaddress.ip_address(addr)) % 1000000000


MAX_IPV6 = (1 << 128) - 1
MAX_IPV4 = (1 << 32) - 1
def ip_in_network(addr, net):
return ipaddress.ip_address(addr) in ipaddress.ip_interface(net).network


class IP(object):
"""
learn from https://github.com/tehmaze/ipcalc
"""
def __init__(self, ip, mask=None, version=0):
"""Initialize a new IPv4 or IPv6 address."""
self.mask = mask
self.v = 0

if isinstance(ip, int):
self.ip = int(ip)
if self.ip <= MAX_IPV4:
self.v = version or 4
self.dq = self._itodq(ip)
else:
self.v = version or 6
self.dq = self._itodq(ip)
else:
if '/' in ip:
ip, mask = ip.split('/', 1)
self.mask = int(mask)
self.v = version or 0
self.dq = ip
self.ip = self._dqtoi(ip)

if self.mask is None:
self.mask = {4: 32, 6: 128}[self.v]
elif isinstance(self.mask, str):
self.mask = int(self.mask)

def __str__(self):
return self.dq

def _dqtoi(self, dq):
"""Convert dotquad or hextet to long."""
if ':' in dq:
if not valid_ip_addr(dq, 6):
raise ValueError("Invalid IPv6 address")
return self._dqtoi_ipv6(dq)
if '.' in dq:
if not valid_ip_addr(dq):
raise ValueError("Invalid IPv4 address")
return self._dqtoi_ipv4(dq)

raise ValueError("Invalid address input")

def _dqtoi_ipv4(self, dq):
q = dq.split('.')
q.reverse()
self.v = 4
return sum(int(byte) << 8 * index for index, byte in enumerate(q))

def _dqtoi_ipv6(self, dq):
hx = dq.split(':')
if len(hx) < 8:
ix = hx.index('')
px = len(hx[ix + 1:])
for x in range(ix + px + 1, 8):
hx.insert(ix, '0')

ip = ''
hx = [x == '' and '0' or x for x in hx]
for h in hx:
if len(h) < 4:
h = '%04x' % int(h, 16)
ip += h
self.v = 6
return int(ip, 16)

def _itodq(self, n):
"""Convert long to dotquad or hextet."""
if self.v == 4:
return '.'.join(map(str, [
(n >> 24) & 0xff,
(n >> 16) & 0xff,
(n >> 8) & 0xff,
n & 0xff,
]))
n = '%032x' % n
return ':'.join(n[4 * x:4 * x + 4] for x in range(0, 8))
class IP:
def __init__(self, addr):
self.addr = ipaddress.ip_address(addr)

def version(self):
return self.v

def ip_long(self):
return self.ip

def to_compressed(self):
"""
Compress an IP address to its shortest possible compressed form.
"""
if self.v == 6:
quads = ['%x' % (int(q, 16)) for q in self.dq.split(':')]
quadc = ':%s:' % (':'.join(quads),)
zeros = [0, -1]

# Find the largest group of zeros
for match in re.finditer(r'(:[:0]+)', quadc):
count = len(match.group(1)) - 1
if count > zeros[0]:
zeros = [count, match.start(1)]

count, where = zeros
if count:
quadc = quadc[:where] + ':' + quadc[where + count:]

quadc = re.sub(r'((^:)|(:$))', '', quadc)
quadc = re.sub(r'((^:)|(:$))', '::', quadc)
return quadc


class Network(IP):
"""
learn from https://github.com/tehmaze/ipcalc
"""
def netmask_long(self):
"""
Network netmask derived from subnet size, as long.
"""
if self.version() == 4:
return (MAX_IPV4 >> (32 - self.mask)) << (32 - self.mask)
return (MAX_IPV6 >> (128 - self.mask)) << (128 - self.mask)

def network(self):
"""
Network address, as IP object.
"""
return IP(self.network_long(), version=self.version())

def network_long(self):
"""
Network address, as long.
"""
return self.ip & self.netmask_long()

def broadcast_long(self):
"""
Broadcast address, as long.
"""
if self.version() == 4:
return self.network_long() | (MAX_IPV4 - self.netmask_long())
return self.network_long() | (MAX_IPV6 - self.netmask_long())

def check_collision(self, other):
"""Check another network against the given network."""
other = Network(other)
return self.network_long() < other.network_long() < self.broadcast_long() or \
other.network_long() < self.network_long() < other.broadcast_long()

def __contains__(self, ip):
return self.check_collision(ip)

def has_key(self, ip):
"""
Check if the given ip is part of the network.
:param ip: the ip address
:type ip: :class:`IP` or str or long or int
"""
return self.__contains__(ip)
return self.addr.version


# vim:ts=4:sw=4:et:
10 changes: 4 additions & 6 deletions test/unittests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,10 @@ def test_network():
ip = utils.IP('2001:db3::1')
assert ip.version() == 6

net = utils.Network('192.0.2.0/24')
assert ('192.168.2.0' in net) is False
assert ('192.0.2.42' in net) is True
assert (utils.ip_in_network('192.168.2.0', '192.0.2.0/24') is False)
assert (utils.ip_in_network('192.0.2.42', '192.0.2.0/24') is True)

net = utils.Network('2001:db8::2/64')
assert ('2001:db3::1' in net) is False
assert ('2001:db8::1' in net) is True
assert (utils.ip_in_network('2001:db3::1', '2001:db8::2/64') is False)
assert (utils.ip_in_network('2001:db8::1', '2001:db8::2/64') is True)

assert utils.get_ipv6_network("2002:db8::2/64") == "2002:db8::"

0 comments on commit 0d645c3

Please sign in to comment.