Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions src/xmlsec/pk11.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def parse_uri(pk11_uri):

logging.debug("parsed pkcs11 uri: %s" % repr(o))

slot = 0
slot = None
library = None
keyname = None
query = {}
Expand Down Expand Up @@ -134,8 +134,19 @@ def _find_key(session, keyname):
_session_lock = threading.RLock()


def _session(library, slot, pin=None):
def _session(library, slot=None, pin=None, pk11_uri=None):
_session_lock.acquire()

# XXX: adhoc fix -- should test cases where slot, pin and pk11_uri
# contradict or both are 'None'
if slot is None and pk11_uri is not None:
library, slot, keyname, query = parse_uri(pk11_uri)
pin_spec = query.get('pin', "env:PYKCS11PIN")
if pin_spec.startswith("env:"):
pin = os.environ.get(pin_spec[4:], None)
else:
pin = pin_spec

if not library in _modules:
logging.debug("loading library %s" % library)
lib = PyKCS11.PyKCS11Lib()
Expand All @@ -148,6 +159,13 @@ def _session(library, slot, pin=None):
logging.debug("already loaded: %s: %s" % (library, _modules[library]))

lib = _modules[library]

# XXX: adhoc fix: if no slot given, use the first in the list
# (not the one named '0')
# Should be replaced by some proper pkcs11 uri search
if slot is None:
slot = lib.getSlotList(tokenPresent=True)[0]

session = lib.openSession(slot)
if pin is not None:
assert type(pin) == str # session.login does not like unicode
Expand Down
1 change: 1 addition & 0 deletions src/xmlsec/test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def run_cmd(args,softhsm_conf=None):
env = {}
if softhsm_conf is not None:
env['SOFTHSM_CONF'] = softhsm_conf
env['SOFTHSM2_CONF'] = softhsm_conf
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
out, err = proc.communicate()
if err is not None and len(err) > 0:
Expand Down
67 changes: 47 additions & 20 deletions src/xmlsec/test/p11_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import os
import traceback
import subprocess
import shutil
import tempfile
from defusedxml import lxml
from lxml import etree
Expand All @@ -23,12 +24,12 @@
raise unittest.SkipTest("PyKCS11 not installed")
from xmlsec.test.case import load_test_data

P11_MODULE = find_alts(['/usr/lib/libsofthsm.so', '/usr/lib/softhsm/libsofthsm.so'])
P11_ENGINE = find_alts(['/usr/lib/engines/engine_pkcs11.so'])
P11_MODULE = find_alts(['/usr/lib/libsofthsm.so', '/usr/lib/softhsm/libsofthsm.so', '/usr/lib/softhsm/libsofthsm2.so'])
P11_ENGINE = find_alts(['/usr/lib/engines/engine_pkcs11.so','/usr/lib/x86_64-linux-gnu/engines-1.1/pkcs11.so'])
P11_SPY = find_alts(['/usr/lib/pkcs11/pkcs11-spy.so'])
PKCS11_TOOL = find_alts(['/usr/bin/pkcs11-tool'])
OPENSC_TOOL = find_alts(['/usr/bin/opensc-tool'])
SOFTHSM = find_alts(['/usr/bin/softhsm'])
SOFTHSM = find_alts(['/usr/bin/softhsm','/usr/bin/softhsm2-util'])
OPENSSL = find_alts(['/usr/bin/openssl'])

try:
Expand All @@ -51,6 +52,11 @@
if P11_ENGINE is None:
raise unittest.SkipTest("libengine-pkcs11-openssl is not installed")

softhsm_version = 1
if SOFTHSM=='/usr/bin/softhsm2-util':
softhsm_version = 2


p11_test_files = []
softhsm_conf = None
server_cert_pem = None
Expand All @@ -63,6 +69,11 @@ def _tf():
p11_test_files.append(f.name)
return f.name

def _td():
d = tempfile.mkdtemp()
p11_test_files.append(d)
return d


@unittest.skipIf(P11_MODULE is None, "SoftHSM PKCS11 module not installed")
def setup():
Expand All @@ -74,11 +85,17 @@ def setup():

try:
global softhsm_conf
softhsm_db = _tf()
softhsm_conf = _tf()
logging.debug("Generating softhsm.conf")
with open(softhsm_conf, "w") as f:
f.write("#Generated by pyXMLSecurity test\n0:%s\n" % softhsm_db)
if softhsm_version == 2:
softhsm_db = _td()
f.write("#Generated by pyXMLSecurity test\ndirectories.tokendir = %s\nobjectstore.backend = file\nlog.level = DEBUG\n"%softhsm_db)

else:
softhsm_db = _tf()
f.write("#Generated by pyXMLSecurity test\n0:%s\n" % softhsm_db)

logging.debug("Initializing the token")
run_cmd([SOFTHSM,
'--slot', '0',
Expand All @@ -92,7 +109,7 @@ def setup():
'-l',
'-k',
'--key-type', 'rsa:1024',
'--slot', '0',
'--slot-index', '0',
'--id', 'a1b2',
'--label', 'test',
'--pin', 'secret1'],softhsm_conf=softhsm_conf)
Expand All @@ -117,8 +134,8 @@ def setup():

[pkcs11_section]
engine_id = pkcs11
dynamic_path = %s
MODULE_PATH = %s
# dynamic_path = %s
# MODULE_PATH = %s
PIN = secret1
init = 0

