In [1]:
import scipy.io 
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, normalize
from sklearn.utils import shuffle
import csv 
from sklearn.model_selection import train_test_split

**Lecture des données**

In [2]:
mat = scipy.io.loadmat("data_doc.mat")
X = mat['Xts'].T.toarray()
df_y = pd.DataFrame(mat['yts'])
y = pd.get_dummies(df_y[0]).to_numpy() # OneHotEncoding
X_to_predict = mat['Xvr'].T.toarray()

In [3]:
"""
# Normalisation entre 0 et 1 
X = normalize(X)
X_to_predict = normalize(X_to_predict)"""

'\n# Normalisation entre 0 et 1 \nX = normalize(X)\nX_to_predict = normalize(X_to_predict)'

In [4]:
# Normalisation autour de 0 moyenne nulle et même variance
scaler = StandardScaler()
X = scaler.fit_transform(X)
X_to_predict = scaler.transform(X_to_predict)

In [5]:
# Split data en entrainement et validation
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.1, random_state=42)

In [6]:
class Net:
    def __init__(self, mots, classe, method=None):
        """Initialisation des poids et des biais """
        if method is None:
            # init uniform
            self.weights = np.random.randn(mots, classe)
            self.biases = np.random.randn(classe)
        elif method == "xavier":
            F_in = mots
            F_out = classe 
            limit = np.sqrt(6 / float(F_in + F_out))
            self.weights = np.random.uniform(low=-limit, high=limit, size=(F_in, F_out))
            self.biases = np.random.randn(classe)
            
       
    def forward(self, x):
        """Forward pass pour calculer la sortie du réseau"""
        # sortie z = W^t*X+B
        z = np.dot(self.weights.T, x) + self.biases
        # fct d'activation = sigmoïde
        activation = sigmoid(z)
        return z, activation


    def evaluate_gradient(self, x, y):
        """Calcul des dérivées partielles de la loss pour un itéré x"""
        # on récupère la valeur de z et act(z) 
        z, activation = self.forward(x)
        # calculer les dérivées partielles pour la couche de sortie par rapport à W et B
        delta_b = (activation-y)*sigmoid_prime(z)
        delta_w = np.outer(x, delta_b)
        return delta_b, delta_w


    
    def compute_gradient(self, X, y, lr):
        """Calcul le gradient pour un batch"""
        
        # stocker la somme des gradients des x du batch
        somme_delta_b = np.zeros(self.biases.shape)
        somme_delta_w = np.zeros(self.weights.shape)

        for x,y in zip(X,y):
            delta_b, delta_w = self.evaluate_gradient(x, y)
            somme_delta_b+=delta_b
            somme_delta_w+=delta_w
        
        return somme_delta_b, somme_delta_w


                                                                                                                                  
    def SGD(self, X, y, lr, epochs):
        """SGD batchsize=1"""
        # shuffle les données
        X, y = shuffle(X,y)
        for epoch in range(epochs):
            # à chaque époque on parcourt tous les X, on calcule le gradient et on met à jour les poids et le biais pour chaque X
            for i in range(len(X)):
                somme_delta_b, somme_delta_w =  self.compute_gradient(X[i], y[i], lr)
                # update les poids      
                self.weights -= (lr/X[i].shape[0])*somme_delta_w 
                self.biases -= (lr/X[i].shape[0])*somme_delta_b

            # calcul de l'accuracy et de la loss pour cette époque 
            accuracy, loss = self.evaluate_model(X, y)
            print(f'Epoch {epoch}')
            print(f'Accuracy = {accuracy}')
            print(f'Loss = {loss}')

    def GD(self, X, y, lr, epochs):
        """Gradient descent"""
        X, y = shuffle(X,y)
        for epoch in range(epochs):
            # à chaque époque, on met le gradient une seule fosi à jour
            somme_delta_b, somme_delta_w =  self.compute_gradient(X, y, lr)
            # update les poids      
            self.weights -= (lr/X.shape[0])*somme_delta_w 
            self.biases -= (lr/X.shape[0])*somme_delta_b

            # calcul de l'accuracy et de la loss pour cette époque 
            accuracy, loss = self.evaluate_model(X, y)
            print(f'Epoch {epoch}')
            print(f'Accuracy = {accuracy}')
            print(f'Loss = {loss}')

    
    def SGD_batch(self, X, y, lr, epochs, batch_size):
        """SGD+mini-batch"""
        # shuffle les données
        X, y = shuffle(X,y)
        losses = [10**8]
        for epoch in range(epochs):
            # à chaque époque, on met à jour W et B avec mini-batchs
            num_samples = y.shape[0]
            ret = num_samples % batch_size
            # si taille des données pas divisable par batch size alors resize les données
            if ret != 0:
                X_resize = X[:len(X)-ret]
                y_resize = y[:len(y)-ret]

            iterations = int(num_samples / batch_size)
            print(f"Nombre d'itérations par époque = {iterations}")
            for i in range(iterations):
                # pour chaque mini-batch on veut entrainer le modèle et mettre à jour les poids
                start = i * batch_size
                end = start+batch_size
                X_batch = X_resize[start:end, :]
                y_batch = y_resize[start:end, :]
                somme_delta_b, somme_delta_w =  self.compute_gradient(X_batch, y_batch, lr)
                # update les poids      
                self.weights -= (lr/X_batch.shape[0])*somme_delta_w 
                self.biases -= (lr/X_batch.shape[0])*somme_delta_b

            # calcul de l'accuracy et de la loss pour cette époque 
            accuracy, loss = self.evaluate_model(X, y)
            print(f'Epoch {epoch}')
            print(f'Accuracy = {accuracy}')
            print(f'Loss = {loss}')

            # update le learning rate si on a une meilleure loss alors lr/2
            if loss>losses[-1]:
                lr/=2
            losses.append(loss)

    def Adam(self, X, y, lr, epochs, batch_size, X_val, y_val):
        # init les paramètres pour Adam
        beta_1 = 0.9
        beta_2 = 0.999					
        epsilon = 1e-8
        # init le vecteur
        t = 1						
        m_dw = m_db = v_dw = v_db = 0 
        X, y = shuffle(X,y)
        losses = [10**8]
        val_losses = [10**8]
        # flag utilisé pour early stopping si val loss >>>
        flag = 0    
        for epoch in range(epochs):
            # à chaque époque, on met à jour W et B avec mini-batchs
            num_samples = y.shape[0]
            ret = num_samples % batch_size
            # si taille des données pas divisable par batch size alors resize les données
            if ret != 0:
                X_resize = X[:len(X)-ret]
                y_resize = y[:len(y)-ret]

            iterations = int(num_samples / batch_size)
            print(f"Nombre d'itérations par époque = {iterations}")
            for i in range(iterations):
                # pour chaque mini-batch on veut entrainer le modèle et mettre à jour les poids
                start = i * batch_size
                end = start+batch_size
                X_batch = X_resize[start:end, :]
                y_batch = y_resize[start:end, :]
                grad_b, grad_w =  self.compute_gradient(X_batch, y_batch, lr)
                # adam update
                # momentum poids
                m_dw = beta_1*m_dw + (1-beta_1)*grad_w
                # momentum biais
                m_db = beta_1*m_db + (1-beta_1)*grad_b

                # poids
                v_dw = beta_2*v_dw + (1-beta_2)*(grad_w**2)
                # biais
                v_db = beta_2*v_db + (1-beta_2)*(grad_b**2)

                # correction
                m_dw_corr = m_dw/(1-beta_1**t)
                m_db_corr = m_db/(1-beta_1**t)
                v_dw_corr = v_dw/(1-beta_2**t)
                v_db_corr = v_db/(1-beta_2**t)
                # update les poids      
                self.weights -= (lr/X.shape[0])*(m_dw_corr/(np.sqrt(v_dw_corr)+epsilon))
                self.biases -= (lr/X.shape[0])*(m_db_corr/(np.sqrt(v_db_corr)+epsilon))

            # calcul de l'accuracy et de la loss pour cette époque 
            accuracy, loss = self.evaluate_model(X, y)
            print(f'Epoch {epoch}')
            print(f'Accuracy = {accuracy}')
            print(f'Loss = {loss}')
            if loss > losses[-1]:
                lr /= 2
            losses.append(loss)

            val_accuracy, val_loss = self.evaluate_model(X_val, y_val)
            print(f'Val accuracy = {val_accuracy}')
            print(f'Val loss = {val_loss}')
            if val_loss > val_losses[-1]:
                flag+=1
            else:
                flag=0
            if flag>5:
                print('EARLY STOPPING')
                exit()
            val_losses.append(val_loss)


    def predict(self, X):
        """Prediction sur des nouvelles données"""
        preds = []
        for x in X:
            z, act = self.forward(x)
            preds.append(act)
        return np.array(preds)
    
    def evaluate_model(self, X, Y):
        """Calcule l'accuracy du modèle
        X = vecteur qui contient des X d'entrées
        Y = vecteur de vraie valeurs
        """
        preds = self.predict(X)
        loss = (1/(2*Y.shape[0])) * np.linalg.norm(preds - Y, 'fro') ** 2
        predicted = np.array([pred.argmax() for pred in preds])
        true_y = np.array([y.argmax() for y in Y])
        # si predicted == la vraie valeur (test_y) alors True sinon False 
        correct = predicted == true_y
        accuracy = (correct.sum() / len(correct))
        return accuracy, loss

    def write_output(self, X):
        preds = self.predict(X)
        predicted = np.array([np.argmax(pred) for pred in preds])
        with open('output.csv' ,'w', newline='') as fout:
            writer = csv.writer(fout)
            writer.writerow(["id", "class"])
            for i,pred in enumerate(predicted, start=1):
                writer.writerow([i, (pred+1)*100+1])

        
