# Generic

In [44]:
import math
import base64
import binascii

def to_bytes(d, format):
    """Convert ``d`` from the representation named by ``format`` into bytes.

    Args:
        d: The value to convert. If it is already ``bytes`` or ``bytearray``
            it is returned unchanged, whatever ``format`` says.
        format: One of ``'hex'`` (hex string), ``'base64'`` (base64 string),
            ``'str'`` / ``'bytes'`` (text, encoded with the default codec)
            or ``'int'`` (non-negative integer, big-endian, minimal length).

    Returns:
        The byte representation of ``d``.

    Raises:
        ValueError: If ``format`` is not one of the supported names.
    """
    if isinstance(d, (bytes, bytearray)):
        return d
    if format == 'hex':
        return bytes.fromhex(d)
    if format == 'base64':
        return base64.b64decode(d)
    if format in ('str', 'bytes'):
        return d.encode()
    if format == 'int':
        # Going through hex(d)[2:] breaks on an odd number of hex digits
        # (e.g. 0, 1, 256); size the output from the bit length instead.
        # Zero is encoded as a single null byte.
        return d.to_bytes(max(1, (d.bit_length() + 7) // 8), 'big')
    raise ValueError("unsupported format: {!r}".format(format))

def bytes_to(b, format):
    """Convert bytes ``b`` into the representation named by ``format``.

    Args:
        b: The value to convert. Anything that is not ``bytes`` or
            ``bytearray`` is returned unchanged, whatever ``format`` says.
        format: One of ``'hex'`` (lowercase hex string), ``'base64'``
            (base64 string), ``'str'`` (decoded text), ``'bytes'`` (returned
            as is) or ``'int'`` (big-endian unsigned integer).

    Returns:
        ``b`` rendered in the requested representation.

    Raises:
        ValueError: If ``format`` is not one of the supported names.
    """
    if not isinstance(b, (bytes, bytearray)):
        return b
    if format == 'hex':
        return binascii.hexlify(b).decode()
    if format == 'base64':
        return base64.b64encode(b).decode()
    if format == 'str':
        return b.decode()
    if format == 'bytes':
        return b
    if format == 'int':
        # hexlify() output has no "0x" prefix, so slicing off two characters
        # would silently drop the most significant byte. Decode directly.
        return int.from_bytes(b, 'big')
    raise ValueError("unsupported format: {!r}".format(format))

In [45]:
import math
import itertools

def xor(d1, d2, format):
    """XOR two values byte by byte and return the result in ``format``.

    Both operands are first converted with ``to_bytes``; the result is
    converted back with ``bytes_to``. The output is as long as the shorter
    operand, because the bytes are paired with ``zip``.
    """
    left = to_bytes(d1, format)
    right = to_bytes(d2, format)
    mixed = bytes(a ^ b for a, b in zip(left, right))
    return bytes_to(mixed, format)

def pad(text, blocksize=16):
    """Append PKCS#7-style padding so ``len(result)`` is a multiple of ``blocksize``.

    Each padding byte has the value of the padding length. Input that is
    already block-aligned receives one full extra block of padding, so the
    padding can always be removed unambiguously.

    Args:
        text: Bytes to pad.
        blocksize: Block length in bytes (default 16, the AES block size).

    Returns:
        ``text`` followed by between 1 and ``blocksize`` padding bytes.
    """
    # len(text) % blocksize lies in [0, blocksize), so padsize is always in
    # [1, blocksize]; it can never be 0 and needs no special case.
    padsize = blocksize - (len(text) % blocksize)
    return text + bytes([padsize] * padsize)
assert pad(b"YELLOW SUBMARINE", blocksize=20) == b'YELLOW SUBMARINE\x04\x04\x04\x04'

def unpad(text):
    """Validate PKCS#7-style padding and return the unpadded value.

    The last byte gives the padding length ``n``; the final ``n`` bytes must
    all equal ``n``.

    Args:
        text: Padded bytes.

    Returns:
        ``text`` without its trailing padding bytes.

    Raises:
        AssertionError: If ``text`` is empty or its padding is malformed.
            The exception type is kept as ``AssertionError`` for existing
            callers, but it is raised explicitly so the check still runs
            when Python is started with ``-O``.
    """
    if not text:
        raise AssertionError("invalid padding: empty input")
    padsize = text[-1]
    # A zero length byte, or one longer than the data itself, can never be valid.
    well_formed = (
        0 < padsize <= len(text)
        and text[-padsize:] == bytes([padsize] * padsize)
    )
    if not well_formed:
        # Carry the offending data in the exception instead of printing it.
        raise AssertionError("invalid padding: {!r}".format(text))
    return text[:-padsize]
# Round-trip self-check: padding to a 7-byte block size and unpadding must give back the input.
assert unpad(pad(b"this is random text", blocksize=7)) == b"this is random text"

# Crypto

In [66]:
import os
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend


def decrypt_aes_ecb(enc, key, IV=None, use_padding=True):
    """Decrypt ``enc`` with AES in ECB mode under ``key``.

    ``IV`` is accepted but never used (ECB has no IV). When ``use_padding``
    is true, the decrypted data is passed through ``unpad`` before being
    returned; otherwise the raw decrypted blocks are returned.
    """
    ecb = Cipher(algorithms.AES(key), modes.ECB(), backend=default_backend())
    context = ecb.decryptor()
    plaintext = context.update(enc) + context.finalize()
    return unpad(plaintext) if use_padding else plaintext

def encrypt_aes_ecb(clear, key, IV=None, use_padding=True):
    """Encrypt ``clear`` with AES in ECB mode under ``key``.

    ``IV`` is accepted but never used (ECB has no IV). When ``use_padding``
    is true, ``clear`` is first extended with ``pad``; otherwise it is
    encrypted as given.
    """
    plaintext = pad(clear) if use_padding else clear
    ecb = Cipher(algorithms.AES(key), modes.ECB(), backend=default_backend())
    context = ecb.encryptor()
    return context.update(plaintext) + context.finalize()
# Round-trip self-check: ECB decryption under the same key must undo ECB encryption.
assert decrypt_aes_ecb(encrypt_aes_ecb(b'test', b'YELLOW SUBMARINE'), b'YELLOW SUBMARINE') == b'test'

def encrypt_aes_cbc(clear, key, IV=None):
    """Encrypt ``clear`` in CBC mode, built on top of ``encrypt_aes_ecb``.

    The input is always padded with ``pad``. Each 16-byte block is XORed
    with the previous ciphertext block (``IV`` for the first one) before
    being encrypted. A missing ``IV`` defaults to 16 zero bytes.
    """
    chain = b'\x00'*16 if IV is None else IV
    padded = pad(clear)  # guarantees a length divisible by 16
    encrypted_blocks = []
    for offset in range(0, len(padded), 16):
        mixed = xor(chain, padded[offset:offset+16], 'bytes')
        chain = encrypt_aes_ecb(mixed, key, use_padding=False)
        encrypted_blocks.append(chain)
    return b''.join(encrypted_blocks)

def decrypt_aes_cbc(cipher, key, IV=None):
    """Decrypt ``cipher`` in CBC mode, built on top of ``decrypt_aes_ecb``.

    Each 16-byte block is decrypted and then XORed with the previous
    ciphertext block (``IV`` for the first one). The joined result is passed
    through ``unpad``. A missing ``IV`` defaults to 16 zero bytes.
    """
    previous = b'\x00'*16 if IV is None else IV
    decrypted_blocks = []
    for offset in range(0, len(cipher), 16):
        current = cipher[offset:offset+16]
        raw = decrypt_aes_ecb(current, key, use_padding=False)
        decrypted_blocks.append(xor(previous, raw, 'bytes'))
        previous = current
    return unpad(b''.join(decrypted_blocks))
# Round-trip self-check on a multi-block input (800 bytes) with the default all-zero IV.
assert decrypt_aes_cbc(encrypt_aes_cbc(b'test'*200, b'YELLOW SUBMARINE'), b'YELLOW SUBMARINE') == b'test'*200

def MAC(message, key, IV):
    """Compute a CBC-MAC: the last 16 bytes of the CBC encryption of ``message``.

    The message is encrypted with ``encrypt_aes_cbc`` (which pads it) under
    ``key`` and ``IV``; only the final ciphertext block is kept.
    """
    ciphertext = encrypt_aes_cbc(message, key, IV)
    return ciphertext[-16:]

# Cryptopals Notebook

## Set 7

### Exercise 49

CBC-MAC Message Forgery

In [122]:
# Setup : a client and a server share a key K.
# The server has methods to validate an incoming message from the client


import os, random

class Client:
    """Session for one account holder, able to MAC messages it originates."""

    def __init__(self, key, account):
        """Session for a given account holder.
        Allows the holder to generate messages from transactions where he is the origin.

        Args:
            key: MAC key for this session.
            account: Name placed in the ``from=`` field of every message.
        """
        self._key = key
        self._account = account

    def emit_simple(self, target, amount, iv):
        """Emit a single transaction from this client to ``target``.

        Returns:
            A ``(message, iv, mac)`` tuple, where ``mac`` is computed over
            ``message`` with the caller-supplied ``iv``.
        """
        msg = "from={}&to={}&amount={}".format(self._account, target, amount).encode()
        return msg, iv, MAC(msg, self._key, iv)

    def emit(self, targets, amounts):
        """Emit one message carrying several transactions from this client.

        ``targets[i]`` receives ``amounts[i]``. The MAC uses a fixed all-zero IV.

        Returns:
            A ``(message, mac)`` tuple.

        Raises:
            ValueError: If ``targets`` and ``amounts`` differ in length.
        """
        targets, amounts = list(targets), list(amounts)
        # zip() would silently drop the unmatched entries; refuse instead of
        # signing a message that omits part of what the caller asked for.
        if len(targets) != len(amounts):
            raise ValueError(
                "targets and amounts must have the same length ({} != {})".format(
                    len(targets), len(amounts)))
        tx_list = ';'.join('{}:{}'.format(t, a) for t, a in zip(targets, amounts))
        msg = "from={}&tx_list={}".format(self._account, tx_list).encode()
        return msg, MAC(msg, self._key, b'\x00'*16)

class Server:
    """Holds a random MAC key, validates incoming messages, and opens client sessions."""

    def __init__(self):
        """Generate a fresh random 16-byte key for this server."""
        self._key = os.urandom(16)

    @staticmethod
    def _same_mac(expected, provided):
        """Compare two MACs without leaking timing information.

        ``provided`` comes from the untrusted caller. A value that is not
        bytes-like never matches (as with ``==``) rather than raising.
        """
        import hmac  # local import: only needed for the constant-time comparison
        if not isinstance(provided, (bytes, bytearray, memoryview)):
            return False
        return hmac.compare_digest(expected, provided)

    def validate_simple(self, message, iv, mac):
        """For the simple case with a user-provided IV.

        Returns True when ``mac`` is the MAC of ``message`` under this
        server's key and the given ``iv``.
        """
        return self._same_mac(MAC(message, self._key, iv), mac)

    def validate(self, message, mac):
        """Using a fixed IV (16 zero bytes).

        Returns True when ``mac`` is the MAC of ``message`` under this
        server's key.
        """
        return self._same_mac(MAC(message, self._key, b'\x00'*16), mac)

    def open_session(self, account):
        """Return a ``Client`` for ``account`` that shares this server's key."""
        return Client(self._key, account)

    
def init(account='bob'):
    """Create a server and a client session that share the same key.

    The previous version called ``Server(key)`` and ``Client(key)``, which
    match neither constructor and always raised ``TypeError``. The server
    generates its own key, so the client is obtained through
    ``open_session``.

    Args:
        account: Account name for the client session.

    Returns:
        A ``(server, client)`` tuple.
    """
    server = Server()
    return server, server.open_session(account)

In [123]:
# "alice" is the server holding the key; "bob" is an honest client session.
alice = Server()
bob = alice.open_session('bob')

# A single transfer MAC'd under a random, client-chosen IV must validate.
simple_transfer = bob.emit_simple('legit', 10, os.urandom(16))
assert alice.validate_simple(*simple_transfer)

# A multi-transaction message MAC'd under the fixed IV must validate too.
batch_transfer = bob.emit(['legit1', 'legit2'], [10, 20])
assert alice.validate(*batch_transfer)

If we control the IV it is easy, as you just have to set the new IV to:
$$\begin{align*}
IV_{new}
&= IV_{old} \bigoplus message_{block1} \bigoplus new\_message_{block1} 
\end{align*}$$
When checking the MAC, the first block will be :
$$\begin{align*}
IV_{new} \bigoplus new\_message_{block1}
&= IV_{old} \bigoplus message_{block1} \bigoplus new\_message_{block1} \bigoplus new\_message_{block1} \\
&= IV_{old} \bigoplus message_{block1}
\end{align*}$$
which is exactly the same first block as with our original message (from eve to eve). We can then just reuse the MAC as the rest of the message is the same.

In [124]:
# First: the simple case, where the attacker controls the IV
eve = alice.open_session('eve')
amount = 10000

# Eve legitimately obtains a MAC for a large transfer from herself to herself
message, iv, mac = eve.emit_simple('eve', amount, os.urandom(16))

# Forge a message debiting bob, and the IV that cancels the change in the first block
new_message = b'from=bob&to=eve&amount=' + str(amount).encode()
first_block_delta = xor(message[:16], new_message[:16], 'bytes')
new_iv = xor(iv, first_block_delta, 'bytes')
assert alice.validate_simple(new_message, new_iv, mac)

Without controlling the IV, I will add a single hypothesis about our attacker, which should not be too far-fetched: the IV is known to the attacker. It is fixed, but known. Another constraint we have to add is that the server does not validate messages too closely (meaning that we can insert bad transactions in the middle of good transactions and still get the good ones executed).

Our intercepted message is 3 blocks long (B1, B2 and B3 - which may be padded), while our length extension is only one block or shorter. Eve will try to get the MAC for the following message : 
```
eve_message = from=eve&tx_list=eve:100;eve:100;eve:1000000
eve_mac = MAC(eve_message)
```
Let's set a few variables :
$$\begin{align*}
MAC_{bob} = MAC(message_{bob}) = MAC(B1\ ||\ B2\ ||\ B3)\\
MAC_{eve} = MAC(message_{eve}) = MAC(B'1\ ||\ B'2)
\end{align*}$$

Now, we craft the following message :
$$\begin{align*}
message = B1\ ||\ B2\ ||\ B3\ ||\ (MAC_{bob} \bigoplus IV \bigoplus B'1)\ ||\ B'2
\end{align*}$$

When evaluating the MAC, the CBC encryption will arrive at the encryption of B3 and generate the following blocks:
$$\begin{align*}
AES(B3 \bigoplus\ ...) &\rightarrow\ MAC_{bob}\\
&\rightarrow MAC_{bob}\ \bigoplus\ (MAC_{bob} \bigoplus IV \bigoplus B'1)=IV \bigoplus B'1\\
&\rightarrow AES(IV \bigoplus B'1) \bigoplus B'2\\
&\rightarrow AES(AES(IV \bigoplus B'1) \bigoplus B'2)
&&= MAC_{eve}
\end{align*}$$

In [127]:
# Second: the hard case, where the IV is fixed and not attacker-controlled
iv = b'\x00' * 16
message_bob, mac_bob = bob.emit(['legit1', 'legit2', 'legit3'], [10, 11, 12])

# Eve obtains a valid MAC for a message paying herself
message_eve, mac_eve = eve.emit(['eve', 'eve', 'eve'], [100, 100, 1000000])

# Bob's padded message ends the CBC chain on mac_bob. The glue block cancels it,
# so the chain continues exactly as if it had started from iv on Eve's message.
bob_blocks = pad(message_bob, blocksize=16)
glue_block = xor(xor(mac_bob, iv, 'bytes'), message_eve[:16], 'bytes')
evil_message = bob_blocks + glue_block + message_eve[16:]
assert alice.validate(evil_message, mac_eve)

In order to protect against this, in theory we only have to have a secret IV. But this means the IV should be as secret as the key, which partially goes against the very idea of an IV. Also stricter validation should be used in order to detect and reject messages where garbage has been injected in the middle.

In [115]:
# Scratch check: this prefix of Eve's message is 32 bytes, i.e. exactly two 16-byte blocks.
len("from=eve&tx_list=eve:100;eve:100")

32

In [55]:
# Scratch: sample 16 random bytes to look at the output.
os.urandom(16)

b'\xb7aJ\x8b\xa5o-\x0c\xfdq\xe87\xeb\xc8}\xac'

In [85]:
# Scratch check: this byte string is 16 bytes long, i.e. exactly one block.
len( b'amount=10&to=eve')

16

In [82]:
# Scratch check: the forged IV is 16 bytes long (one block).
len(new_iv)

16