In [1]:
!pip install cryptography -q

from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding as sympadding
import os
import json
import base64
import time

# ---------------- RSA CRYPTO ---------------- #

class RSACrypto:
    @staticmethod
    def generate_keys():
        private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        public_key = private_key.public_key()
        return private_key, public_key

    @staticmethod
    def public_key_to_bytes(key):
        return key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )

    @staticmethod
    def bytes_to_public_key(data):
        return serialization.load_pem_public_key(data)

    @staticmethod
    def encrypt_with_public_key(data, public_key):
        return public_key.encrypt(
            data,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )

    @staticmethod
    def decrypt_with_private_key(data, private_key):
        return private_key.decrypt(
            data,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )

# ---------------- AES CRYPTO ---------------- #

class AESCrypto:

    @staticmethod
    def generate_key():
        return os.urandom(32)  # 256-bit key

    @staticmethod
    def encrypt_message(message, key):
        iv = os.urandom(16)
        cipher = Cipher(algorithms.AES(key), modes.GCM(iv))
        encryptor = cipher.encryptor()

        ciphertext = encryptor.update(message.encode()) + encryptor.finalize()
        return {
            "ciphertext": base64.b64encode(ciphertext).decode(),
            "iv": base64.b64encode(iv).decode(),
            "tag": base64.b64encode(encryptor.tag).decode(),
            "timestamp": time.strftime("%H:%M:%S")
        }

    @staticmethod
    def decrypt_message(ciphertext, tag, iv, key):
        ciphertext = base64.b64decode(ciphertext)
        iv = base64.b64decode(iv)
        tag = base64.b64decode(tag)

        cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag))
        decryptor = cipher.decryptor()

        message = decryptor.update(ciphertext) + decryptor.finalize()
        return message.decode(), time.strftime("%H:%M:%S")
!pip install cryptography -q

from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding as sympadding
import os
import json
import base64
import time

# ---------------- RSA CRYPTO ---------------- #

class RSACrypto:
    @staticmethod
    def generate_keys():
        private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        public_key = private_key.public_key()
        return private_key, public_key

    @staticmethod
    def public_key_to_bytes(key):
        return key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )

    @staticmethod
    def bytes_to_public_key(data):
        return serialization.load_pem_public_key(data)

    @staticmethod
    def encrypt_with_public_key(data, public_key):
        return public_key.encrypt(
            data,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )

    @staticmethod
    def decrypt_with_private_key(data, private_key):
        return private_key.decrypt(
            data,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )

# ---------------- AES CRYPTO ---------------- #

class AESCrypto:

    @staticmethod
    def generate_key():
        return os.urandom(32)  # 256-bit key

    @staticmethod
    def encrypt_message(message, key):
        iv = os.urandom(16)
        cipher = Cipher(algorithms.AES(key), modes.GCM(iv))
        encryptor = cipher.encryptor()

        ciphertext = encryptor.update(message.encode()) + encryptor.finalize()
        return {
            "ciphertext": base64.b64encode(ciphertext).decode(),
            "iv": base64.b64encode(iv).decode(),
            "tag": base64.b64encode(encryptor.tag).decode(),
            "timestamp": time.strftime("%H:%M:%S")
        }

    @staticmethod
    def decrypt_message(ciphertext, tag, iv, key):
        ciphertext = base64.b64decode(ciphertext)
        iv = base64.b64decode(iv)
        tag = base64.b64decode(tag)

        cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag))
        decryptor = cipher.decryptor()

        message = decryptor.update(ciphertext) + decryptor.finalize()
        return message.decode(), time.strftime("%H:%M:%S")

In [2]:
import socket
import threading
import json

clients = []  # List of (handler, aes_key, username)

