In [1]:
import datetime

from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from cryptography.hazmat.primitives.serialization import load_pem_private_key

In [2]:
def generate_rsa_private_key(public_exponent=65537, key_size=2048):
    """
    Generates a fresh RSA private key

    Parameters
    ----------
    public_exponent: int
        Public exponent passed to the key generator
    key_size: int
        Key size in bits passed to the key generator

    Returns
    -------
    RSA private key created by ``rsa.generate_private_key``

    """
    generation_options = {
        "public_exponent": public_exponent,
        "key_size": key_size,
        "backend": default_backend(),
    }
    return rsa.generate_private_key(**generation_options)

In [3]:

def get_user_attributes_dummy(user_id):
    """
    Returns placeholder database attributes for a user id

    This is a stand-in for the real database lookup: the argument is
    ignored and the same hard-coded record is returned for every user.

    Parameters
    ----------
    user_id: str
        User identifier (currently unused)

    Returns
    -------
    Tuple(user_last_name, user_first_name, user_email)
        user_last_name: str
            User last name
        user_first_name: str
            User first name
        user_email: str
            User email

    """
    # Intended real implementation (MySQL prepared statement); the column
    # order matches the order of the returned tuple:
    # sql_command = ('SELECT lastname, firstname, email FROM users WHERE uid = %s')
    # cursor.execute(sql_command, (user_id,))
    return "Tairieur", "Alain", "alaintairieur@gmail.com"


In [4]:
def write_certificate(certificate, certificate_file_name):
    """
    Writes a certificate to a file in PEM format

    Parameters
    ----------
    certificate: x509.Certificate
        Certificate to serialize
    certificate_file_name: str
        Path of the file to create or overwrite (opened in text mode)

    """
    with open(certificate_file_name, 'w') as pem_file:
        pem_bytes = certificate.public_bytes(encoding=serialization.Encoding.PEM)
        pem_file.write(pem_bytes.decode())

In [5]:
def read_certificate(certificate_file_name):
    """
    Loads a PEM encoded X.509 certificate from a file

    Parameters
    ----------
    certificate_file_name: str
        Path of the PEM file to read

    Returns
    -------
    x509.Certificate
        Certificate parsed from the file content

    """
    # Use a context manager so the file handle is closed even if reading fails.
    with open(certificate_file_name, "rb") as pem_file:
        pem_cert = pem_file.read()
    return x509.load_pem_x509_certificate(pem_cert, default_backend())

In [6]:
def save_key(key, filename):
    """
    Writes a private key to a file in PEM format

    The key is serialized in the traditional OpenSSL format and is NOT
    encrypted on disk.

    Parameters
    ----------
    key: private key object
        Key to serialize (must provide ``private_bytes``)
    filename: str
        Path of the file to create or overwrite

    """
    pem_encoding = serialization.Encoding.PEM
    openssl_format = serialization.PrivateFormat.TraditionalOpenSSL
    no_encryption = serialization.NoEncryption()

    serialized_key = key.private_bytes(
        encoding=pem_encoding,
        format=openssl_format,
        encryption_algorithm=no_encryption,
    )

    with open(filename, 'wb') as key_file:
        key_file.write(serialized_key)

In [7]:
def load_key(filename):
    """
    Loads an unencrypted PEM private key from a file

    Parameters
    ----------
    filename: str
        Path of the PEM file to read

    Returns
    -------
    Private key object parsed from the file content

    """
    with open(filename, 'rb') as key_file:
        pem_data = key_file.read()
    # No password: the key is expected to be stored unencrypted.
    return load_pem_private_key(pem_data, password=None, backend=default_backend())

In [8]:
def create_root_certificate(root_certificate_path=None, root_private_key_path=None):
    """
    Creates a self-signed root (CA) certificate together with its RSA key

    Parameters
    ----------
    root_certificate_path: str, optional
        If given, the certificate is written to this path in PEM format
    root_private_key_path: str, optional
        If given, the private key is written to this path via save_key

    Returns
    -------
    Tuple(root_certificate, root_key)
        root_certificate: x509.Certificate
            Self-signed certificate, valid for 365 days from creation
        root_key: RSA private key
            Key that signed the certificate

    """
    root_key = generate_rsa_private_key()

    # Self-signed certificate: subject and issuer are the same name.
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, u"CH"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Zurich"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, u"Zurich"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"iMoviesCA"),
    ])

    # Sample the clock once so both validity bounds derive from the same instant.
    now = datetime.datetime.utcnow()

    root_certificate = x509.CertificateBuilder().subject_name(
        subject
    ).issuer_name(
        issuer
    ).public_key(
        root_key.public_key()
    ).serial_number(
        x509.random_serial_number()
    ).not_valid_before(
        now
    ).not_valid_after(
        now + datetime.timedelta(days=365)
    ).add_extension(
        # Mark the certificate as a CA. Without basicConstraints CA:TRUE a
        # standard path validator will not accept it as an issuer.
        x509.BasicConstraints(ca=True, path_length=None), critical=True
    ).sign(root_key, hashes.SHA256(), default_backend())

    if root_certificate_path:
        write_certificate(root_certificate, root_certificate_path)

    if root_private_key_path:
        save_key(root_key, root_private_key_path)

    return root_certificate, root_key

