From 5dd91cd4aaa2e7cd8dde1dd316d53cab25ef9b78 Mon Sep 17 00:00:00 2001 From: kaiyou Date: Mon, 5 Jun 2017 17:51:37 +0200 Subject: [PATCH 1/2] Rewrite the split_port function using re In the case of a defined format with specific parts, a regular expression with named capturing bits make reasoning about the parts simpler than imlementing a parser from scratch. Signed-off-by: kaiyou --- docker/utils/ports.py | 107 ++++++++++++++++-------------------------- 1 file changed, 40 insertions(+), 67 deletions(-) diff --git a/docker/utils/ports.py b/docker/utils/ports.py index 3708958d4e..57332deee4 100644 --- a/docker/utils/ports.py +++ b/docker/utils/ports.py @@ -1,3 +1,16 @@ +import re + +PORT_SPEC = re.compile( + "^" # Match full string + "(" # External part + "((?P[a-fA-F\d.:]+):)?" # Address + "(?P[\d]*)(-(?P[\d]+))?:" # External range + ")?" + "(?P[\d]+)(-(?P[\d]+))?" # Internal range + "(?P/(udp|tcp))?" # Protocol + "$" # Match full string +) + def add_port_mapping(port_bindings, internal_port, external): if internal_port in port_bindings: @@ -24,81 +37,41 @@ def build_port_bindings(ports): return port_bindings -def to_port_range(port, randomly_available_port=False): - if not port: - return None - - protocol = "" - if "/" in port: - parts = port.split("/") - if len(parts) != 2: - _raise_invalid_port(port) - - port, protocol = parts - protocol = "/" + protocol - - if randomly_available_port: - return ["%s%s" % (port, protocol)] - - parts = str(port).split('-') - - if len(parts) == 1: - return ["%s%s" % (port, protocol)] - - if len(parts) == 2: - full_port_range = range(int(parts[0]), int(parts[1]) + 1) - return ["%s%s" % (p, protocol) for p in full_port_range] - - raise ValueError('Invalid port range "%s", should be ' - 'port or startport-endport' % port) - - def _raise_invalid_port(port): raise ValueError('Invalid port "%s", should be ' '[[remote_ip:]remote_port[-remote_port]:]' 'port[/protocol]' % port) -def split_port(port): - parts = str(port).split(':') - - if not 1 <= len(parts) <= 3: - _raise_invalid_port(port) - - if len(parts) == 1: - internal_port, = parts - if not internal_port: - _raise_invalid_port(port) - return to_port_range(internal_port), None - if len(parts) == 2: - external_port, internal_port = parts - - internal_range = to_port_range(internal_port) - if internal_range is None: - _raise_invalid_port(port) - - external_range = to_port_range(external_port, len(internal_range) == 1) - if external_range is None: - _raise_invalid_port(port) - - if len(internal_range) != len(external_range): - raise ValueError('Port ranges don\'t match in length') - - return internal_range, external_range +def port_range(start, end, proto, randomly_available_port=False): + if not start: + return start + if not end: + return [start + proto] + if randomly_available_port: + return ['{}-{}'.format(start, end) + proto] + return [str(port) + proto for port in range(int(start), int(end) + 1)] - external_ip, external_port, internal_port = parts - if not internal_port: +def split_port(port): + match = PORT_SPEC.match(port) + if match is None: _raise_invalid_port(port) + parts = match.groupdict() - internal_range = to_port_range(internal_port) - external_range = to_port_range(external_port, len(internal_range) == 1) - - if not external_range: - external_range = [None] * len(internal_range) - - if len(internal_range) != len(external_range): - raise ValueError('Port ranges don\'t match in length') + host = parts['host'] + proto = parts['proto'] or '' + internal = port_range(parts['int'], parts['int_end'], proto) + external = port_range( + parts['ext'], parts['ext_end'], '', len(internal) == 1) - return internal_range, [(external_ip, ex_port or None) - for ex_port in external_range] + if host is None: + if external is not None and len(internal) != len(external): + raise ValueError('Port ranges don\'t match in length') + return internal, external + else: + if not external: + external = [None] * len(internal) + elif len(internal) != len(external): + raise ValueError('Port ranges don\'t match in length') + return internal, [(host, ext_port) for ext_port in external] From 0c1271350db33cb21265309a31da2d1c399b8243 Mon Sep 17 00:00:00 2001 From: kaiyou Date: Mon, 5 Jun 2017 17:57:46 +0200 Subject: [PATCH 2/2] Add a specific unit test for splitting port with IPv6 The test was copied from https://github.com/greybyte/docker-py/commit/ccec87ca2c2aacfcfe3b38c5bc7d59dd73551c51 Signed-off-by: kaiyou --- tests/unit/utils_test.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index 25ed0f9b7f..c25881d142 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -552,6 +552,12 @@ def test_split_port_range_with_protocol(self): self.assertEqual(external_port, [("127.0.0.1", "1000"), ("127.0.0.1", "1001")]) + def test_split_port_with_ipv6_address(self): + internal_port, external_port = split_port( + "2001:abcd:ef00::2:1000:2000") + self.assertEqual(internal_port, ["2000"]) + self.assertEqual(external_port, [("2001:abcd:ef00::2", "1000")]) + def test_split_port_invalid(self): self.assertRaises(ValueError, lambda: split_port("0.0.0.0:1000:2000:tcp"))