class ClientHandler(threading.Thread):
    def __init__(self, sock, addr):
        super().__init__()
        self.sock = sock
        self.addr = addr
        self.aes_key = None
        self.username = None

    def handshake(self):
        print(f"[+] Client connected {self.addr}")

        # 1) Receive PUBLIC KEY
        pub_bytes = self.sock.recv(4096)
        client_pub = RSACrypto.bytes_to_public_key(pub_bytes)

        # 2) Generate AES key
        self.aes_key = AESCrypto.generate_key()

        # 3) Encrypt AES with client's public key
        encrypted_key = RSACrypto.encrypt_with_public_key(self.aes_key, client_pub)

        # 4) Send AES session key
        self.sock.send(encrypted_key)

        # 5) Receive encrypted username
        raw = self.sock.recv(4096)
        obj = json.loads(raw.decode())
        username, ts = AESCrypto.decrypt_message(
            obj["ciphertext"], obj["tag"], obj["iv"], self.aes_key
        )
        self.username = username

        print(f"[+] {self.username} joined the chat.")
        return True

    def broadcast(self, sender, text):
        for handler, key, user in clients:
            if handler is not self:
                encrypted = AESCrypto.encrypt_message(f"{sender}: {text}", key)
                handler.sock.send(json.dumps(encrypted).encode())

    def run(self):
        if not self.handshake():
            return

        clients.append((self, self.aes_key, self.username))

        while True:
            try:
                raw = self.sock.recv(4096)
                if not raw:
                    break

                obj = json.loads(raw.decode())
                text, ts = AESCrypto.decrypt_message(
                    obj["ciphertext"], obj["tag"], obj["iv"], self.aes_key
                )

                print(f"[{self.username}] {text}")
                self.broadcast(self.username, text)

            except:
                break

        print(f"[-] {self.username} disconnected")
        self.sock.close()
        clients.remove((self, self.aes_key, self.username))


def start_server():
    server = socket.socket()
    server.bind(("0.0.0.0", 9999))
    server.listen(5)
    print("üöÄ Secure Chat Server started on port 9999")

    while True:
        sock, addr = server.accept()
        ClientHandler(sock, addr).start()

# Run in background thread
threading.Thread(target=start_server, daemon=True).start()

Exception in thread Thread-9 (start_server):
Traceback (most recent call last):
  File "D:\PY\Lib\threading.py", line 1075, in _bootstrap_inner


In [3]:
import socket
import threading
import json

def start_client(username):
    sock = socket.socket()
    sock.connect(("127.0.0.1", 9999))

    # RSA keys
    private, public = RSACrypto.generate_keys()

    # Send public key
    sock.send(RSACrypto.public_key_to_bytes(public))

    # Receive AES session key
    encrypted_key = sock.recv(4096)
    aes_key = RSACrypto.decrypt_with_private_key(encrypted_key, private)

    print(f"üîê Secure channel created for {username}")

    # --- Send username encrypted ---
    encrypted = AESCrypto.encrypt_message(username, aes_key)
    sock.send(json.dumps(encrypted).encode())

    # Thread to listen for messages
    def listen():
        while True:
            try:
                data = sock.recv(4096)
                if not data:
                    break
                msg = json.loads(data.decode())
                
                # Decrypt the message
                text, ts = AESCrypto.decrypt_message(
                    msg["ciphertext"], msg["tag"], msg["iv"], aes_key
                )
                
                # Print encrypted + decrypted for presentation
                print(f"\nüì® Received Encrypted: {msg['ciphertext']}")
                print(f"üîì Decrypted: {text}\n")
                
            except:
                break

    threading.Thread(target=listen, daemon=True).start()

    # Sending loop
    while True:
        text = input("")  # message you type
        encrypted = AESCrypto.encrypt_message(text, aes_key)

        # üîπ PRINT ENCRYPTED MESSAGE (for presentation)
        print(f"üîí Sent Encrypted: {encrypted['ciphertext']}")
        print(f"üîë IV: {encrypted['iv']}")
        print(f"üîë TAG: {encrypted['tag']}")

        # Send to server
        sock.send(json.dumps(encrypted).encode())

        # üîπ PRINT DECRYPTED (just to show it's correct)
        decrypted_text, ts = AESCrypto.decrypt_message(
            encrypted["ciphertext"], encrypted["tag"], encrypted["iv"], aes_key
        )
        print(f"üîì Decrypted Sent: {decrypted_text}\n")

    self.run()
  File "D:\PY\Lib\threading.py", line 1012, in run


In [None]:
start_client("Alice")

    self._target(*self._args, **self._kwargs)
  File "C:\Users\User\AppData\Local\Temp\ipykernel_24524\213163058.py", line 78, in start_server
OSError: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted


üîê Secure channel created for Alice


 hello man


üîí Sent Encrypted: HcS1bMU/zM0f
üîë IV: OXkDHVIwb96yWkO1yeMkeQ==
üîë TAG: /OnOn3Ytf0c6uUq4iIq7lw==
üîì Decrypted Sent: hello man

