***


# Projet : Reed-Salomon, QR code et correction d’erreur
***
***


Jean-Loup Mellion  
Gatien Da Rocha

In [527]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import pandas as pd

# Rappel interpolation polynomiale

Avant de s'attaquer a Reed-Salomon, il est important de rappeler le principe de l'interpolation polynomiale. L'interpolation polynomiale est une méthode mathématique permettant de trouver un polynôme qui passe par un ensemble de points donnés.

Pour cela, nous allons utiliser la méthode de Lagrange. Cette méthode permet de trouver un polynôme de degré n-1 passant par n points donnés.

### Polynôme de Lagrange

In [528]:
def polynome_base_lagrange(x_array: np.ndarray, y_array: np.ndarray, var_x: float, i: int) -> float:
    """
    Cette fonction calcule le polynome de base de lagrange, c'est à dire le produit des termes (x-x_j)/(x_i-x_j) pour j != i
    :param x_array: np.ndarray, les valeurs de x
    :param y_array: np.ndarray, les valeurs de y
    :param var_x: float, la valeur de x pour laquelle on veut calculer le polynome de base
    :param i: int, l'indice du polynome de base
    :return: float, le polynome de base de lagrange
    """
    res = 1
    for j in range(len(x_array)):
        if j != i:
            res *= (var_x-x_array[j])/(x_array[i]-x_array[j])
    return y_array[i]*res

In [529]:
def poly_lagrange(x_array: np.ndarray, y_array: np.ndarray, var_x: float) -> float:
    """
    Cette fonction calcule le polynome de lagrange pour un x donné, c'est à dire la somme des polynomes de base de lagrange
    :param x_array: np.ndarray, les valeurs de x
    :param y_array: np.ndarray, les valeurs de y
    :param var_x: float, la valeur de x pour laquelle on veut calculer le polynome de lagrange
    """
    sum = 0
    for i in range(len(x_array)):
        sum += polynome_base_lagrange(x_array,y_array,var_x,i)
    return sum

Nous allons maintenant tester la méthode de Lagrange sur un exemple simple.

In [530]:
# On défnit le message à envoyer
message = [6, 9, 4, 2, 0]

# On encode le message
def encode_message(message: list, t: int) -> tuple:
    """
    Cette fonction encode un message en rajoutant de la redondance pour pouvoir corriger des erreurs
    :param message: list, le message à encoder
    :param t: int, le nombre d'erreurs que l'on veut pouvoir corriger
    :return: tuple, les valeurs de x et de y
    """
    x_array = []
    for i in range(len(message)):
        x_array.append(i+1)

    y_array = message
    for i in range(t):
        y_array.append(poly_lagrange(x_array, y_array, len(x_array)+i+1))

    for i in range(t):
        x_array.append(len(x_array)+1)
    return x_array, y_array

x_array, y_array = encode_message(message, 2)

print("x_array:", x_array)
print("y_array:", y_array)

x_array: [1, 2, 3, 4, 5, 6, 7]
y_array: [6, 9, 4, 2, 0, -19.0, -86.0]


In [531]:
# Ensuite on peut retirer 1 ou 2 valeurs du message pour simuler des erreurs
message_erreur = y_array.copy()
message_erreur[0] = 0
message_erreur[2] = 0

print("message_erreur:", message_erreur)

# On doit localiser les erreurs (pour les retirer des valeurs de x, y pour recalculer le polynome de lagrange)
def localiser_erreurs(x_array: np.ndarray, y_array: np.ndarray, message: np.ndarray, t: int) -> np.ndarray:
    """
    Cette fonction localise les erreurs dans le message
    :param x_array: np.ndarray, les valeurs de x
    :param y_array: np.ndarray, les valeurs de y
    :param message: np.ndarray, le message reçu
    :param t: int, le nombre d'erreurs à corriger
    :return: np.ndarray, les indices des erreurs
    """
    erreurs = []
    for i in range(len(message)):
        if message[i] != poly_lagrange(x_array, y_array, i+1):
            erreurs.append(i)
    return erreurs

erreurs = localiser_erreurs(x_array, y_array, message_erreur, t)
print("erreurs localisées:", erreurs)

# On corrige les erreurs en recalculant les valeurs des erreurs
def corriger_erreurs(x_array: np.ndarray, y_array: np.ndarray, message: np.ndarray, erreurs: np.ndarray) -> np.ndarray:
    """
    Cette fonction corrige les erreurs dans le message
    :param x_array: np.ndarray, les valeurs de x
    :param y_array: np.ndarray, les valeurs de y
    :param message: np.ndarray, le message reçu
    :param erreurs: np.ndarray, les indices des erreurs
    :return: np.ndarray, le message corrigé
    """
    for erreur in erreurs:
        message[erreur] = poly_lagrange(x_array, y_array, erreur+1)
    return message

message_corrige = corriger_erreurs(x_array, y_array, message_erreur, erreurs)
print("message_corrigé:", message_corrige)