def sigmoid(z):
    """Fonction d'activation sigmoïde"""
    return 1 / (1 + np.exp(-z))
def sigmoid_prime(z):
    """Dérivée de la sigmoïde"""
    return sigmoid(z) * (1 - sigmoid(z))

In [7]:
net = Net(X.shape[1], y.shape[1], 'xavier')

In [8]:
net.Adam(X_train, y_train, 0.1, 1000, 64, X_val, y_val)

Nombre d'itérations par époque = 196
Epoch 0
Accuracy = 0.06478828398599172
Loss = 2.5424691081387976
Val accuracy = 0.0651862464183381
Val loss = 2.683601492496913
Nombre d'itérations par époque = 196
Epoch 1
Accuracy = 0.07529449219993632
Loss = 2.4203712320972586
Val accuracy = 0.07521489971346705
Val loss = 2.648695928303727
Nombre d'itérations par époque = 196
Epoch 2
Accuracy = 0.08373129576567973
Loss = 2.326065447864545
Val accuracy = 0.08237822349570201
Val loss = 2.621720065909946
Nombre d'itérations par époque = 196
Epoch 3
Accuracy = 0.09137217446673034
Loss = 2.245741361750682
Val accuracy = 0.09097421203438395
Val loss = 2.598579070389861
Nombre d'itérations par époque = 196
Epoch 4
Accuracy = 0.09893346068131169
Loss = 2.1742163150113702
Val accuracy = 0.09742120343839542
Val loss = 2.578026168204555
Nombre d'itérations par époque = 196
Epoch 5
Accuracy = 0.10776822667940146
Loss = 2.1088944687796705
Val accuracy = 0.1010028653295129
Val loss = 2.559375938006892
Nombre d

In [None]:
net.write_output(X_to_predict)