-
Notifications
You must be signed in to change notification settings - Fork 21
/
pk11.py
199 lines (151 loc) · 5.96 KB
/
pk11.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import base64
import threading
__author__ = 'leifj'
from xmlsec.exceptions import XMLSigException
from six.moves.urllib_parse import urlparse
import os
import logging
from xmlsec.utils import b642pem
_modules = {}
try:
import PyKCS11
from PyKCS11.LowLevel import CKA_ID, CKA_LABEL, CKA_CLASS, CKO_PRIVATE_KEY, CKO_CERTIFICATE, CKK_RSA, CKA_KEY_TYPE, CKA_VALUE
except ImportError:
raise XMLSigException("pykcs11 is required for PKCS#11 keys - cf README.rst")
all_attributes = list(PyKCS11.CKA.keys())
# remove the CKR_ATTRIBUTE_SENSITIVE attributes since we can't get
all_attributes.remove(PyKCS11.LowLevel.CKA_PRIVATE_EXPONENT)
all_attributes.remove(PyKCS11.LowLevel.CKA_PRIME_1)
all_attributes.remove(PyKCS11.LowLevel.CKA_PRIME_2)
all_attributes.remove(PyKCS11.LowLevel.CKA_EXPONENT_1)
all_attributes.remove(PyKCS11.LowLevel.CKA_EXPONENT_2)
all_attributes.remove(PyKCS11.LowLevel.CKA_COEFFICIENT)
all_attributes = [e for e in all_attributes if isinstance(e, int)]
def parse_uri(pk11_uri):
o = urlparse(pk11_uri)
if o.scheme != 'pkcs11':
raise XMLSigException("Bad URI scheme in pkcs11 URI %s" % pk11_uri)
logging.debug("parsed pkcs11 uri: %s" % repr(o))
slot = None
library = None
keyname = None
query = {}
if not '/' in o.path:
raise XMLSigException("Missing keyname part in pkcs11 URI (pkcs11://[library[:slot]/]keyname[?pin=<pin>])")
(module_path, sep, keyqs) = o.path.rpartition('/')
qs = o.query
if qs:
keyname = keyqs
elif '?' in keyqs:
(keyname, sep, qss) = keyqs.rpartition('?')
qs = qss
else:
keyname = keyqs
if qs:
for av in qs.split('&'):
if not '=' in av:
raise XMLSigException("Bad query string in pkcs11 URI %s" % pk11_uri)
(a, sep, v) = av.partition('=')
assert a
assert v
query[a] = v
if ':' in module_path:
(library, sep, slot_str) = module_path.rpartition(":")
slot = int(slot_str)
else:
library = module_path
if library is None or len(library) == 0:
library = os.environ.get('PYKCS11LIB', None)
if library is None or len(library) == 0:
raise XMLSigException("No PKCS11 module in pkcs11 URI %s" % pk11_uri)
logging.debug("returning %s %s %s %s" % (library, slot, keyname, query))
return library, slot, keyname, query
def _intarray2bytes(x):
return bytearray(x)
def _close_session(session):
_session_lock.acquire()
session.logout()
session.closeSession()
_session_lock.release()
def _sign_and_close(session, key, data, mech):
logging.debug("signing %d bytes using %s" % (len(data), mech))
#import pdb; pdb.set_trace()
sig = session.sign(key, data, mech)
_close_session(session)
return _intarray2bytes(sig)
def _find_object(session, template):
for o in session.findObjects(template):
try:
logging.debug("Found pkcs11 object: %s" % o)
except PyKCS11.PyKCS11Error as exc:
# Fetching attributes might be restricted (CKR_ATTRIBUTE_SENSITIVE)
logging.debug("Found pkcs11 object, but can't print it (%s)" % exc)
pass
return o
return None
def _get_object_attributes(session, o):
attributes = session.getAttributeValue(o, all_attributes)
return dict(zip(all_attributes, attributes))
def _find_key(session, keyname):
key = _find_object(session, [(CKA_LABEL, keyname), (CKA_CLASS, CKO_PRIVATE_KEY), (CKA_KEY_TYPE, CKK_RSA)])
if key is None:
return None, None
key_a = _get_object_attributes(session, key)
cert = _find_object(session, [(CKA_ID, key_a[CKA_ID]), (CKA_CLASS, CKO_CERTIFICATE)])
cert_pem = None
if cert is not None:
cert_a = _get_object_attributes(session, cert)
cert_pem = b642pem(base64.standard_b64encode(_intarray2bytes(cert_a[CKA_VALUE])))
logging.debug(cert)
return key, cert_pem
_session_lock = threading.RLock()
def _session(library, slot=None, pin=None, pk11_uri=None):
_session_lock.acquire()
# XXX: adhoc fix -- should test cases where slot, pin and pk11_uri
# contradict or both are 'None'
if slot is None and pk11_uri is not None:
library, slot, keyname, query = parse_uri(pk11_uri)
pin_spec = query.get('pin', "env:PYKCS11PIN")
if pin_spec.startswith("env:"):
pin = os.environ.get(pin_spec[4:], None)
else:
pin = pin_spec
if not library in _modules:
logging.debug("loading library %s" % library)
lib = PyKCS11.PyKCS11Lib()
assert type(library) == str # lib.load does not like unicode
lib.load(library)
# XXX should check result of C_Initialize()
lib.lib.C_Initialize()
_modules[library] = lib
else:
logging.debug("already loaded: %s: %s" % (library, _modules[library]))
lib = _modules[library]
# XXX: adhoc fix: if no slot given, use the first in the list
# (not the one named '0')
# Should be replaced by some proper pkcs11 uri search
if slot is None:
slot = lib.getSlotList(tokenPresent=True)[0]
session = lib.openSession(slot)
if pin is not None:
assert type(pin) == str # session.login does not like unicode
session.login(pin)
else:
logging.warning("No pin provided - not logging in")
_session_lock.release()
return session
def signer(pk11_uri, mech=PyKCS11.MechanismRSAPKCS1):
library, slot, keyname, query = parse_uri(pk11_uri)
pin = None
pin_spec = query.get('pin', "env:PYKCS11PIN")
if pin_spec.startswith("env:"):
pin = os.environ.get(pin_spec[4:], None)
else:
pin = pin_spec
session = _session(str(library), slot, str(pin))
key, cert = _find_key(session, keyname)
if key is None:
raise XMLSigException("No such key: %s" % pk11_uri)
if cert is not None:
logging.info("Found matching cert in token")
return lambda data: _sign_and_close(session, key, data, mech), cert