Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 40 additions & 67 deletions docker/utils/ports.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
import re

PORT_SPEC = re.compile(
"^" # Match full string
"(" # External part
"((?P<host>[a-fA-F\d.:]+):)?" # Address
"(?P<ext>[\d]*)(-(?P<ext_end>[\d]+))?:" # External range
")?"
"(?P<int>[\d]+)(-(?P<int_end>[\d]+))?" # Internal range
"(?P<proto>/(udp|tcp))?" # Protocol
"$" # Match full string
)


def add_port_mapping(port_bindings, internal_port, external):
if internal_port in port_bindings:
Expand All @@ -24,81 +37,41 @@ def build_port_bindings(ports):
return port_bindings


def to_port_range(port, randomly_available_port=False):
if not port:
return None

protocol = ""
if "/" in port:
parts = port.split("/")
if len(parts) != 2:
_raise_invalid_port(port)

port, protocol = parts
protocol = "/" + protocol

if randomly_available_port:
return ["%s%s" % (port, protocol)]

parts = str(port).split('-')

if len(parts) == 1:
return ["%s%s" % (port, protocol)]

if len(parts) == 2:
full_port_range = range(int(parts[0]), int(parts[1]) + 1)
return ["%s%s" % (p, protocol) for p in full_port_range]

raise ValueError('Invalid port range "%s", should be '
'port or startport-endport' % port)


def _raise_invalid_port(port):
raise ValueError('Invalid port "%s", should be '
'[[remote_ip:]remote_port[-remote_port]:]'
'port[/protocol]' % port)


def split_port(port):
parts = str(port).split(':')

if not 1 <= len(parts) <= 3:
_raise_invalid_port(port)

if len(parts) == 1:
internal_port, = parts
if not internal_port:
_raise_invalid_port(port)
return to_port_range(internal_port), None
if len(parts) == 2:
external_port, internal_port = parts

internal_range = to_port_range(internal_port)
if internal_range is None:
_raise_invalid_port(port)

external_range = to_port_range(external_port, len(internal_range) == 1)
if external_range is None:
_raise_invalid_port(port)

if len(internal_range) != len(external_range):
raise ValueError('Port ranges don\'t match in length')

return internal_range, external_range
def port_range(start, end, proto, randomly_available_port=False):
if not start:
return start
if not end:
return [start + proto]
if randomly_available_port:
return ['{}-{}'.format(start, end) + proto]
return [str(port) + proto for port in range(int(start), int(end) + 1)]

external_ip, external_port, internal_port = parts

if not internal_port:
def split_port(port):
match = PORT_SPEC.match(port)
if match is None:
_raise_invalid_port(port)
parts = match.groupdict()

internal_range = to_port_range(internal_port)
external_range = to_port_range(external_port, len(internal_range) == 1)

if not external_range:
external_range = [None] * len(internal_range)

if len(internal_range) != len(external_range):
raise ValueError('Port ranges don\'t match in length')
host = parts['host']
proto = parts['proto'] or ''
internal = port_range(parts['int'], parts['int_end'], proto)
external = port_range(
parts['ext'], parts['ext_end'], '', len(internal) == 1)

return internal_range, [(external_ip, ex_port or None)
for ex_port in external_range]
if host is None:
if external is not None and len(internal) != len(external):
raise ValueError('Port ranges don\'t match in length')
return internal, external
else:
if not external:
external = [None] * len(internal)
elif len(internal) != len(external):
raise ValueError('Port ranges don\'t match in length')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: It feels like there should be a way to have this raise statement only once, but it's not immediately obvious to me what is the best way to get there.

return internal, [(host, ext_port) for ext_port in external]
6 changes: 6 additions & 0 deletions tests/unit/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,12 @@ def test_split_port_range_with_protocol(self):
self.assertEqual(external_port,
[("127.0.0.1", "1000"), ("127.0.0.1", "1001")])

def test_split_port_with_ipv6_address(self):
internal_port, external_port = split_port(
"2001:abcd:ef00::2:1000:2000")
self.assertEqual(internal_port, ["2000"])
self.assertEqual(external_port, [("2001:abcd:ef00::2", "1000")])

def test_split_port_invalid(self):
self.assertRaises(ValueError,
lambda: split_port("0.0.0.0:1000:2000:tcp"))
Expand Down