Skip to content

Commit

Permalink
Finished implementation of zeep usage.
Browse files Browse the repository at this point in the history
  • Loading branch information
hbldh committed Mar 27, 2017
1 parent 3e8feba commit ac14853
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 83 deletions.
30 changes: 24 additions & 6 deletions bankid/certutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,34 @@ def split_certificate(certificate_path, destination_folder, password=None):
"""
try:
p = subprocess.Popen(["openssl", 'version'], stdout=subprocess.PIPE)
# Attempt Linux call first
p = subprocess.Popen(['openssl', 'version'], stdout=subprocess.PIPE)
sout, serr = p.communicate()
if not sout.decode().lower().startswith('openssl'):
raise NotImplementedError(
"OpenSSL executable could not be found. "
"Splitting cannot be performed.")
print(sout.strip())
openssl_executable = 'openssl'
except:
raise NotImplementedError(
"OpenSSL executable could not be found. "
"Splitting cannot be performed.")
try:
# Attempt to call on standard Git for Windows path.
p = subprocess.Popen(['C:\\Program Files\\Git\\mingw64\\bin\\openssl.exe', 'version'],
stdout=subprocess.PIPE)
sout, serr = p.communicate()
if not sout.decode().lower().startswith('openssl'):
raise NotImplementedError(
"OpenSSL executable could not be found. "
"Splitting cannot be performed.")
print(sout.strip())
openssl_executable = 'C:\\Program Files\\Git\\mingw64\\bin\\openssl.exe'
except:
raise NotImplementedError(
"OpenSSL executable could not be found. "
"Splitting cannot be performed.")

if not os.path.exists(os.path.abspath(os.path.expanduser(destination_folder))):
os.makedirs(os.path.abspath(os.path.expanduser(destination_folder)))

# Paths to output files.
out_cert_path = os.path.join(os.path.abspath(
Expand All @@ -89,7 +107,7 @@ def split_certificate(certificate_path, destination_folder, password=None):

# Use openssl for converting to pem format.
pipeline_1 = [
'openssl', 'pkcs12',
openssl_executable, 'pkcs12',
'-in', "{0}".format(certificate_path),
'-passin' if password is not None else '',
'pass:{0}'.format(password) if password is not None else '',
Expand All @@ -101,7 +119,7 @@ def split_certificate(certificate_path, destination_folder, password=None):
stderr=subprocess.PIPE)
p.communicate()
pipeline_2 = [
'openssl', 'pkcs12',
openssl_executable, 'pkcs12',
'-in', "{0}".format(certificate_path),
'-passin' if password is not None else '',
'pass:{0}'.format(password) if password is not None else '',
Expand Down
92 changes: 17 additions & 75 deletions bankid/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@
import warnings
import six
import base64
import datetime

import requests
from suds.client import Client
from suds.transport.http import HttpAuthenticated
from suds.transport import Reply
from suds import WebFault
from suds.sax.text import Text
from zeep import Client
from zeep.transports import Transport
from zeep.exceptions import Error
from pkg_resources import resource_filename

from bankid.exceptions import get_error_class, BankIDWarning
Expand Down Expand Up @@ -60,11 +57,14 @@ def __init__(self, certificates, test_server=False):

headers = {
"Content-Type": "text/xml;charset=UTF-8",
"SOAPAction": ""
}
t = RequestsTransport(cert=self.certs, verify_cert=self.verify_cert)
self.client = Client(self.wsdl_url, location=self.api_url,
headers=headers, transport=t)

session = requests.Session()
session.verify = self.verify_cert
session.cert = self.certs
session.headers = headers
transport = Transport(session=session)
self.client = Client(self.wsdl_url, transport=transport)

def authenticate(self, personal_number, **kwargs):
"""Request an authentication order. The :py:meth:`collect` method
Expand All @@ -84,7 +84,7 @@ def authenticate(self, personal_number, **kwargs):
try:
out = self.client.service.Authenticate(
personalNumber=personal_number, **kwargs)
except WebFault as e:
except Error as e:
raise get_error_class(e, "Could not complete Authenticate order.")

