Skip to content

Commit

Permalink
NO-JIRA: add SSL test that verifies hostname in certificate
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Mar 22, 2013
1 parent 94d4f28 commit 7d8f517
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 2 deletions.
5 changes: 5 additions & 0 deletions qpid/messaging/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ def __init__(self, url=None, **options):
@param ssl_certfile: file with client's public (eventually priv+pub) key (PEM format)
@type ssl_trustfile: str
@param ssl_trustfile: file trusted certificates to validate the server
@type ssl_skip_hostname_check: bool
@param ssl_skip_hostname_check: disable verification of hostname in
certificate. Use with caution - disabling hostname checking leaves you
vulnerable to Man-in-the-Middle attacks.
@rtype: Connection
@return: a disconnected Connection
Expand Down Expand Up @@ -170,6 +174,7 @@ def __init__(self, url=None, **options):
self.ssl_keyfile = options.get("ssl_keyfile", None)
self.ssl_certfile = options.get("ssl_certfile", None)
self.ssl_trustfile = options.get("ssl_trustfile", None)
self.ssl_skip_hostname_check = options.get("ssl_skip_hostname_check", False)
self.client_properties = options.get("client_properties", {})

self.options = options
Expand Down
71 changes: 69 additions & 2 deletions qpid/messaging/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def close(self):

try:
from ssl import wrap_socket, SSLError, SSL_ERROR_WANT_READ, \
SSL_ERROR_WANT_WRITE
SSL_ERROR_WANT_WRITE, CERT_REQUIRED, CERT_NONE
except ImportError:

## try the older python SSL api:
Expand All @@ -69,6 +69,15 @@ def __init__(self, conn, host, port):
ssl_certfile = conn.ssl_certfile
if ssl_certfile and not ssl_keyfile:
ssl_keyfile = ssl_certfile

# this version of SSL does NOT perform certificate validation. If the
# connection has been configured with CA certs (via ssl_trustfile), then
# the application expects the certificate to be validated against the
# supplied CA certs. Since this version cannot validate, the peer cannot
# be trusted.
if conn.ssl_trustfile:
raise SSLError("This version of Python does not support verification of the peer's certificate.")

self.ssl = ssl(self.socket, keyfile=ssl_keyfile, certfile=ssl_certfile)
self.socket.setblocking(1)

Expand All @@ -95,7 +104,39 @@ class tls(SocketTransport):

def __init__(self, conn, host, port):
SocketTransport.__init__(self, conn, host, port)
self.tls = wrap_socket(self.socket, keyfile=conn.ssl_keyfile, certfile=conn.ssl_certfile, ca_certs=conn.ssl_trustfile)
if conn.ssl_trustfile:
validate = CERT_REQUIRED
else:
validate = CERT_NONE

self.tls = wrap_socket(self.socket, keyfile=conn.ssl_keyfile,
certfile=conn.ssl_certfile,
ca_certs=conn.ssl_trustfile,
cert_reqs=validate)

if validate == CERT_REQUIRED and not conn.ssl_skip_hostname_check:
match_found = False
peer_cert = self.tls.getpeercert()
if peer_cert:
peer_names = []
if 'subjectAltName' in peer_cert:
for san in peer_cert['subjectAltName']:
if san[0] == 'DNS':
peer_names.append(san[1].lower())
if 'subject' in peer_cert:
for sub in peer_cert['subject']:
while isinstance(sub, tuple) and isinstance(sub[0],tuple):
sub = sub[0] # why the extra level of indirection???
if sub[0] == 'commonName':
peer_names.append(sub[1].lower())
for pattern in peer_names:
if _match_dns_pattern( host.lower(), pattern ):
#print "Match found %s" % pattern
match_found = True
break
if not match_found:
raise SSLError("Connection hostname '%s' does not match names from peer certificate: %s" % (host, peer_names))

self.socket.setblocking(0)
self.state = None

Expand Down Expand Up @@ -146,5 +187,31 @@ def close(self):
# this closes the underlying socket
self.tls.close()

def _match_dns_pattern( hostname, pattern ):
""" For checking the hostnames provided by the peer's certificate
"""
if pattern.find("*") == -1:
return hostname == pattern

# DNS wildcarded pattern - see RFC2818
h_labels = hostname.split(".")
p_labels = pattern.split(".")

while h_labels and p_labels:
if p_labels[0].find("*") == -1:
if p_labels[0] != h_labels[0]:
return False
else:
p = p_labels[0].split("*")
if not h_labels[0].startswith(p[0]):
return False
if not h_labels[0].endswith(p[1]):
return False
h_labels.pop(0)
p_labels.pop(0)

return not h_labels and not p_labels


TRANSPORTS["ssl"] = tls
TRANSPORTS["tcp+tls"] = tls

0 comments on commit 7d8f517

Please sign in to comment.