# **TP1 - G11**

### Trabalho realizado por:
- PG57879 - João Andrade Rodrigues
- PG57889 - Martim José Amaro Redondo
- PG57511 - Benjamim Meleiro Rodrigues

# **Exercício 2**

Neste exercício, implementamos um canal seguro que permite a comunicação criptografada entre duas partes, Alice e Bob. O canal é baseado em algoritmos de criptografia modernos como o AES (Advanced Encryption Standard), utilizando uma chave derivada de um compartilhamento de chave via X25519 para estabelecer um segredo compartilhado. Para a autenticação e verificação de integridade, usamos uma combinação de AES com um esquema de autenticação utilizando tags de integridade.

Ao longo deste exercício, a segurança do canal é garantida pelo uso de um AEAD (Authenticated Encryption with Associated Data), que proporciona confidencialidade e autenticação para as mensagens trocadas.

As principais funcionalidades implementadas incluem:

- **Derivação de Chave:** A chave de criptografia é derivada a partir do segredo compartilhado utilizando o HKDF (HMAC-based Extract-and-Expand Key Derivation Function).
- **Cifra de Bloco Tweakable:** O AES é usado em modo ECB, mas com a modificação do bloco de plaintext através de um tweak (variação) antes da cifragem e depois da decifragem.
- **Autenticação de Mensagens:** Para garantir a integridade das mensagens e a autenticidade do remetente, calculamos e verificamos uma tag de autenticação.
- **Uso de Nonces:** Nonces são utilizados para garantir a unicidade das mensagens e impedir ataques de repetição.

In [8]:
import os
import hmac
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes, hmac as cryptography_hmac
from cryptography.hazmat.primitives.hmac import HMAC
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.asymmetric import x25519, ed25519
from cryptography.hazmat.primitives import serialization

#### **1. Derivação de Chave e Tweak**
Uma parte crucial da segurança do canal é a derivação de chaves. O segredo compartilhado gerado pelo X25519 é processado por um algoritmo de derivação de chave, HKDF, para gerar uma chave de 128 bits e um tweak de 128 bits que serão usados para a cifra.

In [9]:
def derive_key_and_tweak(shared_secret: bytes) -> tuple[bytes, bytes]:
    hkdf = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=None,
        info=b"AEAD Key Derivation",
        backend=default_backend(),
    )
    derived_material = hkdf.derive(shared_secret)
    return derived_material[:16], derived_material[16:]

#### **2. Geração e Construção de Nonces e Tweak**
O nonce é uma sequência única que garante que cada operação de cifragem seja diferente, mesmo que o plaintext seja o mesmo. Ele é gerado aleatoriamente e é combinado com um contador ou tamanho da mensagem para formar o tweak.

In [10]:
def generate_nonce() -> bytes:
    # Gera um nonce de 8 bytes (64 bits, metade do tamanho do bloco AES de 128 bits)
    return os.urandom(8)

def construct_tweak(nonce: bytes, counter: int, is_auth: bool, total_length: int = None) -> bytes:
    """
    Constrói o tweak seguindo o formato especificado:
    - Primeiros b/2 bits (8 bytes): nonce
    - Bits do meio: contador ou comprimento total
    - Último bit: 0 para cifra normal, 1 para autenticação
    """
    if is_auth:
        # Para autenticação: nonce + comprimento total + 1
        middle = total_length.to_bytes(7, 'big')
        last_byte = 0x80  # 1000 0000 em binário
    else:
        # Para cifra normal: nonce + contador + 0
        middle = counter.to_bytes(7, 'big')
        last_byte = 0x00

    return nonce + middle + bytes([last_byte])

#### **3. Criptografia e Descriptografia**
A cifra AES é usada com o modo ECB (Electronic Codebook). Embora o ECB geralmente não seja recomendado por questões de segurança, ele é usado aqui por motivos de simplicidade e para demonstrar o funcionamento da cifra tweakable. O plaintext é combinado com o tweak antes de ser cifrado e, no processo de descriptografia, o tweak é usado novamente para reverter a transformação.

