# Trabalho Prático 1

# Problema 1

José Pedro Fernandes Veloso PG47377

João Pedro Neves Félix PG47336


In [136]:
import os
from cryptography.hazmat.primitives.asymmetric import x448, ed448
from cryptography.hazmat.primitives import hmac, hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
import cryptography.exceptions
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import (
    Cipher, algorithms, modes
)

> Canal privado de informação assíncrona

Criamos uma classe "Agent" que será capaz de interpretar os dois lados da conexão (recetor e emissor). É importante perceber que as operações de acordo de chaves entre agentes e de autenticação são feitas com métodos diferentes. Cada agent terá 2 conjuntos diferentes de chaves, o primeiro será o conjunto de chaves utilizadas para autenticar os agentes (Ed448 Signing&Verification) e o segundo são as chaves utilizadas para acordar numa shared key (x448).

Fazendo uso do módulo "criptography" do python geramos as chaves utilizadas para autenticação (uma pública e uma privada) para cada um dos agentes que vai participar na comunicação e de seguida geramos chaves para o acordo de uma chave partilhada (também uma publica e uma privada). Para gerar a chave partilhada cada um dos agentes partilha com o outro a sua respetiva chave publica e juntando essa chave à privada de cada um obtem se uma nova chave partilhada.

É preciso, no entanto, fazer ainda uma fase de autenticação para que os agentes saibam que a shared key é igual para os dois e para que saibam com quem estão a comunicar. Para este passo um dos agentes calcula a assinatura HMAC da shared key que calculou e de seguida assina essa mesma assinatura com a chave privada calculada usando ed448. Isto resulta numa mensagem composta por apenas duas assinaturas, a do agente e a da chave. O outro agente que participa neste canal de comunicação recebe estas assinaturas e começa por autenticar o outro agente usando uma chave publica ed448 enviado pelo outro agente. Se esta fase de autenticação for efetuada com sucesso ele vai então verificar a assinatura do HMAC da shared key para ter a certeza que de facto ambos os participantes da comunicação possuem a mesma chave.

In [137]:
class Agent:
    def __init__(self):
        self.private_key = None
        self.public_key  = None
        self.shared_key  = None
        self.signing_private_key = None
        self.signing_public_key  = None

    def generate_signing_keys(self):
        self.signing_private_key = ed448.Ed448PrivateKey.generate()
        self.signing_public_key  = self.signing_private_key.public_key()
        
    def generate_own_keys(self):
        self.private_key = x448.X448PrivateKey.generate()
        self.public_key  = self.private_key.public_key()

    def generate_shared_key(self, received_public_key):
        key = self.private_key.exchange(received_public_key)
        # Derivamos para que ela tenha o tamanho correto
        self.shared_key = HKDF(
            algorithm=hashes.SHA256(),
            length=64,
            salt=None,
            info=None,
        ).derive(key)

    def ask_for_confirmation_on_shared_key(self):
        h = hmac.HMAC(self.shared_key, hashes.SHA256())
        h.update(self.shared_key)
        key_signature = h.finalize()
        agent_signature = self.signing_private_key.sign(key_signature)
        return key_signature, agent_signature

    def confirm_shared_key(self, key_signature, agent_signature, received_public_key):
        try:
            received_public_key.verify(agent_signature, key_signature)
            print("Agent Authentication Successful")
            h = hmac.HMAC(self.shared_key, hashes.SHA256())
            h.update(self.shared_key)
            try: 
                h.verify(key_signature)
                print("Key confirmation was Successful")
            except cryptography.exceptions.InvalidSignature:
                print("Key confirmation was NOT Successful")
        except cryptography.exceptions.InvalidSignature:
            print("Agent Authentication Failed")

    def generate_tweak(self, i, tag):
        nonce = os.urandom(8)
        return nonce + i.to_bytes(7,byteorder = 'little') + tag.to_bytes(1,byteorder = 'little')

    def send_message(self, message):
        len_message = len(message)

        # Padding do plain text 
        padder = padding.PKCS7(64).padder()
        padded = padder.update(message) + padder.finalize()  
        cont=0
        cipher_text = b''
        for i in range(0, len(padded), 16):
            block = padded[i:i+16]
            if(i+17 < len(padded)):
                # Gerar um tweak com o contador do bloco e tag=0
                tweak = self.generate_tweak(cont, 0)
                encryptor = Cipher(
                    algorithms.AES(self.shared_key),
                    modes.XTS(tweak),
                    ).encryptor()
                cipher_text += tweak
                cipher_text += encryptor.update(block) + encryptor.finalize()
            else:
                # Gerar um tweak com o tamanho do plain text e tag=1
                tweak = self.generate_tweak(len_message,1)
                ct = b''
                for _, byte in enumerate(block): 
                    mascara = self.shared_key + tweak
                    ct += bytes([byte ^ mascara[0:16][0]])
                cipher_text += tweak 
                cipher_text += ct 
            cont+=1
        
        agent_signature = self.signing_private_key.sign(cipher_text)
        return agent_signature, cipher_text

    def parseTweak(self, tweak):
        nonce = tweak[:8]
        contEn = tweak[8:8+7]
        cont = int.from_bytes(contEn, byteorder = 'little' )
        tag = tweak[15]
        print(nonce, cont, tag)
        return nonce, cont, tag

    def receive_message(self, agent_signature, cipher_text, received_public_key):
        try:
            received_public_key.verify(agent_signature, cipher_text)
            print("Agent Authentication Successful")
            plaintext = b''
            i = 1
            tweak1 = cipher_text[:16]
            block = cipher_text[16:32]
        
            _, cont, tag = self.parseTweak(tweak1)
                
            while(tag != 1):
                if(cont == i-1):
                    
                    decryptor = Cipher(algorithms.AES(self.shared_key), mode=modes.XTS(tweak1)).decryptor()
                    f = decryptor.update(block) 
                    plaintext += f
                    # Tweak de 16 bytes, bloco de 16 bytes
                    # Exemplo: i=1
                    # tweak = cipher_text[32:48]
                    # block = cipher_text[48:64]
                    tweak1 = cipher_text[i*32:i*32 +16]  
                    block = cipher_text[i*32 +16:(i+1)*32]
                    
                    _, cont, tag = self.parseTweak(tweak1)
                i+= 1
            if (tag == 1):
                ct =b''
                for _, byte in enumerate(block):
                    mascara = self.shared_key + tweak1
                    ct += bytes([byte ^ mascara[0:16][0]])
                plaintext += ct

            # Unpadding da mensagem decifrada
            unpadder = padding.PKCS7(64).unpadder()
            unpadded_message = unpadder.update(plaintext) + unpadder.finalize()
            print(f'Plain text: {unpadded_message}')

        except cryptography.exceptions.InvalidSignature:
            print("Agent Authentication Failed")

