In [12]:
import requests
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from CertificationAuthority import CertificationAuthority
import datetime
import util
import os
from cryptography import exceptions, x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes

In [13]:
#ca_root use port 8000
#ca1 use port 8001
#ca2_use port 8002
#ca1_1 use port 8003
#ca2_1 use port 8004

#ca_root - ca1 (intermediate) - ca1_1 (intermediate) - client 1
#ca_root - ca2 (intermediate) - ca2_2 (intermediate) - client 2

In [14]:
# client 1 cert sign by ca1_1
# 1. generate RSA key for client 1
rsa_key = util.generate_ras_key()

# 2. create certificate sign request and sign with client private key
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([
    # Provide various details about who we are.
    x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
    x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"California"),
    x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"),
    x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"My Company"),
    x509.NameAttribute(NameOID.COMMON_NAME, u"client1.com"),
])).add_extension(
    x509.SubjectAlternativeName([
        # Describe what sites we want this certificate for.
        x509.DNSName(u"mysite.com"),
        x509.DNSName(u"www.mysite.com"),
        x509.DNSName(u"subdomain.mysite.com"),
    ]),
    critical=False,
# Sign the CSR with our private key. is applicant's private key
).sign(rsa_key, hashes.SHA256())

# 3. send to certificate authority,  the data param name corresond fastapi parameter
response = requests.post(" http://127.0.0.1:8003/issue_cert",files={'csr_file':csr.public_bytes(serialization.Encoding.PEM)})
client1_cert = x509.load_pem_x509_certificate(response.content)
print("applicant: "+client1_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value)
print("issuer: "+client1_cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value)
print(client1_cert.not_valid_before)
print(client1_cert.not_valid_after)

applicant: client1.com
issuer: ca1_1
2021-11-26 01:29:05
2021-12-06 01:29:05


In [15]:
# client 2 cert sign by ca2_2
# 1. generate RSA key for client 2
rsa_key = util.generate_ras_key()

# 2. create certificate sign request and sign with client private key
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([
    # Provide various details about who we are.
    x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
    x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"California"),
    x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"),
    x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"My Company"),
    x509.NameAttribute(NameOID.COMMON_NAME, u"client2.com"),
])).add_extension(
    x509.SubjectAlternativeName([
        # Describe what sites we want this certificate for.
        x509.DNSName(u"mysite.com"),
        x509.DNSName(u"www.mysite.com"),
        x509.DNSName(u"subdomain.mysite.com"),
    ]),
    critical=False,
# Sign the CSR with our private key. is applicant's private key
).sign(rsa_key, hashes.SHA256())

# 3. send to certificate authority,  the data param name corresond fastapi parameter
response = requests.post(" http://127.0.0.1:8004/issue_cert",files={'csr_file':csr.public_bytes(serialization.Encoding.PEM)})
client2_cert = x509.load_pem_x509_certificate(response.content)
print("applicant: "+client2_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value)
print("issuer: "+client2_cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value)
print(client2_cert.not_valid_before)
print(client2_cert.not_valid_after)

applicant: client2.com
issuer: ca2_1
2021-11-26 01:29:05
2021-12-06 01:29:05


In [16]:
#create function to verify cert (root/ca1/ca2/ca1_1/ca2_2/client1/client2)
def verifycert(client):
    port = ["8000", "8001", "8003", "8002", "8004"]
    cport = ""
    #get cert
    if client == ("client1"):
        clientcert = client1_cert
    elif client == ("client2"):
        clientcert = client2_cert
    elif client == ("root"):
        cport = port[0]
    elif client == ("ca1"):
        cport = port[1]
    elif client == ("ca2"):
        cport = port[3]
    elif client == ("ca1_1"):
        cport = port[2]
    elif client == ("ca2_2"):
        cport = port[4]
    else:
        print("invalid client")
        return
    
    if client != "client1" and client != "client2":
        response = requests.get(" http://127.0.0.1:" + cport + "/get_CA_cert")
        clientcert = x509.load_pem_x509_certificate(response.content)
    
    for i in range(5):
        #get public key
        response = requests.get(" http://127.0.0.1:" + port[i] +"/get_CA_public_key")
        public_key=serialization.load_pem_public_key(response.content)
        
        #verify cert
        if port[i] == "8000":
            print(client + " cert verify with " + "root public key")
        
        elif port[i] == "8001":
            print(client + " cert verify with " + "ca1 public key")
            
        elif port[i] == "8002":
            print(client + " cert verify with " + "ca2 public key")
        
        elif port[i] == "8003":
            print(client + " cert verify with " + "ca1_1 public key")
            
        else:
            print(client + " cert verify with " + "ca2_2 public key")
            
        print("validate: "+ str(util.verify_cert_signature(clientcert,public_key)))