In [11]:
def tweakable_block_cipher_encrypt(key: bytes, tweak: bytes, plaintext: bytes) -> bytes:
    if len(plaintext) != 16:
        raise ValueError("Plaintext deve ter exatamente 16 bytes")
    
    # XOR do plaintext com o tweak
    xored = bytes(a ^ b for a, b in zip(plaintext, tweak))
    
    # Cifra o resultado
    cipher = Cipher(algorithms.AES(key), modes.ECB(), backend=default_backend())
    encryptor = cipher.encryptor()
    return encryptor.update(xored) + encryptor.finalize()

def tweakable_block_cipher_decrypt(key: bytes, tweak: bytes, ciphertext: bytes) -> bytes:
    if len(ciphertext) != 16:
        raise ValueError("Ciphertext deve ter exatamente 16 bytes")
    
    # Decifra primeiro
    cipher = Cipher(algorithms.AES(key), modes.ECB(), backend=default_backend())
    decryptor = cipher.decryptor()
    decrypted = decryptor.update(ciphertext) + decryptor.finalize()
    
    # XOR com o tweak
    return bytes(a ^ b for a, b in zip(decrypted, tweak))

#### **4. Criptografia com AEAD**
A criptografia AEAD combina cifragem e autenticação. As mensagens são cifradas, e ao mesmo tempo, uma tag de autenticação é gerada para garantir a integridade da mensagem e autenticidade.

Abaixo temos a função aead_encrypt(), que cifra a mensagem, gerando a tag de autenticação:

In [12]:
def compute_plaintext_parity(plaintext: bytes) -> bytes:
    """Calcula a paridade do plaintext para autenticação"""
    result = bytearray(16)  # Bloco de 16 bytes preenchido com zeros
    for i in range(0, len(plaintext), 16):
        block = plaintext[i:min(i+16, len(plaintext))]
        for j, byte in enumerate(block):
            result[j] ^= byte
    return bytes(result)

def aead_encrypt(key: bytes, nonce: bytes, plaintext: bytes) -> bytes:
    # Determina o tamanho do último bloco (τ)
    tau = len(plaintext) % 16
    original_length = len(plaintext)
    
    # Padding do plaintext (se necessário)
    if tau > 0:
        padding_length = 16 - tau
        padded_plaintext = plaintext + b'\x00' * padding_length
    else:
        padded_plaintext = plaintext
        tau = 0
    
    # Número de blocos completos
    num_blocks = len(padded_plaintext) // 16
    
    ciphertext = b""

    # Cifra os blocos completos
    for i in range(num_blocks - 1):  # Exclui o último bloco
        block = padded_plaintext[i*16:(i+1)*16]
        tweak = construct_tweak(nonce, i, False)
        encrypted_block = tweakable_block_cipher_encrypt(key, tweak, block)
        ciphertext += encrypted_block
    
    # Trata o último bloco de forma especial
    last_block = padded_plaintext[-16:]
    
    # Cifra o último bloco usando um tweak especial
    tweak_last = construct_tweak(nonce, num_blocks - 1, False)
    encrypted_last = tweakable_block_cipher_encrypt(key, tweak_last, last_block)
    ciphertext += encrypted_last
    
    # Cifra o tau separadamente usando outro tweak
    tweak_tau = construct_tweak(nonce, num_blocks, False)
    tau_block = tau.to_bytes(16, 'big')  # Converte tau para um bloco de 16 bytes
    encrypted_tau = tweakable_block_cipher_encrypt(key, tweak_tau, tau_block)
    ciphertext += encrypted_tau
    
    # Calcula a tag de autenticação
    parity = compute_plaintext_parity(plaintext)  # Usa plaintext original, não o padded
    auth_tweak = construct_tweak(nonce, 0, True, original_length)
    tag = tweakable_block_cipher_encrypt(key, auth_tweak, parity)
    
    return ciphertext + tag