return self._dictify(out)
Expand All @@ -110,7 +110,7 @@ def sign(self, user_visible_data, personal_number=None, **kwargs):
out = self.client.service.Sign(
userVisibleData=six.text_type(base64.b64encode(six.b(user_visible_data)), encoding='utf-8'),
personalNumber=personal_number, **kwargs)
except WebFault as e:
except Error as e:
raise get_error_class(e, "Could not complete Sign order.")

return self._dictify(out)
Expand All @@ -128,8 +128,8 @@ def collect(self, order_ref):
"""
try:
out = self.client.service.Collect(orderRef=order_ref)
except WebFault as e:
out = self.client.service.Collect(order_ref)
except Error as e:
raise get_error_class(e, "Could not complete Collect call.")

return self._dictify(out)
Expand All @@ -149,71 +149,13 @@ def file_sign(self, **kwargs):
"FileSign is deprecated and therefore not implemented.")

def _dictify(self, doc):
"""Transforms the replies from :py:mod:`suds` own types to a
regular Python dict with strings and datetimes.
"""Transforms the replies to a regular Python dict with strings and datetimes.
Tested with BankID version 2.5 return data.
:param doc: The response as interpreted by :py:mod:`suds`.
:param doc: The response as interpreted by :py:mod:`zeep`.
:returns: The response parsed to a dict.
:rtype: dict
"""
doc = dict(doc)
out = {}
try:
for k in doc:
k = _to_unicode(k)
if isinstance(doc[k], Text):
out[k] = _to_unicode(doc[k])
elif isinstance(doc[k], datetime.datetime):
out[k] = doc[k]
else:
out[k] = self._dictify(doc[k])
except:
out = doc

return out


def _to_unicode(s):
if isinstance(s, Text):
return six.text_type(s.unescape())
elif isinstance(s, six.text_type):
return s
elif isinstance(s, six.binary_type):
return s.decode('utf-8')


class RequestsTransport(HttpAuthenticated):
"""A Requests-based transport for suds, enabling the use of https and
certificates when communicating with the SOAP service.
Code has been adapted from this `Stack Overflow post
<http://stackoverflow.com/questions/6277027/suds-over-https-with-cert>`_.
"""
def __init__(self, **kwargs):
self.requests_session = requests.Session()
self.requests_session.cert = kwargs.pop('cert', None)
self.requests_session.verify = kwargs.pop('verify_cert', None)
# `super` won't work because HttpAuthenticated does not use new style class.
HttpAuthenticated.__init__(self, **kwargs)

def open(self, request):
"""Fetches the WSDL specification using certificates."""
self.addcredentials(request)
resp = self.requests_session.get(request.url,
data=request.message,
headers=request.headers)
result = six.BytesIO(resp.content)
return result

def send(self, request):
"""Posts to SOAP service using certificates."""
self.addcredentials(request)
resp = self.requests_session.post(request.url,
data=request.message,
headers=request.headers)
result = Reply(resp.status_code, resp.headers, resp.content)
return result
return {k: (self._dictify(doc[k]) if hasattr(doc[k], '_xsd_type') else doc[k]) for k in doc}
2 changes: 1 addition & 1 deletion bankid/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@


def get_error_class(exc, exception_text):
error_class = _ERROR_CODE_TO_CLASS.get(six.text_type(exc.fault.faultstring))
error_class = _ERROR_CODE_TO_CLASS.get(six.text_type(exc.message))
if error_class is None:
return BankIDError("{0}: {1}".format(exc, exception_text))
else:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
requests>=2.13.0
suds-jurko>=0.6
zeep>=1.3.0
six>=1.10.0
2 changes: 2 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,5 @@ def test_certutils_main():
os.remove(os.path.expanduser('~/key.pem'))
except:
pass


0 comments on commit ac14853

Please sign in to comment.