# Collisions sur le chiffrement AES 

In [32]:
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from Crypto.Random import get_random_bytes
import Crypto.Util.number
import doctest
import base64
import random
import math
import time
import hashlib
import sys
import concurrent.futures

### Outils pour la manipulation des bytes

In [33]:
def key_generator_AES(kp):
    """
    Génere une clef de 16 octets avec kp bits significatives
    >>> kp = 10
    >>> len(key_generator_AES(kp)) == 16
    True
    """
    prng = random.SystemRandom()
    n = prng.getrandbits(kp)
    n = n.to_bytes(16, byteorder='little')
    return n

In [34]:
def extract_bits(x, kp):
    """         
    Extrait kp bits de x, où x est de type bytes
    >>> kp = 10
    >>> x = key_generator_AES(kp)
    >>> len (extract_bits(x, kp) ) == (kp//8 + 1)
    True
    """
    res = list(x[0:kp//8])    
    tmp = kp - ((kp//8)*8)            # bits restant à recuperer
    if  tmp > 0 :
        res.append(x[kp//8] & ((2**tmp)-1))   
    return bytes(res)


### Chiffrement AES

In [35]:
def simple_enc_AES(msg, key, mode = 'MODE_ECB'):
    """
    Chiffrement AES
    Retourne le message chiffré par l'AES en bytes
    """
    if not (isinstance(msg,bytes)):
        msg = msg.encode()
        
    cipher = AES.new(key, AES.MODE_ECB)
    enc = cipher.encrypt(pad(msg, len(key)))
    return enc


def simple_dec_AES(msg, key, mode = 'MODE_ECB'):
    """
    Déchiffrement AES
    Retourne le message non unpader et en bytes
    
    ATTENTION => le retour n'est pas UNPADER, il faut utileser unpad !!!
                (on ne le unpad pas dans la fonction car cela genere des erreur)
    
    >>> msg = 'test'
    >>> key = key_generator_AES(10)
    >>> encrypt = simple_enc_AES(msg, key)
    >>> unpad(simple_dec_AES(encrypt,key),16).decode()
    'test'
    """
    cipher = AES.new(key, AES.MODE_ECB)
    plaintext = cipher.decrypt(msg) 
    return plaintext

In [36]:
def double_AES(msg, key1, key2,  mode = 'MODE_ECB'):
    """
    Double chiffrement AES
    en passant les 2 clefs key1 et key2 en parametres
    (on a pas defini les autres modes)
    """
    enc = simple_enc_AES(msg, key1)
    enc = simple_enc_AES(enc, key2)
    return enc

### Collision Search

In [47]:
def new_step_cte(f, M, kp, x, cte):
    
    """
    Passer d'un xi au suivant
    f : fonction chiffrement OU dechiffrement
    M : un message  
    kp : nb de bit significatif de la clef
    xi : 
    cte :
    
    >>> x = extract_bits(key_generator_AES(13),13)
    >>> type(new_step_cte(simple_enc_AES, 'test', 10, x, 2))
    <class 'bytes'>
    >>> len(new_step_cte(simple_enc_AES, 'test', 10, x, 2)) == 16
    True
    """
    
    if len(xi) < 16:                     # des fois les 8 premiers bits sont nuls
        x = x.rjust(16,b"\x00") 

    c = f(M, xi)
    c = bytearray(c)
    tmp = (kp // 8) + 1
    for i in range(tmp):
        c[i] = (c[i]+cte) %256
    tmp = extract_bits(c,kp)
    
    if len(tmp) < 16:                     # des fois les 8 premiers bits sont nuls
        tmp = tmp.ljust(16,b"\x00") 
        
    return tmp

In [38]:
def trail(f, msg, kp, l, x0, cte):
    """
    Retourne un triplet (x0, xd, d) 
    f : fonction chiffrement OU dechiffrement
    msg : message clair OU chiffré deux fois  
    kp :nb de bit significatif de la clef
    l : nb de bit à 0 (pour la condition d'arrêt)
    
    >>> msg = "Voici le message"
    >>> x0 = key_generator_AES(kp)
    >>> kp = 10
    >>> l = 3
    >>> cte = 2
    >>> (x0,xd,d) = trail(simple_enc_AES, msg, kp, l, x0, cte)
    >>> tmp = x0
    >>> for _ in range(d):
    ...    tmp = new_step_cte(simple_enc_AES, msg, kp, tmp, cte)     
    >>> print(tmp==xd)
    True
    
    >>> mask_l = 2**l - 1 
    >>> xd = extract_bits(xd,kp)
    >>> xd = int.from_bytes(xd, 'big')
    >>> print( xd & mask_l == 0 )
    True
    """
    
    tmp = x0

    max_it = (20/0.5**l) #// 3       # diviser par 3 car sinon ca prends tres longtemps
    mask_l = 2**l - 1               

    d = 1                                # compter le nb de pas
        
    tmp = new_step_cte(f, msg, kp, tmp, cte)   # on ne considere pas le 1er point comme point critique
    while True:
        
        if d == max_it:
            #print("Risque de cycle ")
            return None
        
        tmp_binary = extract_bits(tmp,kp) 
        tmp_binary = int.from_bytes(tmp_binary, "big")
        if tmp_binary & mask_l == 0:  # condition d'arret
            xd = tmp
            return (x0, xd, d)
            
        d += 1
        tmp = new_step_cte(f, msg, kp, tmp, cte)    
    

In [39]:
def F(b):
    """
    Choisir une fonction 
    b : 0 OU 1
    
    0 correspond à simple_enc_AES
    1 correspond à simple_dec_AES
    """
    if b == 0:
        return simple_enc_AES
    if b == 1:
        return simple_dec_AES

In [40]:
def remonter (F, A, B, M, C, kp, b, cte):
    """
    Returne ( (x, f1) , (y, f2) ) tq x != y et f1(x) == f2(y)
    F : choix entre chiffrement et déchiffrement 
    A, B : triplet (x0, xd, d)
    M, C : clair et chiffré double
    kp : nb de bits significatif de la clef
    b : 0 pour enc ou 1 pour dec
    """       
    couple = [M,C]
    cpt = 0
    
    if A[2] >= B[2]:  # si la longueur du trail A > de celui de B 

        x = A[0]
        for _ in range(A[2]-B[2]):
            x =  new_step_cte(F(b), couple[b], kp, x, cte)
        y = B[0]
        if x == y : 
            #print('pb : x==y et fhash(x)==fhash(y)')
            return None

        while True:
            if x == y :
                break
            tmp1 = x            
            tmp2 = y             # anciennes valeurs
            x =  new_step_cte(F(b), couple[b], kp, tmp1, cte)
            y = new_step_cte(F(1-b), couple[1-b], kp, tmp2, cte)
            
            if cpt>1000: # boucle ??
                #print("boucle ??")
                return None
            cpt+=1
            
        return (   (tmp1, b) , (tmp2, 1-b)    )
        
        
    else:          # A[2] < B[2] mais on fait la meme chose
        y = B[0]
        for _ in range(B[2]-A[2]):
            y =  new_step_cte(F(1-b), couple[1-b], kp, y, cte)
        x = A[0]
        if x == y : 
            #print('pb : x==y et fhash(x)==fhash(y)')
            return None

        while True:
            if x == y :
                break

            tmp1 = x            
            tmp2 = y             # anciennes valeurs
            x = new_step_cte(F(b), couple[b], kp, tmp1, cte)
            y = new_step_cte(F(1-b), couple[1-b], kp, tmp2, cte)
            
            if cpt>1000: # boucle ??
                #print("boucle ?")
                return None
            cpt+=1

        return (   (tmp1, b) , (tmp2, 1-b)    )

In [41]:
def collision_detection(F, M, C, kp, l, dico, cte):
    """
    Detecte une seule collision
    Retourne le couple ( (x, f1) , (y, f2) ) tq x != y et f1(x) == f2(y) 
    F : choix entre chiffrement et déchiffrement 
    M, C : clair et chiffré double
    kp : nb de bits significatif de la clef
    l : nb de bits pour la condition d'arrêt
    dico : contenant les collisions trouvées
    
    >>> kp = 8
    >>> cte = 5
    >>> key1 = key_generator_AES(kp)
    >>> key2 = key_generator_AES(kp)
    >>> M = 'Voici le message'
    >>> C = double_AES(M, key1, key2)
    >>> l = 3
    >>> dico = {}
    >>> res = collision_detection(F, M, C, kp, l, dico, cte)
    >>> if (res[0][1]==0):  
    ...     enc = simple_enc_AES(M,res[0][0])
    ...     dec = simple_dec_AES(C, res[1][0])
    ... elif (res[0][1]==1):
    ...     enc = simple_enc_AES(M,res[1][0])
    ...     dec = simple_dec_AES(C, res[0][0])
    >>> print(enc[:kp//8]==dec[:kp//8] )
    True
    """
    
    couple = [M,C]
    seuil = 10000                  # valeur estimé pour le nb max d'éléments dans le dico
    
    while True: 
        b = random.randint(0,1) 
        x0 = key_generator_AES(kp)           # clef aléatoire initiale
        res = trail(F(b),couple[b], kp, l, x0, cte)
        if res == None:
            continue
 
        x0, xd, d = res

        if (xd,1-b) in dico:        # collision trouvée

            A = (x0,xd,d)                                   # b
            B = (dico[(xd,1-b)][0], xd, dico[(xd,1-b)][1])  # 1-b
            tmp = remonter(F, A , B, M, C, kp, b, cte)

            if(tmp == None):
                continue
            else: 
                return tmp
        
        # si la taille du dico depasse la limite fixée, choix aléatoire de la victime
        if len(dico) >= seuil:      
            delete = random.choice(list(dico.keys()))
            dico.pop(delete)
            
        dico[(xd,b)] = (x0, d)  

In [42]:
class Statistics:
    n_new_collisions = 0
    n_idem_collisions = 0
    def __init__(self):
        pass

In [43]:
def golden_collision(F, M1, C1, M2, C2 ,kp, l):
    """
    Trouve la golden collision en vérifiant à chaque fois
    les clefs obtenus avec (M2, C2)
    F : choix entre chiffrement et déchiffrement 
    M1, C1 : clair et chiffré double pour trouver les collisions
    M2, C2 : clair et chiffré double pour la vérificaiton
    kp : nb de bits significatif de la clef
    l : nb de bits pour la condition d'arrêt
    """
    dico = {}
    liste = []
    stat = Statistics()
    i=1
    cte = 2
    ancient = stat.n_new_collisions
    while True:
        
        if i%200==0:    
            print("Le nombre de new collisions",stat.n_new_collisions)
            print("Le nombre de collisions idem",stat.n_idem_collisions)
            if ancient == stat.n_new_collisions : # changer de version de new_step  quand l'ancienne version ne trouve plus de nouveau collision                  
                cte += 1
                print("\nOn varie NEW_STEP, on utilise la constante "+str(cte))
            else :
                ancient = stat.n_new_collisions
                      
        colli = collision_detection(F, M1, C1, kp, l,dico, cte)

        if colli == None:
            print('None')
            continue

        if colli in liste or (colli[1],colli[0]) in liste: # si collision deja trouvé 
            stat.n_idem_collisions += 1
            i+=1    
            continue 
          
        stat.n_new_collisions += 1
        liste.append(colli)
        
        if (len(liste)==((2**kp))):
            print('GROOS PB !!!!!' )
            return (liste, None)                      # pour debug
        
        
        try: 
            if colli[0][1] == 0:      # 0 correspond a enc
                tmp1 = simple_enc_AES(M2, colli[0][0])
                tmp2 = unpad(simple_dec_AES(C2, colli[1][0]), 16)
            else:
                tmp1 = unpad(simple_dec_AES(C2, colli[0][0]),16)
                tmp2 = simple_enc_AES(M2, colli[1][0])
            if( tmp1 == tmp2):
                
                from IPython.display import clear_output
                clear_output(wait=True)  # clear cell output 
                
                print( "GOLDEN COLLISION ! ")
                print("Voici la collision :",colli)
                print("On a utiliser:",i,"iterations")
                print("On a trouvé ", stat.n_new_collisions,"collisions differentes avant de tomber sur la bonne")
                return (liste,colli)              # liste pour debug
        except ValueError:
            pass      
        i+=1

In [44]:
def golden_collision_parr(F, M1, C1, M2, C2 ,kp, l):
    """
    Trouve la golden collision en vérifiant à chaque fois
    les clefs obtenus avec (M2, C2)
    Version parrallele
    F : choix entre chiffrement et déchiffrement 
    M1, C1 : clair et chiffré double pour trouver les collisions
    M2, C2 : clair et chiffré double pour la vérificaiton
    kp : nb de bits significatif de la clef
    l : nb de bits pour la condition d'arrêt
    """
    dico = {}
    liste = []
    stat = Statistics()
    i=1
    cte = 2
    ancient = stat.n_new_collisions
    p = Crypto.Util.number.getPrime(kp, randfunc=Crypto.Random.get_random_bytes)
    
    pool = concurrent.futures.ProcessPoolExecutor()
    deb = time.time()
     
    
    while True:
        futures = [] 
        
        # Affichage de l'avancement
        print("Le nombre de new collisions",stat.n_new_collisions)
        print("Le nombre de collisions idem",stat.n_idem_collisions)
        if ancient == stat.n_new_collisions : # changer de version de new_step  quand l'ancienne version ne trouve plus de nouveau collision                  
            cte += 1
            print()
            print("On varie NEW_STEP, avec la constante : "+str(cte))
        else :
            ancient = stat.n_new_collisions       
        
        
        for x in range(50):
            futures.append(pool.submit(collision_detection, F, M1, C1, kp, l,dico, cte))

        for x in (futures):   
            colli = (x.result())
            if colli == None:
                print('None')
                continue

            if colli in liste or (colli[1],colli[0]) in liste: # si collision deja trouvé 
                stat.n_idem_collisions += 1
                i+=1    
                continue 

            stat.n_new_collisions += 1
            liste.append(colli)

            try: 

                if colli[0][1] == 0:      # 0 correspond a enc
                    tmp1 = simple_enc_AES(M2, colli[0][0])
                    tmp2 = unpad(simple_dec_AES(C2, colli[1][0]), 16)
                else:
                    tmp1 = unpad(simple_dec_AES(C2, colli[0][0]),16)
                    tmp2 = simple_enc_AES(M2, colli[1][0])
                if( tmp1 == tmp2):
                    fin = time.time()
                    from IPython.display import clear_output
                    clear_output(wait=True)  # clear cell output 
                    print( "GOLDEN COLLISION ! ")
                    print("Voici la collision :",colli)
                    print("On a utiliser:",i,"iterations")
                    print("On a trouvé ", stat.n_new_collisions,"collisions differentes avant de tomber sur la bonne")
                    return (stat.n_new_collisions, fin-deb )
            except ValueError:
                pass  
            i+=1

            

### Test

In [45]:
kp = 8
l=1
M1 = "Voici le message 1"
M2 = "Voici le message 2"
key1 = key_generator_AES(kp)
key2 = key_generator_AES(kp)
C1 = double_AES(M1,key1,key2)
C2 = double_AES(M2,key1,key2)

print("Voici la clef 1 : ",key1)
print("Voici la clef 2 : ",key2)

Voici la clef 1 :  b'r\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
Voici la clef 2 :  b'/\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'


In [48]:
res = golden_collision_parr(F, M1, C1, M2, C2 ,kp, l)

Le nombre de new collisions 0
Le nombre de collisions idem 0

On varie NEW_STEP, avec la constante : 3


NameError: name 'xi' is not defined