def aead_decrypt(key: bytes, nonce: bytes, ciphertext: bytes) -> bytes:
    if len(ciphertext) < 48:  # Mínimo: um bloco + bloco de tau + tag
        raise ValueError("Ciphertext muito curto")
    
    # Separa o texto cifrado da tag
    tag = ciphertext[-16:]
    actual_ciphertext = ciphertext[:-16]
    
    # Número de blocos no ciphertext (incluindo o bloco de tau)
    num_blocks = len(actual_ciphertext) // 16
    
    # Decifra os blocos completos, exceto os dois últimos (último bloco real e bloco de tau)
    plaintext = b""
    for i in range(num_blocks - 2):
        block = actual_ciphertext[i*16:(i+1)*16]
        tweak = construct_tweak(nonce, i, False)
        decrypted_block = tweakable_block_cipher_decrypt(key, tweak, block)
        plaintext += decrypted_block
    
    # Decifra o último bloco real
    last_block_encrypted = actual_ciphertext[-32:-16]
    tweak_last = construct_tweak(nonce, num_blocks - 2, False)
    last_block = tweakable_block_cipher_decrypt(key, tweak_last, last_block_encrypted)
    
    # Decifra o tau
    tau_encrypted = actual_ciphertext[-16:]
    tweak_tau = construct_tweak(nonce, num_blocks - 1, False)
    tau_block = tweakable_block_cipher_decrypt(key, tweak_tau, tau_encrypted)
    tau = int.from_bytes(tau_block, 'big')
    
    if tau > 16:
        raise ValueError("Valor de tau inválido")
    
    # Adiciona o último bloco (removendo o padding se necessário)
    if tau == 0:
        plaintext += last_block
    else:
        plaintext += last_block[:tau]
    
    # Verifica a tag
    parity = compute_plaintext_parity(plaintext)
    auth_tweak = construct_tweak(nonce, 0, True, len(plaintext))
    computed_tag = tweakable_block_cipher_encrypt(key, auth_tweak, parity)
    
    if not hmac.compare_digest(computed_tag, tag):
        raise ValueError("Tag de autenticação inválida!")
    
    return plaintext

#### **5. SecureChannel**

A classe SecureChannel implementa um canal de comunicação seguro utilizando criptografia de chave pública (X25519) para troca de chaves e assinatura digital (Ed25519) para autenticação. Ela também oferece cifragem e decifragem de mensagens com a chave compartilhada, utilizando o modo AEAD para garantir confidencialidade e integridade.