message_erreur: [0, 9, 0, 2, 0, -19.0, -86.0]
erreurs localisées: [0, 2]
message_corrigé: [6.0, 9, 4.0, 2, 0, -19.0, -86.0]


# Reed-Salomon

Le principale problème de l'interpolation polynomiale avec les messages codés c'est que les y des points de redondances peuvent être bien trop grand, par exemple si on veux encoder un message avec que 1 octet par point, y doit etre compris entre 0 et 255, mais un point de redondance peut être bien plus grand que 255. Pour résoudre ce problème, on utilise le code de Reed-Salomon, qui limite la taille des y a l'aide de modulo.

Video explicative : https://www.youtube.com/watch?v=1pQJkt7-R4Q

### Initialisation de la table de Galois

Implémenté à partir de : https://en.wikiversity.org/wiki/Reed%E2%80%93Solomon_codes_for_coders#Multiplication_with_logarithms, et d'autres sources.

In [532]:
# Paramètres globaux
field_size = 256 # Taille du corps fini GF(2^8), un octet
primitive_polynomial = 0x11d  # Polynom primitif pour GF(2^8)

# Création des tables de logarithmes et exponentielles pour les opérations dans GF(2^8)
def create_gf_tables() -> tuple:
    """
    Cette fonction crée les tables de logarithmes et exponentielles pour les opérations dans GF(2^8)
    :return: tuple, les tables de logarithmes et exponentielles
    """
    exp = np.zeros(field_size * 2, dtype=int)
    log = np.zeros(field_size, dtype=int)
    x = 1
    for i in range(field_size - 1):
        exp[i] = x
        log[x] = i
        x <<= 1
        if x & field_size:
            x ^= primitive_polynomial
        exp[i + field_size - 1] = exp[i]
    return exp, log

exp_table, log_table = create_gf_tables()

### Operations sur GF(field_size)

Implémenté à partir de : https://en.wikiversity.org/wiki/Reed%E2%80%93Solomon_codes_for_coders#Finite_field_arithmetic, et d'autres sources.

In [533]:
# Opérations sur GF(2^8)
def gf_add(x: int, y: int) -> int:
    """
    Cette fonction réalise l'addition dans GF(2^8), qui est l'opération XOR
    :param x: int, le premier nombre
    :param y: int, le deuxième nombre
    :return: int, le résultat de l'addition
    """
    return x ^ y

def gf_sub(x: int, y: int) -> int:
    """
    Cette fonction réalise la soustraction dans GF(2^8), qui est aussi juste l'opération XOR
    :param x: int, le premier nombre
    :param y: int, le deuxième nombre
    :return: int, le résultat de la soustraction
    """
    return x ^ y

def gf_mult(x: int, y: int) -> int:
    """
    Cette fonction réalise la multiplication dans GF(2^8), en utilisant les tables de logarithmes et exponentielles
    :param x: int, le premier nombre
    :param y: int, le deuxième nombre
    :return: int, le résultat de la multiplication
    """
    if x == 0 or y == 0:
        return 0
    return exp_table[log_table[x] + log_table[y]]

def gf_div(x: int, y: int) -> int:
    """
    Cette fonction réalise la division dans GF(2^8), en utilisant les tables de logarithmes et exponentielles
    :param x: int, le premier nombre
    :param y: int, le deuxième nombre
    :return: int, le résultat de la division
    """
    if y == 0:
        raise ZeroDivisionError()
    if x == 0:
        return 0
    return exp_table[log_table[x] - log_table[y] + field_size - 1]

def gf_pow(x: int, power: int) -> int:
    """
    Cette fonction réalise la puissance dans GF(2^8), en utilisant les tables de logarithmes et exponentielles
    :param x: int, le nombre
    :param power: int, la puissance
    :return: int, le résultat de la puissance
    """
    return exp_table[(log_table[x] * power) % (field_size - 1)]

def gf_inverse(x: int) -> int:
    """
    Cette fonction calcule l'inverse dans GF(2^8), en utilisant les tables de logarithmes et exponentielles
    :param x: int, le nombre
    :return: int, l'inverse du nombre
    """
    return exp_table[field_size - 1 - log_table[x]]


In [534]:
# Tests pour les opérations sur GF(2^8) 
def test_gf_operations():
    """
    Cette fonction teste les opérations sur GF(2^8)
    Validé avec https://hp.vector.co.jp/authors/VA021385/galois_calc.htm
    """
    # Addition et soustraction
    assert gf_add(15, 27) == 15 ^ 27
    assert gf_sub(15, 27) == 15 ^ 27
    
    # Multiplication
    assert gf_mult(15, 27) == 153
    assert gf_mult(0, 15) == 0
    assert gf_mult(15, 0) == 0
    
    # Division
    assert gf_div(188, 15) == 216
    assert gf_div(188, 1) == 188
    
    # Puissance
    assert gf_pow(15, 1) == 15
    assert gf_pow(15, 2) == gf_mult(15, 15)
    
    # Inversion
    assert gf_inverse(15) == 150

    print("Tous les tests passent")

