In [1]:
import socket
import ssl
import os
from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dh
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import utils
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes


In [2]:

def generate_rsa_keypair():
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )
    return private_key

def generate_dh_keypair():
    parameters = dh.generate_parameters(generator=2, key_size=2048)
    private_key = parameters.generate_private_key()
    public_key = private_key.public_key()
    return private_key, public_key

def derive_dh_shared_key(private_key, public_key):
    shared_key = private_key.exchange(public_key)
    return shared_key

def rsa_encrypt(public_key, plaintext):
    ciphertext = public_key.encrypt(
        plaintext.encode(),
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )
    return ciphertext

def rsa_decrypt(private_key, ciphertext):
    plaintext = private_key.decrypt(
        ciphertext,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )
    return plaintext.decode()

def aes_encrypt(key, plaintext):
    iv = os.urandom(16)
    cipher = Cipher(algorithms.AES(key), modes.CFB(iv), backend=default_backend())
    encryptor = cipher.encryptor()
    ciphertext = encryptor.update(plaintext.encode()) + encryptor.finalize()
    return iv + ciphertext

def aes_decrypt(key, ciphertext):
    iv = ciphertext[:16]
    ciphertext = ciphertext[16:]
    cipher = Cipher(algorithms.AES(key), modes.CFB(iv), backend=default_backend())
    decryptor = cipher.decryptor()
    plaintext = decryptor.update(ciphertext) + decryptor.finalize()
    return plaintext.decode()

def start_server():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(('localhost', 8443))
    server_socket.listen(1)

    print("Server listening on port 8443...")

    while True:
        client_socket, client_address = server_socket.accept()
        print(f"Connection from {client_address} has been established!")

        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        print("SSL/TLS context created.")
        context.load_cert_chain(certfile="server.crt", keyfile="server.key")
        print("SSL/TLS context loaded.")
        
        with context.wrap_socket(client_socket, server_side=True) as ssock:
            print("SSL/TLS connection established.")

            data = ssock.recv(1024)
            print(f"Received from client: {data.decode()}")

            ssock.send("Please choose encryption method: 1 for RSA or 2 for AES (Diffie-Hellman)".encode())

            encryption_choice = ssock.recv(1024).decode()

            if encryption_choice == "1":
                # RSA encryption and PKI key exchange
                private_key = generate_rsa_keypair()
                public_key = private_key.public_key()
                client_public_key = load_pem_public_key(data, backend=default_backend())
                shared_secret = private_key.exchange(client_public_key)

                message = "This is a secure RSA message."
                ciphertext = rsa_encrypt(client_public_key, message)
                ssock.send(ciphertext)

            elif encryption_choice == "2":
                # AES encryption and Diffie-Hellman key exchange
                alice_private_key, alice_public_key = generate_dh_keypair()
                ssock.send(alice_public_key.public_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PublicFormat.SubjectPublicKeyInfo
                ))

                received_public_key = ssock.recv(2048)
                received_public_key = serialization.load_pem_public_key(received_public_key, backend=default_backend())
                shared_key = derive_dh_shared_key(alice_private_key, received_public_key)

                message = "This is a secure AES message."
                ciphertext = aes_encrypt(shared_key, message)
                ssock.send(ciphertext)



In [3]:
if __name__ == "__main__":
    start_server()


Server listening on port 8443...
Connection from ('127.0.0.1', 61840) has been established!
SSL/TLS context created.
SSL/TLS context loaded.
SSL/TLS connection established.
Received from client: Please choose encryption method: 1 for RSA or 2 for AES (Diffie-Hellman)