In [13]:
class SecureChannel:
    def __init__(self):
        self.private_key = x25519.X25519PrivateKey.generate()
        self.public_key = self.private_key.public_key()
        self.signing_key = ed25519.Ed25519PrivateKey.generate()
        self.verification_key = self.signing_key.public_key()
        self.shared_key = None
        self.used_nonces = set()  # Armazena nonces já utilizados

    def sign_public_key(self):
        # Serializa a chave pública para bytes
        public_key_bytes = self.public_key.public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw
        )
        print(f"Chave pública serializada (Alice ou Bob): {public_key_bytes.hex()}")
        # Assina os bytes da chave pública
        signature = self.signing_key.sign(public_key_bytes)
        print(f"Assinatura gerada: {signature.hex()}")
        return signature

    def verify_public_key_signature(self, public_key: x25519.X25519PublicKey, signature: bytes, peer_verification_key: ed25519.Ed25519PublicKey):
        # Serializa a chave pública para bytes
        public_key_bytes = public_key.public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw
        )

        print(f"Chave pública recebida para verificação: {public_key_bytes.hex()}")
        print(f"Chave de verificação utilizada: {peer_verification_key.public_bytes(serialization.Encoding.Raw, serialization.PublicFormat.Raw).hex()}")

        try:
            # Tenta verificar a assinatura usando a chave de verificação do peer
            peer_verification_key.verify(signature, public_key_bytes)
            print("Assinatura verificada com sucesso!")
            return True
        except Exception as e:
            print(f"Erro na verificação da assinatura: {e}")
            return False

    def exchange_keys(self, peer_public_key: x25519.X25519PublicKey, peer_signature: bytes, peer_verification_key: ed25519.Ed25519PublicKey):
        # Verifica a assinatura da chave pública do peer
        if not self.verify_public_key_signature(peer_public_key, peer_signature, peer_verification_key):
            raise ValueError("Assinatura inválida da chave pública do peer")

        # Troca das chaves X25519 e geração da chave compartilhada
        shared_secret = self.private_key.exchange(peer_public_key)
        self.shared_key, _ = derive_key_and_tweak(shared_secret)
        return self.shared_key
    
    def key_confirmation(self):
        # Confirmação da chave - cada parte gera um valor (ex: HMAC) para provar que conhece a chave derivada
        if not self.shared_key:
            raise ValueError("Chaves não foram trocadas ainda")
        
        # Utilizando HMAC para gerar o valor de confirmação
        hmac = HMAC(self.shared_key, hashes.SHA256())
        hmac.update(b"key_confirmation")  # A string de confirmação pode ser personalizada
        confirmation_value = hmac.finalize()
        return confirmation_value

    def verify_key_confirmation(self, confirmation_value: bytes):
        # Verifica se o valor de confirmação recebido é válido
        if not self.shared_key:
            raise ValueError("Chaves não foram trocadas ainda")

        hmac = HMAC(self.shared_key, hashes.SHA256())
        hmac.update(b"key_confirmation")
        expected_value = hmac.finalize()
        return confirmation_value == expected_value

    def sign_message(self, message: bytes) -> bytes:
        return self.signing_key.sign(message)

    def verify_signature(self, message: bytes, signature: bytes, peer_verification_key: ed25519.Ed25519PublicKey) -> bool:
        try:
            peer_verification_key.verify(signature, message)
            return True
        except:
            return False

    def encrypt_message(self, message: bytes, nonce: bytes = None) -> tuple[bytes, bytes]:
        if not self.shared_key:
            raise ValueError("Chaves não foram trocadas ainda")
        
        if nonce is None:
            nonce = generate_nonce()  # Se o nonce não for passado, gera um novo

        # Tenta gerar um nonce único
        while nonce in self.used_nonces:
            nonce = generate_nonce()  # Gera um novo nonce

        self.used_nonces.add(nonce)  # Marca o nonce como usado
        
        ciphertext = aead_encrypt(self.shared_key, nonce, message)
        return ciphertext, nonce

    def decrypt_message(self, ciphertext: bytes, nonce: bytes) -> bytes:
        if not self.shared_key:
            raise ValueError("Chaves não foram trocadas ainda")
        
        return aead_decrypt(self.shared_key, nonce, ciphertext)

#### **6. Testes de Segurança**

**6.1. Troca de Chaves** -
Alice e Bob trocam chaves públicas e derivam uma chave compartilhada. Se as chaves derivadas forem diferentes, um erro é levantado.

**6.2. Criptografia e Descriptografia** -
Alice cifra uma mensagem e envia para Bob. Bob descriptografa a mensagem utilizando a chave compartilhada e verifica a integridade do conteúdo.

**6.3. Assinatura e Verificação** -
Alice assina a mensagem cifrada e Bob verifica a assinatura com a chave pública de Alice, garantindo autenticidade.

**6.4. Teste de Integridade** - 
Uma mensagem é adulterada e Bob detecta a modificação ao tentar descriptografá-la, rejeitando a mensagem alterada.

**6.5. Nonce Incorreto** - 
Bob tenta descriptografar uma mensagem com um nonce incorreto. A operação é rejeitada.

**6.6. Testes Adicionais**
- **Tamanhos Variados:** Testes com diferentes tamanhos de mensagens.
- **Nonce Reutilizado:** Bob rejeita uma mensagem com nonce reutilizado.
- **Tag de Autenticação:** Bob detecta alterações na tag de autenticação.
- **Ataque Cut-and-Paste:** Bob rejeita uma mensagem alterada por ataque "cut-and-paste".

