Skip to content

Commit

Permalink
Merge pull request #17 from campaignmonitor/better-ssl-cert-verification
Browse files Browse the repository at this point in the history
Better SSL Certificate Verification
  • Loading branch information
jdennes committed Jul 13, 2013
2 parents da6e1c1 + df7a771 commit d87de32
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 21 deletions.
24 changes: 3 additions & 21 deletions createsend/createsend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@
import platform
import urllib
import urllib2
import httplib
import base64
import gzip
import os
import socket, ssl
from StringIO import StringIO
from urlparse import urlparse
try:
import json
except ImportError:
import simplejson as json
from utils import json_to_py, get_faker
from utils import VerifiedHTTPSConnection, json_to_py, get_faker

__version_info__ = ('3', '2', '0')
__version__ = '.'.join(__version_info__)
Expand Down Expand Up @@ -157,24 +155,8 @@ def make_request(self, method, path, params={}, body="", username=None,
data = self.faker.open() if self.faker else ''
status = self.faker.status if (self.faker and self.faker.status) else 200
return self.handle_response(status, data)

if (parsed_base_uri.scheme == 'https'):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((parsed_base_uri.netloc, 443))
sslsock = ssl.wrap_socket(sock, cert_reqs=ssl.CERT_REQUIRED,
ca_certs=os.path.join(os.path.dirname(__file__), 'cacert.pem'))
cert = sslsock.getpeercert()

for field in cert['subject']:
if field[0][0] == 'commonName':
certhost = field[0][1]
certhost = certhost.replace('*.', '')
netloc = parsed_base_uri.netloc
requestdomain = '.'.join(netloc.split('.')[-2:])
if certhost != requestdomain:
raise ssl.SSLError("Host name '%s' doesn't match certificate host '%s'" % (requestdomain, certhost))

c = httplib.HTTPConnection(parsed_base_uri.netloc)

c = VerifiedHTTPSConnection(parsed_base_uri.netloc)
c.request(method, self.build_url(parsed_base_uri, path, params), body, headers)
response = c.getresponse()
if response.getheader('content-encoding', '') == 'gzip':
Expand Down
102 changes: 102 additions & 0 deletions createsend/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,111 @@
import os
import re
import httplib
import socket
import ssl
try:
import json
except ImportError:
import simplejson as json

class CertificateError(ValueError):
"""
Raised when an error occurs when attempting to verify an SSL certificate.
"""
pass

def _dnsname_to_pat(dn):
pats = []
for frag in dn.split(r'.'):
if frag == '*':
# When '*' is a fragment by itself, it matches a non-empty dotless
# fragment.
pats.append('[^.]+')
else:
# Otherwise, '*' matches any dotless fragment.
frag = re.escape(frag)
pats.append(frag.replace(r'\*', '[^.]*'))
return re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE)

def match_hostname(cert, hostname):
"""
This is a backport of the match_hostname() function from Python 3.2,
essential when using SSL.
Verifies that *cert* (in decoded format as returned by
SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 rules
are mostly followed, but IP addresses are not accepted for *hostname*.
CertificateError is raised on failure. On success, the function
returns nothing.
"""
if not cert:
raise ValueError("empty or no certificate")
dnsnames = []
san = cert.get('subjectAltName', ())
for key, value in san:
if key == 'DNS':
if _dnsname_to_pat(value).match(hostname):
return
dnsnames.append(value)
if not san:
# The subject is only checked when subjectAltName is empty
for sub in cert.get('subject', ()):
for key, value in sub:
# XXX according to RFC 2818, the most specific Common Name
# must be used.
if key == 'commonName':
if _dnsname_to_pat(value).match(hostname):
return
dnsnames.append(value)
if len(dnsnames) > 1:
raise CertificateError("hostname %r "
"doesn't match either of %s"
% (hostname, ', '.join(map(repr, dnsnames))))
elif len(dnsnames) == 1:
raise CertificateError("hostname %r "
"doesn't match %r"
% (hostname, dnsnames[0]))
else:
raise CertificateError("no appropriate commonName or "
"subjectAltName fields were found")

class VerifiedHTTPSConnection(httplib.HTTPSConnection):
"""
A connection that includes SSL certificate verification.
"""
def connect(self):
self.connection_kwargs = {}
# for > py2.5
if hasattr(self, 'timeout'):
self.connection_kwargs.update(timeout = self.timeout)

# for >= py2.7
if hasattr(self, 'source_address'):
self.connection_kwargs.update(source_address = self.source_address)

sock = socket.create_connection((self.host, self.port), **self.connection_kwargs)

# for >= py2.7
if getattr(self, '_tunnel_host', None):
self.sock = sock
self._tunnel()

cert_path = os.path.join(os.path.dirname(__file__), 'cacert.pem')

self.sock = ssl.wrap_socket(
sock,
self.key_file,
self.cert_file,
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=cert_path)

try:
match_hostname(self.sock.getpeercert(), self.host)
except CertificateError:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise

def json_to_py(j):
o = json.loads(j)
if isinstance(o, dict):
Expand Down
16 changes: 16 additions & 0 deletions test/test_verifiedhttpsconnection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import unittest

from createsend import *
from createsend.utils import match_hostname

class VerifiedHTTPSConnectionTestCase(unittest.TestCase):

def setUp(self):
self.cs = CreateSend({'api_key': 'not an api key'})

def test_verified_connection_no_cert(self):
self.assertRaises(ValueError, match_hostname, None, 'api.createsend.com')

def test_verified_connection(self):
# An actual (non-stubbed) unauthenticated request to test verification.
self.assertRaises(Unauthorized, self.cs.clients)

0 comments on commit d87de32

Please sign in to comment.