In [9]:
def certificate_issuing(user_id, user_pwd_hash, root_certificate_path= 'root_certificate.pem', root_private_key_path="root_private_key.pem", validity = 30):
    """
    Issues a certificate for a user, signed with the root key

    NOTE: authentication is not implemented yet; ``user_pwd_hash`` is
    accepted but never checked.

    Parameters
    ----------

    user_id: str
        User identifier

    user_pwd_hash: str
        Hash of the user password (currently unused)

    root_certificate_path: str
        Path of the PEM encoded root certificate

    root_private_key_path: str
        Path of the PEM encoded root private key

    validity: int
        Number of days the issued certificate is valid for

    Returns
    -------
    Tuple(certificate, certificate_key)
        certificate: x509.Certificate
            Newly issued user certificate
        certificate_key: RSA private key
            Freshly generated key whose public part is certified

    """

    # TODO: connect to the database and check user_id, user_pwd_hash

    root_certificate = read_certificate(root_certificate_path)

    user_last_name, user_first_name, user_email = get_user_attributes_dummy(user_id)

    certificate_key = generate_rsa_private_key()

    root_key = load_key(root_private_key_path)

    # Sample the clock once so both validity bounds derive from the same instant.
    now = datetime.datetime.utcnow()

    certificate = x509.CertificateBuilder().subject_name(x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, u"CH"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Zurich"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, u"Zurich"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"iMovies"),
        x509.NameAttribute(NameOID.USER_ID, u"{}".format(user_id)),
        x509.NameAttribute(NameOID.SURNAME, u"{}".format(user_last_name)),
        x509.NameAttribute(NameOID.GIVEN_NAME, u"{}".format(user_first_name)),
        x509.NameAttribute(NameOID.EMAIL_ADDRESS, u"{}".format(user_email)),
    ])).issuer_name(
        # The issuer of the new certificate is the *subject* of the signing
        # certificate. The two only coincide for a self-signed root.
        root_certificate.subject
    ).public_key(
        certificate_key.public_key()
    ).serial_number(
        x509.random_serial_number()
    ).not_valid_before(
        now
    ).not_valid_after(
        now + datetime.timedelta(days=validity)
    ).sign(root_key, hashes.SHA256(), default_backend())

    return certificate, certificate_key

In [10]:
def verify_certificate(certificate, root_certificate):
    """
    Checks that a certificate was signed by the root certificate's key

    Only the signature is checked; the validity period and revocation
    status are not examined here. The check uses PKCS#1 v1.5 padding.

    Parameters
    ----------
    certificate: x509.Certificate
        Certificate whose signature is checked
    root_certificate: x509.Certificate
        Certificate holding the public key expected to have signed it

    Returns
    -------
    bool
        True if the signature verifies, False if verification fails or
        any other error occurs during the check

    """
    try:
        # verify raises cryptography.exceptions.InvalidSignature if the
        # signature fails to verify.
        root_certificate.public_key().verify(
            certificate.signature,
            certificate.tbs_certificate_bytes,
            # Depends on the algorithm used to create the certificate
            padding.PKCS1v15(),
            certificate.signature_hash_algorithm,
        )
        return True
    except Exception:
        # Fail closed on any verification error, but let KeyboardInterrupt
        # and SystemExit propagate (a bare ``except:`` would swallow them).
        return False

In [11]:
def revoke_certificate(certificate):
    """
    Builds a revocation entry for a certificate

    Parameters
    ----------
    certificate: x509.Certificate
        Certificate to revoke; only its serial number is used

    Returns
    -------
    x509.RevokedCertificate
        Entry dated with the current UTC time

    """
    builder = x509.RevokedCertificateBuilder()
    # ``datetime`` is the module here, so the class must be spelled out
    # (``datetime.today()`` raises AttributeError). UTC is used because
    # naive datetimes are interpreted as UTC by the x509 builders.
    builder = builder.revocation_date(datetime.datetime.utcnow())
    builder = builder.serial_number(certificate.serial_number)
    return builder.build(default_backend())