Expand All @@ -134,11 +151,11 @@ def setup():
run_cmd([OPENSSL, 'req',
'-new',
'-x509',
'-subj', "/cn=Test Signer",
'-subj', "/CN=Test Signer",
'-engine', 'pkcs11',
'-config', openssl_conf,
'-keyform', 'engine',
'-key', 'a1b2',
'-key', 'pkcs11:token=test',
'-passin', 'pass:secret1',
'-out', signer_cert_pem],softhsm_conf=softhsm_conf)

Expand All @@ -153,7 +170,7 @@ def setup():
run_cmd([PKCS11_TOOL,
'--module', P11_MODULE,
'-l',
'--slot', '0',
'--slot-index', '0',
'--id', 'a1b2',
'--label', 'test',
'-y', 'cert',
Expand All @@ -170,7 +187,10 @@ def setup():
def teardown(self):
for o in self.p11_test_files:
if os.path.exists(o):
os.unlink(o)
if os.path.isdir(o):
shutil.rmtree(o)
else:
os.unlink(o)
self.p11_test_files = []


Expand All @@ -197,7 +217,8 @@ def test_open_session(self):
session = None
try:
os.environ['SOFTHSM_CONF'] = softhsm_conf
session = pk11._session(P11_MODULE, 0, "secret1")
os.environ['SOFTHSM2_CONF'] = softhsm_conf
session = pk11._session(P11_MODULE, pk11_uri="pkcs11://%s/test?pin=secret1" % P11_MODULE)
assert session is not None
except Exception, ex:
traceback.print_exc()
Expand All @@ -211,7 +232,8 @@ def test_open_session_no_pin(self):
session = None
try:
os.environ['SOFTHSM_CONF'] = softhsm_conf
session = pk11._session(P11_MODULE, 0)
os.environ['SOFTHSM2_CONF'] = softhsm_conf
session = pk11._session(P11_MODULE, pk11_uri="pkcs11://%s/test" % P11_MODULE)
assert session is not None
except Exception, ex:
traceback.print_exc()
Expand All @@ -226,8 +248,9 @@ def test_two_sessions(self):
session2 = None
try:
os.environ['SOFTHSM_CONF'] = softhsm_conf
session1 = pk11._session(P11_MODULE, 0, "secret1")
session2 = pk11._session(P11_MODULE, 0, "secret1")
os.environ['SOFTHSM2_CONF'] = softhsm_conf
session1 = pk11._session(P11_MODULE, pk11_uri="pkcs11://%s/test?pin=secret1" % P11_MODULE)
session2 = pk11._session(P11_MODULE, pk11_uri="pkcs11://%s/test?pin=secret1" % P11_MODULE)
assert session1 != session2
assert session1 is not None
assert session2 is not None
Expand All @@ -242,8 +265,9 @@ def test_two_sessions(self):
@unittest.skipIf(P11_MODULE is None, "SoftHSM PKCS11 module not installed")
def test_bad_login(self):
os.environ['SOFTHSM_CONF'] = softhsm_conf
os.environ['SOFTHSM2_CONF'] = softhsm_conf
try:
session = pk11._session(P11_MODULE, 0, "wrong")
session = pk11._session(P11_MODULE, pk11_uri="pkcs11://%s/test?pin=wrong" % P11_MODULE)
assert False, "We should have failed the last login"
except PyKCS11Error, ex:
assert ex.value == CKR_PIN_INCORRECT
Expand All @@ -254,7 +278,8 @@ def test_find_key(self):
session = None
try:
os.environ['SOFTHSM_CONF'] = softhsm_conf
session = pk11._session(P11_MODULE, 0, "secret1")
os.environ['SOFTHSM2_CONF'] = softhsm_conf
session = pk11._session(P11_MODULE, pk11_uri="pkcs11://%s/test?pin=secret1" % P11_MODULE)
key, cert = pk11._find_key(session, "test")
assert key is not None
assert cert is not None
Expand All @@ -274,9 +299,10 @@ def test_SAML_sign_with_pkcs11(self):
print("XML input :\n{}\n\n".format(case.as_buf('in.xml')))

os.environ['SOFTHSM_CONF'] = softhsm_conf
os.environ['SOFTHSM2_CONF'] = softhsm_conf

signed = xmlsec.sign(case.as_etree('in.xml'),
key_spec="pkcs11://%s:0/test?pin=secret1" % P11_MODULE)
key_spec="pkcs11://%s/test?pin=secret1" % P11_MODULE)

# verify signature using the public key
res = xmlsec.verify(signed, signer_cert_pem)
Expand All @@ -290,9 +316,10 @@ def test_SAML_sign_with_pkcs11_cert(self):
print("XML input :\n{}\n\n".format(case.as_buf('in2.xml')))

os.environ['SOFTHSM_CONF'] = softhsm_conf
os.environ['SOFTHSM2_CONF'] = softhsm_conf

signed = xmlsec.sign(case.as_etree('in2.xml'),
key_spec="pkcs11://%s:0/test?pin=secret1" % P11_MODULE)
key_spec="pkcs11://%s/test?pin=secret1" % P11_MODULE)

print("XML output :\n{}\n\n".format(etree.tostring(signed)))
# verify signature using the public key
Expand Down