test_gf_operations()


Tous les tests passent


### Polynôme de Reed-Salomon

In [535]:
def polynome_base_lagrange_rs(x_array: np.ndarray, y_array: np.ndarray, var_x: int, i: int) -> int:
    """
    Cette fonction calcule le polynome de base de lagrange, c'est à dire le produit des termes (x-x_j)/(x_i-x_j) pour j != i
    :param x_array: np.ndarray, les valeurs de x
    :param y_array: np.ndarray, les valeurs de y
    :param var_x: int, la valeur de x pour laquelle on veut calculer le polynome de base
    :param i: int, l'indice du polynome de base
    :return: int, le polynome de base de lagrange
    """
    res = 1
    for j in range(len(x_array)):
        if j != i:
            res = gf_mult(res, gf_div(var_x-x_array[j], x_array[i]-x_array[j]))
    return gf_mult(y_array[i], res)

def poly_lagrange_rs(x_array: np.ndarray, y_array: np.ndarray, var_x: int) -> int:
    """
    Cette fonction calcule le polynome de lagrange pour un x donné, c'est à dire la somme des polynomes de base de lagrange
    :param x_array: np.ndarray, les valeurs de x
    :param y_array: np.ndarray, les valeurs de y
    :param var_x: int, la valeur de x pour laquelle on veut calculer le polynome de lagrange
    """
    sum = 0
    for i in range(len(x_array)):
        sum = gf_add(sum, polynome_base_lagrange_rs(x_array, y_array, var_x, i))
    return sum

In [536]:
# On défnit le message à envoyer
message = [6, 9, 4, 2, 0]

# On encode le message
def encode_message_rs(message: list, t: int) -> tuple:
    """
    Cette fonction encode un message en rajoutant de la redondance pour pouvoir corriger des erreurs
    :param message: list, le message à encoder
    :param t: int, le nombre d'erreurs que l'on veut pouvoir corriger
    :return: tuple, les valeurs de x et de y
    """
    x_array = []
    for i in range(len(message)):
        x_array.append(i+1)

    y_array = message
    for i in range(t):
        y_array.append(poly_lagrange_rs(x_array, y_array, len(x_array)+i+1))

    for i in range(t):
        x_array.append(len(x_array)+1)
    return x_array, y_array

x_array, y_array = encode_message_rs(message, 4)

print("x_array:", x_array)
print("y_array:", y_array)

# Ensuite on peut retirer 1, 2, 3 ou 4 valeurs du message pour simuler des erreurs
message_erreur = y_array.copy()
message_erreur[0] = 0
message_erreur[2] = 0
message_erreur[6] = 0
message_erreur[8] = 0

print("message_erreur:", message_erreur)

# On doit localiser les erreurs (pour les retirer des valeurs de x, y pour recalculer le polynome de lagrange)
def localiser_erreurs_rs(x_array: np.ndarray, y_array: np.ndarray, message: np.ndarray, t: int) -> np.ndarray:
    """
    Cette fonction localise les erreurs dans le message
    :param x_array: np.ndarray, les valeurs de x
    :param y_array: np.ndarray, les valeurs de y
    :param message: np.ndarray, le message reçu
    :param t: int, le nombre d'erreurs à corriger
    :return: np.ndarray, les indices des erreurs
    """
    erreurs = []
    for i in range(len(message)):
        if message[i] != poly_lagrange_rs(x_array, y_array, i+1):
            erreurs.append(i)
    return erreurs

erreurs = localiser_erreurs_rs(x_array, y_array, message_erreur, 4)
print("erreurs localisées:", erreurs)

# On corrige les erreurs en recalculant les valeurs des erreurs
def corriger_erreurs_rs(x_array: np.ndarray, y_array: np.ndarray, message: np.ndarray, erreurs: np.ndarray) -> np.ndarray:
    """
    Cette fonction corrige les erreurs dans le message
    :param x_array: np.ndarray, les valeurs de x
    :param y_array: np.ndarray, les valeurs de y
    :param message: np.ndarray, le message reçu
    :param erreurs: np.ndarray, les indices des erreurs
    :return: np.ndarray, le message corrigé
    """
    for erreur in erreurs:
        message[erreur] = poly_lagrange_rs(x_array, y_array, erreur+1)
    return message

message_corrige = corriger_erreurs_rs(x_array, y_array, message_erreur, erreurs)
print("message_corrigé:", message_corrige)

x_array: [1, 2, 3, 4, 5, 6, 7, 8, 9]
y_array: [6, 9, 4, 2, 0, 189, 231, 139, 163]
message_erreur: [0, 9, 0, 2, 0, 189, 0, 139, 0]
erreurs localisées: [0, 2, 6, 8]
message_corrigé: [6, 9, 4, 2, 0, 189, 231, 139, 163]


# Application dans les QR codes et autres en correction d'erreur