In [14]:
async def main():
    print("\nINICIO DOS TESTES\n")

    # Criar canais seguros para Alice e Bob
    alice = SecureChannel()
    bob = SecureChannel()

    print("[1] Troca de chaves entre Alice e Bob...")

    # Alice assina a sua chave pública
    alice_signature = alice.sign_public_key()
    # Bob assina a sua chave pública
    bob_signature = bob.sign_public_key()

    # Alice envia a sua chave pública e assinatura para Bob
    alice_key = alice.exchange_keys(bob.public_key, bob_signature, bob.verification_key)

    # Bob envia a sua chave pública e assinatura para Alice
    bob_key = bob.exchange_keys(alice.public_key, alice_signature, alice.verification_key)

    if not hmac.compare_digest(alice_key, bob_key):
        raise ValueError("ERRO: As chaves derivadas são diferentes!")
    
    print("[1] Troca de chaves bem-sucedida!")

        # ---- Início da Key Confirmation ----
    print("\n[1.1] A iniciar a confirmação de chave...")

    # Alice gera o valor de confirmação
    alice_confirmation_value = alice.key_confirmation()
    print(f"Alice gerou o valor de confirmação: {alice_confirmation_value.hex()}")

    # Alice envia o valor de confirmação para Bob
    # Bob verifica o valor de confirmação de Alice
    if not bob.verify_key_confirmation(alice_confirmation_value):
        raise ValueError("ERRO: A confirmação de chave de Alice falhou!")
    print("Bob verificou a confirmação de Alice com sucesso!")

    # Bob gera o valor de confirmação
    bob_confirmation_value = bob.key_confirmation()
    print(f"Bob gerou o valor de confirmação: {bob_confirmation_value.hex()}")

    # Bob envia o valor de confirmação para Alice
    # Alice verifica o valor de confirmação de Bob
    if not alice.verify_key_confirmation(bob_confirmation_value):
        raise ValueError("ERRO: A confirmação de chave de Bob falhou!")
    print("Alice verificou a confirmação de Bob com sucesso!")  
        # ---- Fim da Key Confirmation ----

    message = b"Mensagem de teste com tamanho nao multiplo de 16 bytes!"
    print("\n[2] Alice cifra e envia a mensagem para Bob...")
    encrypted_message, nonce = alice.encrypt_message(message)

    print("Bob tenta descriptografar a mensagem recebida...")
    decrypted_message = bob.decrypt_message(encrypted_message, nonce)

    assert message == decrypted_message, "ERRO: A mensagem descriptografada está incorreta!"
    print("[2] Mensagem enviada e recebida com sucesso!")

    print("\n[3] Alice assina a mensagem cifrada...")
    signature = alice.sign_message(encrypted_message)

    print("Bob verifica a assinatura...")
    assert bob.verify_signature(encrypted_message, signature, alice.verification_key), "ERRO: Assinatura inválida!"
    print("[3] Assinatura verificada com sucesso!")

    # Teste de adulteração de mensagem
    print("\n[4] Teste de integridade: Mensagem adulterada...")
    tampered_message = bytearray(encrypted_message)
    tampered_message[10] ^= 1  # Modifica um byte aleatório

    try:
        print("Bob tenta descriptografar a mensagem adulterada...")
        bob.decrypt_message(bytes(tampered_message), nonce)
        print("ERRO: Bob conseguiu descriptografar uma mensagem adulterada!")
    except ValueError:
        print("[4] Bob detectou a adulteração e rejeitou a mensagem!")

    # Teste de nonce incorreto
    print("\n[5] Teste de descriptografia com NONCE errado...")
    try:
        wrong_nonce = generate_nonce()  # Gerando um nonce diferente
        print("Bob tenta descriptografar com um nonce errado...")
        bob.decrypt_message(encrypted_message, wrong_nonce)
        print("ERRO: Bob conseguiu descriptografar com nonce errado!")
    except ValueError:
        print("[5] Bob rejeitou a mensagem com nonce incorreto!")

    print("\nINICIO DOS TESTES ADICIONAIS\n")

    print("[A1] Teste com mensagens de tamanhos variados...")
    sizes = [16, 32, 50, 128, 10_000]  # Vários tamanhos de mensagens
    for size in sizes:
        msg = os.urandom(size)
        encrypted, nonce = alice.encrypt_message(msg)
        decrypted = bob.decrypt_message(encrypted, nonce)
        assert msg == decrypted, f"Erro ao descriptografar mensagem de tamanho {size}"
    print("[A1] Teste concluído com sucesso!")

    print("\n[A2] Teste de nonce reutilizado...")
    message1 = b"Mensagem 1"
    message2 = b"Mensagem 2"

    nonce = generate_nonce()  # Geramos um nonce fixo

    # Agora passamos o mesmo nonce para ambas as mensagens
    encrypted1, _ = alice.encrypt_message(message1, nonce)
    encrypted2, _ = alice.encrypt_message(message2, nonce)

    try:
        bob.decrypt_message(encrypted2, nonce)  # Tentamos descriptografar com nonce repetido
        print("ERRO: Bob conseguiu descriptografar com nonce reutilizado!")
    except ValueError:
        print("[A2] Bob rejeitou a mensagem com nonce reutilizado, como esperado.")

    print("\n[A3] Teste de integridade da tag de autenticação...")
    encrypted, nonce = alice.encrypt_message(b"Teste de tag")
    tampered_tag = bytearray(encrypted)
    tampered_tag[-1] ^= 1  # Modifica um único byte na tag

    try:
        bob.decrypt_message(bytes(tampered_tag), nonce)
        print("ERRO: Bob aceitou uma tag de autenticação alterada!")
    except ValueError:
        print("[A3] Bob corretamente detectou e rejeitou a tag adulterada.")

    print("\n[A4] Teste de ataque cut-and-paste...")
    encrypted1, nonce1 = alice.encrypt_message(b"Bloco 1 - Mensagem A")
    encrypted2, nonce2 = alice.encrypt_message(b"Bloco 2 - Mensagem B")

    modified_ciphertext = encrypted1[:8] + encrypted2[8:]  # Mistura blocos de duas mensagens

    try:
        bob.decrypt_message(modified_ciphertext, nonce1)
        print("ERRO: Bob aceitou uma mensagem alterada via cut-and-paste!")
    except ValueError:
        print("[A4] Bob corretamente rejeitou a mensagem modificada.")


    print("\nTodos os testes completados com sucesso!\n")