In [12]:
class CRL:
    """
    In-memory list of revoked certificates that can be exported as a
    certificate revocation list signed with the root key
    """

    def __init__(self):
        # x509.RevokedCertificate entries accumulated by revoke_certificate()
        self.revoked_certificates = []

    def revoke_certificate(self, certificate):
        """
        Adds a revocation entry for a certificate, dated with the current UTC time

        Parameters
        ----------
        certificate: x509.Certificate
            Certificate to revoke; only its serial number is used

        """
        builder = x509.RevokedCertificateBuilder()
        # Naive datetimes are interpreted as UTC by the x509 builders, so
        # use UTC rather than local time.
        builder = builder.revocation_date(datetime.datetime.utcnow())
        builder = builder.serial_number(certificate.serial_number)
        self.revoked_certificates.append(builder.build(default_backend()))

    def get_crl(self, root_certificate_path= 'root_certificate.pem', root_private_key_path="root_private_key.pem"):
        """
        Builds and signs a CRL containing every entry revoked so far

        Parameters
        ----------
        root_certificate_path: str
            Path of the PEM encoded root certificate
        root_private_key_path: str
            Path of the PEM encoded root private key

        Returns
        -------
        bytes
            PEM encoded CRL, with next update set one day after creation

        """
        root_key = load_key(root_private_key_path)

        root_certificate = read_certificate(root_certificate_path)

        # Sample the clock once so next_update is exactly one day after last_update.
        now = datetime.datetime.utcnow()

        builder = x509.CertificateRevocationListBuilder()
        builder = builder.last_update(now)
        builder = builder.next_update(now + datetime.timedelta(days=1))
        # The CRL issuer is the subject of the signing certificate.
        builder = builder.issuer_name(root_certificate.subject)
        for revoked_cert in self.revoked_certificates:
            builder = builder.add_revoked_certificate(revoked_cert)
        cert_revocation_list = builder.sign(
            private_key=root_key, algorithm=hashes.SHA256(), backend=default_backend()
        )
        return cert_revocation_list.public_bytes(encoding=serialization.Encoding.PEM)

    def update_crl(self, certificate, root_certificate_path= 'root_certificate.pem', root_private_key_path="root_private_key.pem"):
        """
        Revokes a certificate and returns the re-signed CRL

        Parameters
        ----------
        certificate: x509.Certificate
            Certificate to revoke
        root_certificate_path: str
            Path of the PEM encoded root certificate
        root_private_key_path: str
            Path of the PEM encoded root private key

        Returns
        -------
        bytes
            PEM encoded CRL including the new entry

        """
        self.revoke_certificate(certificate)
        # Forward the paths so a non-default root can be used here as well.
        return self.get_crl(
            root_certificate_path=root_certificate_path,
            root_private_key_path=root_private_key_path,
        )
    

In [13]:
def is_revoked(certificate, crl_pem):
    """
    Tells whether a certificate's serial number is listed in a CRL

    The CRL signature is not verified here.

    Parameters
    ----------
    certificate: x509.Certificate
        Certificate to look up; only its serial number is used
    crl_pem: bytes
        PEM encoded certificate revocation list

    Returns
    -------
    bool
        True if the serial number appears in the CRL, False otherwise

    """
    crl = x509.load_pem_x509_crl(crl_pem, backend=default_backend())
    revoked_entry = crl.get_revoked_certificate_by_serial_number(certificate.serial_number)
    # Identity comparison is the reliable way to test for None.
    return revoked_entry is not None

In [14]:
# Create the self-signed root certificate and its key, and write both to the
# given PEM files so the issuing and CRL helpers can load them later.
root_certificate, root_key = create_root_certificate('root_certificate.pem', "root_private_key.pem")

In [15]:
# Issue a user certificate using the default root certificate/key paths.
# The two string arguments are placeholder values for this demo.
certificate, private_key = certificate_issuing("user_id", "user_password")

In [16]:
# Check the issued certificate's signature against the root certificate's public key.
verify_certificate(certificate, root_certificate)

True

In [17]:
# Revoke the issued certificate and get the signed CRL as PEM bytes.
crl = CRL()
crl_pem = crl.update_crl(certificate)

In [18]:
# Start over with a new, empty CRL. This rebinds `crl` and `crl_pem`, so the
# list signed here contains no revoked entries.
crl = CRL()
crl_pem = crl.get_crl()

In [19]:
# Look up the certificate's serial number in whichever CRL `crl_pem` currently holds.
is_revoked(certificate, crl_pem)

False

In [20]:
# Display the issued certificate (the cell output shows its subject name).
certificate

<Certificate(subject=<Name(C=CH,ST=Zurich,L=Zurich,O=iMovies,UID=user_id,2.5.4.4=Tairieur,2.5.4.42=Alain,1.2.840.113549.1.9.1=alaintairieur@gmail.com)>, ...)>