print("===validate client1 cert===")
verifycert("client1")
print()

print("===validate client2 cert===")
verifycert("client2")
print()

print("===validate root cert===")
verifycert("root")
print()

print("===validate ca1 cert===")
verifycert("ca1")
print()

print("===validate ca2 cert===")
verifycert("ca2")
print()

print("===validate ca1_1 cert===")
verifycert("ca1_1")
print()

print("===validate ca2_2 cert===")
verifycert("ca2_2")

===validate client1 cert===
client1 cert verify with root public key
validate: False
client1 cert verify with ca1 public key
validate: False
client1 cert verify with ca1_1 public key
validate: True
client1 cert verify with ca2 public key
validate: False
client1 cert verify with ca2_2 public key
validate: False

===validate client2 cert===
client2 cert verify with root public key
validate: False
client2 cert verify with ca1 public key
validate: False
client2 cert verify with ca1_1 public key
validate: False
client2 cert verify with ca2 public key
validate: False
client2 cert verify with ca2_2 public key
validate: True

===validate root cert===
root cert verify with root public key
validate: True
root cert verify with ca1 public key
validate: False
root cert verify with ca1_1 public key
validate: False
root cert verify with ca2 public key
validate: False
root cert verify with ca2_2 public key
validate: False

===validate ca1 cert===
ca1 cert verify with root public key
validate: True
ca1

In [18]:
#get cert
response = requests.get(" http://127.0.0.1:" + "8000" + "/get_CA_cert")
ca_root_cert = x509.load_pem_x509_certificate(response.content)
response = requests.get(" http://127.0.0.1:" + "8001" + "/get_CA_cert")
ca1_cert = x509.load_pem_x509_certificate(response.content)
response = requests.get(" http://127.0.0.1:" + "8002" + "/get_CA_cert")
ca2_cert = x509.load_pem_x509_certificate(response.content)
response = requests.get(" http://127.0.0.1:" + "8003" + "/get_CA_cert")
ca1_1_cert = x509.load_pem_x509_certificate(response.content)
response = requests.get(" http://127.0.0.1:" + "8004" + "/get_CA_cert")
ca2_1_cert = x509.load_pem_x509_certificate(response.content)

print('ca1 revoke ca1_1 cert')
#check revoke cert status of ca1_1
response = requests.post(" http://127.0.0.1:8001/revoke_cert_status",files={'crt_file':ca1_1_cert.public_bytes(serialization.Encoding.PEM)})
print(response.content)
#ca1 revoke ca1_1 cert
response = requests.post(" http://127.0.0.1:8001/revoke_cert",files={'crt_file':ca1_1_cert.public_bytes(serialization.Encoding.PEM)})
print(response.status_code)
print(response.content)
#check revoke cert status of ca1_1
response = requests.post(" http://127.0.0.1:8001/revoke_cert_status",files={'crt_file':ca1_1_cert.public_bytes(serialization.Encoding.PEM)})
print(response.content)

print('root revoke ca2 cert')
#check revoke cert status of ca2
response = requests.post(" http://127.0.0.1:8000/revoke_cert_status",files={'crt_file':ca2_cert.public_bytes(serialization.Encoding.PEM)})
print(response.content)
#root revoke ca2 cert
response = requests.post(" http://127.0.0.1:8000/revoke_cert",files={'crt_file':ca2_cert.public_bytes(serialization.Encoding.PEM)})
print(response.status_code)
print(response.content)
#check revoke cert status of ca2
response = requests.post(" http://127.0.0.1:8000/revoke_cert_status",files={'crt_file':ca2_cert.public_bytes(serialization.Encoding.PEM)})
print(response.content)

ca1 revoke ca1_1 cert
b'{"detail":"Method Not Allowed"}'
200
b'{"msg":"Revoke successful"}'
b'{"detail":"Method Not Allowed"}'
root revoke ca2 cert
b'{"detail":"Method Not Allowed"}'
200
b'{"msg":"Revoke successful"}'
b'{"detail":"Method Not Allowed"}'