await main()


INICIO DOS TESTES

[1] Troca de chaves entre Alice e Bob...
Chave pública serializada (Alice ou Bob): 2d6a7caaff18f018085379bf98c8148697cca99842810488dd4f52a35b2ef445
Assinatura gerada: c110a7acfd61c54ec62766b2f0194eb0d39054279fde4405810f6dfe8e569513e297eac3255c1a5aecb0ced2e39df20dc8375795a5cc0490aeaaba3380be9a07
Chave pública serializada (Alice ou Bob): f2f2c626cd0f31e8fd67df42052778e1ab39029fc27ea257d18d55c746378067
Assinatura gerada: c566be31b92b04bb760a4e1a023bd286919f0e65bf3a88e97dc7ab6f59e776481273ecf8e30543b02d0583dd7612f46879d1c1543aceaf4dbe7d10311aa4d608
Chave pública recebida para verificação: f2f2c626cd0f31e8fd67df42052778e1ab39029fc27ea257d18d55c746378067
Chave de verificação utilizada: 603eefcc0e22b787673400f8ac55c118aba644595bfcba966d682aa6bc5f61bc
Assinatura verificada com sucesso!
Chave pública recebida para verificação: 2d6a7caaff18f018085379bf98c8148697cca99842810488dd4f52a35b2ef445
Chave de verificação utilizada: 53f622252d608cab1d1c795d1fc3e5a652d0af2e14ac7295d163f