Skip to content

Commit

Permalink
removed pycurl dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
paltman committed Mar 16, 2011
1 parent 770d774 commit 634f1f3
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 38 deletions.
60 changes: 60 additions & 0 deletions braintree/util/backports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""The match_hostname() function from Python 3.2, essential when using SSL."""

import re

class CertificateError(ValueError):
pass

def _dnsname_to_pat(dn):
pats = []
for frag in dn.split(r'.'):
if frag == '*':
# When '*' is a fragment by itself, it matches a non-empty dotless
# fragment.
pats.append('[^.]+')
else:
# Otherwise, '*' matches any dotless fragment.
frag = re.escape(frag)
pats.append(frag.replace(r'\*', '[^.]*'))
return re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE)

def match_hostname(cert, hostname):
"""Verify that *cert* (in decoded format as returned by
SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 rules
are mostly followed, but IP addresses are not accepted for *hostname*.
CertificateError is raised on failure. On success, the function
returns nothing.
"""
if not cert:
raise ValueError("empty or no certificate")
dnsnames = []
san = cert.get('subjectAltName', ())

for key, value in san:
if key == 'DNS':
if _dnsname_to_pat(value).match(hostname):
return
dnsnames.append(value)

if not san:
# The subject is only checked when subjectAltName is empty
for sub in cert.get('subject', ()):
for key, value in sub:
# XXX according to RFC 2818, the most specific Common Name
# must be used.
if key == 'commonName':
if _dnsname_to_pat(value).match(hostname):
return
dnsnames.append(value)
if len(dnsnames) > 1:
raise CertificateError("hostname %r "
"doesn't match either of %s"
% (hostname, ', '.join(map(repr, dnsnames))))
elif len(dnsnames) == 1:
raise CertificateError("hostname %r "
"doesn't match %r"
% (hostname, dnsnames[0]))
else:
raise CertificateError("no appropriate commonName or "
"subjectAltName fields were found")
40 changes: 15 additions & 25 deletions braintree/util/http.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import httplib
import base64
import socket
import ssl
from braintree.configuration import Configuration
from braintree.util.backports import match_hostname
from braintree.util.xml_util import XmlUtil
from braintree.exceptions.authentication_error import AuthenticationError
from braintree.exceptions.authorization_error import AuthorizationError
Expand Down Expand Up @@ -64,7 +67,7 @@ def __http_do(self, http_verb, path, params=None):
)
response = conn.getresponse()
status = response.status

if Http.is_error_status(status):
conn.close()
Http.raise_exception_from_status(status)
Expand All @@ -90,27 +93,14 @@ def __headers(self):

def __verify_ssl(self):
if Configuration.use_unsafe_ssl: return

try:
import pycurl
except ImportError, e:
print "Cannot load PycURL. Please refer to Braintree documentation."
print """
If you are in an environment where you absolutely cannot load PycURL
(such as Google App Engine), you can turn off SSL Verification by setting:
Configuration.use_unsafe_ssl = True
This is highly discouraged, however, since it leaves you susceptible to
man-in-the-middle attacks."""
raise e

curl = pycurl.Curl()
# see http://curl.haxx.se/libcurl/c/curl_easy_setopt.html for info on these options
curl.setopt(pycurl.CAINFO, self.environment.ssl_certificate)
curl.setopt(pycurl.SSL_VERIFYPEER, 1)
curl.setopt(pycurl.SSL_VERIFYHOST, 2)
curl.setopt(pycurl.NOBODY, 1)
curl.setopt(pycurl.NOSIGNAL, 1)
curl.setopt(pycurl.URL, self.environment.protocol + self.environment.server_and_port)
curl.perform()

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.environment.server, self.environment.port))

sslsock = ssl.wrap_socket(
sock,
ssl_version=ssl.PROTOCOL_SSLv3,
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=self.environment.ssl_certificate
)
match_hostname(sslsock.getpeercert(), self.environment.server)
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
url="http://www.braintreepaymentsolutions.com/gateway/python",
packages=["braintree", "braintree.exceptions", "braintree.util"],
package_data={"braintree": ["ssl/*"]},
install_requires=["pycurl==7.19.0"],
tests_require=["pycurl==7.19.0", "nose==0.11.3"]
install_requires=[],
tests_require=["nose==0.11.3"]
)
19 changes: 8 additions & 11 deletions tests/integration/test_http.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from braintree.util.backports import CertificateError
from tests.test_helper import *
import pycurl


class TestHttp(unittest.TestCase):

Expand Down Expand Up @@ -28,28 +29,24 @@ def test_successful_connection_to_production(self):
pass

def test_unsuccessful_connection_to_good_ssl_server_with_wrong_cert(self):
environment = Environment(Environment.Sandbox.server, "443", True, Environment.Production.ssl_certificate)
environment = Environment("braintreegateway.com", "443", True, Environment.Production.ssl_certificate)
try:
config = Configuration(environment, "merchant_id", "public_key", "private_key")
http = config.http()
http.get("/")
self.assertTrue(False)
except pycurl.error, e:
error_code, error_msg = e
self.assertEquals(pycurl.E_SSL_CACERT, error_code)
self.assertTrue(re.search('verif(y|ication) failed', error_msg))

except CertificateError, e:
self.assertTrue(re.search("doesn't match", e.message))

def test_unsuccessful_connection_to_ssl_server_with_wrong_domain(self):
try:
environment = Environment("braintreegateway.com", "443", True, Environment.Production.ssl_certificate)
config = Configuration(environment, "merchant_id", "public_key", "private_key")
http = config.http()
http.get("/")
self.assertTrue(False)
except pycurl.error, e:
error_code, error_msg = e
self.assertEquals(pycurl.E_SSL_PEER_CERTIFICATE, error_code)
self.assertTrue(re.search("SSL: certificate subject name", error_msg))
except CertificateError, e:
self.assertTrue(re.search("doesn't match", e.message))

def test_unsafe_ssl_connection(self):
try:
Expand Down

1 comment on commit 634f1f3

@kennethreitz
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may use this as a simple way to add SSL verification to Requests. 🍰

Please sign in to comment.