> Enviar mensagem

Os parâmetros de controlo num TPBC são uma chave de “longa duração” e uma chave de “curta duração” designada por “tweak”.  
Quando uma TPBC está integrada numa cifra autenticada, a chave de longa duração tem sempre o mesmo valor nas várias invocações da primitiva. Ao invés o “tweak” que varia sempre entre invocações.

Para tal, geramos um tweak novo para cifrar cada bloco da mensagem (16 bytes), sendo que o ultimo bloco é cifrado utilizando uma operação XOR.

Para gerar o tweak, utilizamos a função generate_tweak(i, tag). Nesta função é gerado um nounce que ocupa os primeiros b/2 bits do tweak. Depois é contactenado um contador ao nonce, no caso em que nao é o ultimo bloco, ou é contactnado o tamanho do plain text sem padding. Por fim é adicionado uma tag com valor 0 caso nao seja o ultimo bloco, e 1 caso seja.

Com os tweaks gerados, cada bloco é cifrado com um tweak diferente utilizando o AES-256. O ultimo bloco é cifrado utilizando o operador ^ (XOR), com uma mascara gerada através da chave partilhada.

> Receber mensagem 

Para decifrar a mensagem é feito um parse do tweak, retirando a tag deste. Com esta tag verificamos qual das operaçoes é que se tem de efetuar para decifrar, isto é, caso seja 1 é utilizado o XOR, mas caso seja 0 é utilizado o AES com o tweak respetivo ao bloco que queremos decifrar.

Por fim, é feito um unpadding e temos o plain text.

In [138]:
agent1 = Agent()
agent2 = Agent()

# Geram as chaves privadas e publicas para troca DH
agent1.generate_own_keys()
agent2.generate_own_keys()

# Geram as chaves para assinaturas
agent1.generate_signing_keys()
agent2.generate_signing_keys()

# Geram a chave partilhada
agent1.generate_shared_key(agent2.public_key)
agent2.generate_shared_key(agent1.public_key)

# Fase de confirmação de chaves acordadas
key_signature, agent_signature = agent1.ask_for_confirmation_on_shared_key()
agent2.confirm_shared_key(key_signature, agent_signature, agent1.signing_public_key)

message = b"Mensagem secreta com tres blocos"
agent_signature, cipher_text = agent1.send_message(message)
agent2.receive_message(agent_signature, cipher_text, agent1.signing_public_key)

Agent Authentication Successful
Key confirmation was Successful
Agent Authentication Successful
b'\n\xf5s$N\xcc\x9b{' 0 0
b'\x95H\x91\xb0\xbd\xdfi\xf4' 1 0
b'\x0e\x00U\x0f&\xa8\x03w' 32 1
Plain text: b'Mensagem secreta com tres blocos'
