"""
Test our certificates
"""
import json
import os
import unittest
import requests
from asn1crypto import pem as asn1_pem
from asn1crypto import x509 as asn1_x509
from src.pkcs11_ca_service.asn1 import create_jwt_header_str
from src.pkcs11_ca_service.config import ROOT_URL
from .lib import create_i_ca, verify_pkcs11_ca_tls_cert
# Path of the certificate search endpoint: GET returns a list of recently issued
# certificates, POST with a {"pem": ...} body looks up a single certificate.
SEARCH_CERTIFICATE_ENDPOINT = "/search/certificate"
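class TestCertificate(unittest.TestCase):
    """
    Test our certificates.

    End-to-end test against a running PKCS11 CA: signs a fixed CSR through
    ``/sign_csr``, lists the recently issued certificates through
    ``SEARCH_CERTIFICATE_ENDPOINT`` and then looks one of them up by PEM.
    Every request is authenticated with a JWT minted from the trusted key
    pair under ``data/trusted_keys`` and sent over TLS with verification
    enabled.
    """

    # Base URL of the CA under test; CA_URL in the environment overrides the
    # configured default so the suite can be pointed at another deployment.
    ca_url = os.environ.get("CA_URL", ROOT_URL)

    # Subject of the intermediate CA that is created to sign the test CSR.
    name_dict = {
        "country_name": "SE",
        "state_or_province_name": "Stockholm",
        "locality_name": "Stockholm_test",
        "organization_name": "SUNET_cert",
        "organizational_unit_name": "SUNET Infrastructure",
        "common_name": "ca-test-certificate-48.sunet.se",
    }

    @staticmethod
    def _load_trusted_keys() -> tuple[bytes, bytes]:
        """
        Return ``(pub_key, priv_key)`` read from ``data/trusted_keys``.

        The paths are relative to the working directory the tests run from;
        the private key is a test fixture that must be trusted by the CA for
        the JWT authorization below to be accepted.
        """
        with open("data/trusted_keys/pubkey1.pem", "rb") as f_data:
            pub_key = f_data.read()
        with open("data/trusted_keys/privkey1.key", "rb") as f_data:
            priv_key = f_data.read()
        return pub_key, priv_key

    def _request(
        self,
        method: str,
        path: str,
        pub_key: bytes,
        priv_key: bytes,
        payload: "dict | None" = None,
    ) -> requests.Response:
        """
        Send an authenticated request to ``self.ca_url + path`` and assert HTTP 200.

        Args:
            method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
            path: Endpoint path appended to ``self.ca_url``.
            pub_key: Trusted public key used to mint the Authorization JWT.
            priv_key: Matching private key used to sign the JWT.
            payload: Optional dict sent as the JSON body.

        Returns:
            The ``requests.Response`` for the call.

        Raises:
            AssertionError: if the CA does not answer with status 200; the
                response body is attached to the failure message to make
                authorization or validation errors visible in the test log.
        """
        url = self.ca_url + path
        # A fresh header is minted per request and is given the exact URL it
        # will be sent to; presumably the JWT is bound to that URL, so a token
        # for one endpoint is not reused for another -- confirm in
        # create_jwt_header_str.
        headers = {"Authorization": create_jwt_header_str(pub_key, priv_key, url)}
        resp = requests.request(
            method,
            url,
            headers=headers,
            json=payload,
            timeout=10,
            # TLS verification stays on, pinned to whatever
            # verify_pkcs11_ca_tls_cert() returns (presumably the CA's own TLS
            # certificate bundle); do not replace this with verify=False.
            verify=verify_pkcs11_ca_tls_cert(),
        )
        self.assertEqual(resp.status_code, 200, resp.text)
        return resp

    def test_certificate(self) -> None:
        """
        Sign a CSR, list issued certificates, then search for one by PEM.
        """
        pub_key, priv_key = self._load_trusted_keys()

        csr_pem = """-----BEGIN CERTIFICATE REQUEST-----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-----END CERTIFICATE REQUEST-----"""
        # Let requests serialise the body: hand-assembling the JSON string from
        # the PEM text would break on any quote or backslash in it.
        sign_payload = {
            "pem": csr_pem,
            "ca_pem": create_i_ca(self.ca_url, pub_key, priv_key, self.name_dict),
        }
        self._request("POST", "/sign_csr", pub_key, priv_key, sign_payload)

        # Get ALL certificates, currently the database limits this to the last 20 issued by the PKCS11 CA
        listing = self._request("GET", SEARCH_CERTIFICATE_ENDPOINT, pub_key, priv_key)
        certs = listing.json()["certificates"]
        self.assertIsInstance(certs, list)
        self.assertGreater(len(certs), 0)

        # Search for certificates
        search = self._request(
            "POST",
            SEARCH_CERTIFICATE_ENDPOINT,
            pub_key,
            priv_key,
            {"pem": certs[0]},
        )
        found = search.json()["certificates"]
        self.assertEqual(len(found), 1)

        # NOTE(review): the hit is only checked to be a parseable X.509
        # certificate, not compared with the PEM that was searched for; add an
        # equality check once it is confirmed the endpoint returns the PEM
        # unmodified.
        cert_data = found[0].encode("utf-8")
        if asn1_pem.detect(cert_data):
            _, _, cert_data = asn1_pem.unarmor(cert_data)
        self.assertIsInstance(asn1_x509.Certificate.load(cert_data), asn1_x509.Certificate)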