# Imports

In [None]:
import numpy as np
from PIL import Image
import os
from scipy.io import loadmat,savemat
import tensorflow as tf
from tensorflow import keras
from keras.layers import *
from keras import models
from keras import regularizers
from keras.callbacks import EarlyStopping,ModelCheckpoint,ReduceLROnPlateau
import matplotlib.pyplot as plt
from math import pi

# Fonctions utiles

In [None]:
# Cette fonction permettra plus tard de charger plus ou moins d'images (en modifiant le paramètre num_images)
# et de modifier la dimension d'entrée
def load_data(image_size, num_images,path):
  dirs = sorted(os.listdir(path))

  x = np.zeros((min(num_images,len(dirs)),image_size,image_size))
  y = np.zeros((min(num_images,len(dirs)), 3))
    
  #Chargement des normals    
  mat_contents = loadmat(path+'/normals.mat')
  normals = mat_contents['normals']
  normals =tf.squeeze(normals)
  #print(normals)

  # Chargement des images, qui sont rangées dans lsp/images
  for i in range(min(num_images,len(dirs))):
    item = dirs[i]
    #print(item)
    if os.path.isfile(path+item):
      img = Image.open(path+item)
      # Redimensionnement et sauvegarde des normals
      y[i][0] = normals[i][0]
      y[i][1] = normals[i][1]
      y[i][2] = normals[i][2]
      # Redimensionnement et sauvegarde des images        
      # img = img.resize((image_size,image_size))
      x[i] = np.asarray(img)/255
    
  return x, y

def plot_training_analysis(history):
  acc = history.history['mae']
  val_acc = history.history['val_mae']
  loss = history.history['loss']
  val_loss = history.history['val_loss']

  epochs = range(len(loss))
  
  plt.plot(epochs, acc, 'b', linestyle="--",label='Training mae')
  plt.plot(epochs, val_acc, 'g', label='Validation mae')
  plt.title('Training and validation mae')
  plt.legend()

  plt.figure()

  plt.plot(epochs, loss, 'b', linestyle="--",label='Training loss')
  plt.plot(epochs, val_loss,'g', label='Validation loss')
  plt.title('Training and validation loss')
  plt.legend()

  plt.show()

In [None]:
def unit_vector(vector):
    """ Returns the unit vector of the vector.  """
    return vector / np.linalg.norm(vector)

def angle_between(v1, v2):
    """ Returns the angle in radians between vectors 'v1' and 'v2'::

            >>> angle_between((1, 0, 0), (0, 1, 0))
            1.5707963267948966
            >>> angle_between((1, 0, 0), (1, 0, 0))
            0.0
            >>> angle_between((1, 0, 0), (-1, 0, 0))
            3.141592653589793
    """
    v1_u = unit_vector(v1)
    v2_u = unit_vector(v2)
    return np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0))

def angle_between2(v1, v2):
    v2 = np.array([v2[0],v2[1],v1[2]]) # On rajoute la 3ème valeur (n_z)
    v1_u = unit_vector(v1)
    v2_u = unit_vector(v2)
    return np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0))

def angle_error(y_true,y_pred):
    angles = np.array(list(map(angle_between,y_true,y_pred)))
    error = np.mean(angles)
    return error

def angle_error2(y_true,y_pred):
    angles = np.array(list(map(angle_between2,y_true,y_pred)))
    error = np.mean(angles)
    return error


## imagettes 3x3

On charge les données du peaks normal (qui vont devenir nos données de test)  
x = imagettes, y = normales

In [23]:
mat_contents = loadmat('Data/peaks_normal3.mat')
x_test3 = mat_contents['imagettes']/255
y_test3 = mat_contents['normals']
print(x_test3.shape,y_test3.shape)

(2304, 3, 3) (2304, 3)


On charge les données des peaks un peu modifiés

In [27]:
mat_contents = loadmat('Data/peaks_modified3.mat')
x3 = mat_contents['imagettes']/255
y3 = mat_contents['normals']
print(x3.shape,y3.shape)

(11520, 3, 3) (11520, 3)


On sépare les données des peaks modifiés en 2 groupes (données de training et données de validation)

In [None]:
from sklearn.model_selection import train_test_split
x_train3,x_val3,y_train3,y_val3=train_test_split(x3,y3,test_size=0.1,random_state=123)

On affiche certaines imagettes ainsi que leurs normales

In [None]:
fig, axs = plt.subplots(5,5)
fig.set_figwidth(15)
fig.set_figheight(15)
for i in range(5):
    for j in range(5):
        axs[i,j].axis("off")
        axs[i,j].set_title(np.round(y_train3[i*5+j],2))
        axs[i,j].imshow(x_train3[i*5+j],cmap='gray',vmin=0,vmax=1)

# Création et entraînement du réseau

In [None]:
# Note : les boucles for ne sont plus utiles, on n'utilise pas la régularisation (val_l1 et val_l2 = 0) car les résultats étaient moins bons en utilisant la régularisation

val_l1= [0]
val_l2 = [0]
historys = [0 for i in range(len(val_l2)*len(val_l1))]
for (i,l2) in enumerate(val_l2) :
    for (j,l1) in enumerate(val_l1):
        print("Entraînement n°"+str(len(val_l1)*i+j+1)+"/"+str(len(val_l2)*len(val_l1)))

        regularizer = regularizers.l1_l2(l1=l1,l2=l2)

        # Création du réseau
        model9 = models.Sequential()
        model9.add(Flatten())
        model9.add(Dense(64,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(64,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(64,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(32,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(16,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(8,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(3,activation='linear'))

        # On utilise Adam, un optimiseur simple
        opt = keras.optimizers.Adam(learning_rate=3e-3) 

        # On utilise une loss mean square error que l'on va chercher à minimiser, c'est grâce à cette loss que les poids vont être mis à jour à chaque batch
        # On utilise comme métrique que l'on peut observer au cour de l'entraînement la mean absolute error
        model9.compile(loss='mse',
                    optimizer=opt,
                    metrics='mae')

        mcp_save = ModelCheckpoint('model_weights/meilleur_modele3.hdf5', save_best_only=True, monitor='val_mae', mode='min')

        historys[len(val_l1)*i+j] = model9.fit(x_train3, y_train3,
                epochs=100,
                validation_data = (x_val3,y_val3),
                batch_size=512,verbose=1,
                callbacks=[mcp_save])

In [None]:
plot_training_analysis(historys[0])

On teste alors sur les données du peaks normal, que le réseau n'a jamais vu

In [None]:
model9 = models.load_model('model_weights/meilleur_modele3.hdf5')
model9.evaluate(x_test3,y_test3)

On peut également calculer l'erreur angulaire moyenne avec ces données de test

In [None]:
y_pred3=model9.predict(x_test3)
angle_error(y_test3,y_pred3)*180/pi

Ou bien calculer tout cela sur les données d'entraînement

In [None]:
model9.evaluate(x_train3,y_train3)

In [None]:
y_pred3=model9.predict(x_train3)
angle_error(y_train3,y_pred3)*180/pi

# La suite est identique mais pour des imagettes 9x9 et 13x13

# imagettes 9x9

In [None]:
path_9x9_peaks = "Data/train_data/imagettes_9x9_100/"
sz_9x9=9
xp, yp = load_data(sz_9x9,8464,path_9x9_peaks)

In [None]:
# model9 = models.load_model('.mdl9_wts015_new.hdf5')
model9 = models.load_model('.mdl9_wts.hdf5')
model9.evaluate(xp,yp)

In [None]:
yp_pred = model9.predict(xp)
angle_error2(yp,yp_pred)

In [None]:
# path_9x9 = "Data/train_data/imagettes_9x9_100/"
path_9x9_new = "Data/train_data/imagettes_9x9_100_bis/"

In [None]:
# Chargement de seulement 10 images
sz_9x9=9
xx, yy = load_data(sz_9x9,1080,path_9x9_new)

In [None]:
# Normalisation des données
mean = xx.mean(axis=(0,1,2), keepdims=True)
std = xx.std(axis=(0,1,2), keepdims=True)
xx = (xx - mean)/std

In [None]:
mat_contents = loadmat('Data/peaks_normal9_100.mat')
xp = mat_contents['imagettes']/255
yp = mat_contents['normals']

In [None]:
model9 = models.load_model(".mdl9_wts.hdf5")
model9.evaluate(xp,yp)

In [None]:
from math import pi

In [None]:
yp_pred = model9.predict(xp)
angle_error2(yp,yp_pred)*180/pi

In [None]:
mat_contents = loadmat('Data/peaks_modified9.mat')
xx = mat_contents['imagettes']/255
yy = mat_contents['normals']

In [None]:
from sklearn.model_selection import train_test_split
xx_train,xx_test,yy_train,yy_test=train_test_split(xx,yy,test_size=0.1,random_state=123)

In [None]:
fig, axs = plt.subplots(5,5)
fig.set_figwidth(15)
fig.set_figheight(15)
for i in range(5):
    for j in range(5):
        axs[i,j].axis("off")
        axs[i,j].set_title(np.round(yy_train[i*5+j],2))
        axs[i,j].imshow(xx_train[i*5+j],cmap='gray',vmin=0,vmax=1)

In [None]:
def my_linspace(start,num,step) :
    return np.arange(0,num)*step+start

In [None]:
val_l1= [0]
val_l2 = [0]
historys = [0 for i in range(len(val_l2)*len(val_l1))]
for (i,l2) in enumerate(val_l2) :
    for (j,l1) in enumerate(val_l1):
        print("Entraînement n°"+str(len(val_l1)*i+j+1)+"/"+str(len(val_l2)*len(val_l1)))

        regularizer = regularizers.l1_l2(l1=l1,l2=l2)

        model9 = models.Sequential()
        model9.add(Flatten())
        model9.add(Dense(64,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(64,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(64,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(64,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(64,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(64,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(64,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(64,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(64,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(64,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(32,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(16,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(8,activation='relu',kernel_regularizer=regularizer))
        model9.add(Dense(3,activation='linear'))
        # model9 = models.load_model('.mdl9_wts.hdf5')

        # On utilise Adam, un optimiseur simple, et l'on minimise une entropie croisée binaire
        opt = keras.optimizers.Adam(learning_rate=3e-3) 
        #Loss= tf.keras.losses.mean_squared_error(y_true, y_pred)

        model9.compile(loss='mse',
                    optimizer=opt,
                    metrics='mae')

        earlyStopping = EarlyStopping(monitor='val_loss', patience=5, verbose=0, mode='min')
        mcp_save = ModelCheckpoint('.mdl9_wts.hdf5', save_best_only=True, monitor='val_mae', mode='min')
        # reduce_lr_loss = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=7, verbose=1, epsilon=1e-4, mode='min')

        historys[len(val_l1)*i+j] = model9.fit(xx_train, yy_train,
                epochs=500,
                validation_data = (xx_test,yy_test),
                batch_size=512,verbose=1,
                callbacks=[mcp_save])

In [None]:
plot_training_analysis(historys[0])

In [None]:
model9 = models.load_model('.mdl9_wts.hdf5')
model9.evaluate(xp,yp)

In [None]:
for i in range(len(historys)):
    l1 = val_l1[i%len(val_l1)]
    l2 = val_l2[i//len(val_l1)]
    history = historys[i]
    val_mae = history.history['val_mae']
    print("val MAE min : "+str(round(min(val_mae),2))+" || l2="+str(l2)+" l1="+str(l1))

In [None]:
yy_pred = model9.predict(xp)
angle_error(yp,yy_pred)*180/pi

In [None]:
# mae min : 0.15
# angle_error : 0.33

In [None]:
def scatter_data(x_val,y_val,model): 
  """
  plot le nuage de points 
  input : x_val :  imagettes de validation 
      y_val: noarmals associés
      model: le model utilisé
  output  : le plot de predictions=f(y_val)

  """
  test_predictions = model.predict(x_val)
  test_labels = y_val
  fig, ax = plt.subplots(figsize=(8,4))
  plt.scatter(test_labels, test_predictions, alpha=0.6, 
              color='#FF0000', lw=1, ec='black')
  lims = [0, 1]

  plt.plot(lims, lims, lw=1, color='#0000FF')
  plt.axis("off")
  plt.ticklabel_format(useOffset=False, style='plain')
  plt.xticks(fontsize=18)
  plt.yticks(fontsize=18)
  plt.xlim(lims)
  plt.ylim(lims)

  plt.tight_layout()
  plt.show()

scatter_data(xp,yp,model9)

In [None]:
model9.evaluate(xp,yp)

In [None]:
model9 = models.load_model('.mdl9_wts.hdf5')
yp_pred = model9.predict(xp)
angle_error(yp,yp_pred)

In [None]:
yp_pred_rs = np.reshape(yp_pred,(42,42,3))
yp_rs = np.reshape(yp,(42,42,3))

In [None]:
y_dict = {"normals_peaks9":yp_pred_rs,"normals_true9":yp_rs}

In [None]:
savemat("normals_peaks9.mat",y_dict)

# imagettes 13x13

In [None]:
path_13x13 = "Data/train_data/imagettes_13x13/"

In [None]:
sz_13x13=13
x13, y13 = load_data(sz_13x13,1444,path_13x13)

In [None]:
from sklearn.model_selection import train_test_split
x13_train,x13_test,y13_train,y13_test=train_test_split(x13,y13,test_size=0.2,random_state=123)

In [None]:
def my_linspace(start,num,step) :
    return np.arange(0,num)*step+start

In [None]:
l1=0
l2=0

model13 = models.Sequential()
model13.add(Conv1D(128,3,activation='relu'))
model13.add(Conv1D(32,3,activation='relu'))
model13.add(Conv1D(16,3,activation='relu'))
model13.add(Flatten())
model13.add(Dense(16,activation='relu',kernel_regularizer=regularizer))
model13.add(Dense(8,activation='relu',kernel_regularizer=regularizer))
model13.add(Dense(3,activation='linear'))

# On utilise Adam, un optimiseur simple, et l'on minimise une entropie croisée binaire
opt = keras.optimizers.Adam(learning_rate=3e-3) 

model13.compile(loss='mse',
              optimizer=opt,
              metrics='mae')

earlyStopping = EarlyStopping(monitor='val_loss', patience=10, verbose=0, mode='min')
mcp_save = ModelCheckpoint('.mdl13_wts.hdf5', save_best_only=True, monitor='val_mae', mode='min')
reduce_lr_loss = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=7, verbose=1, epsilon=1e-4, mode='min')

history = model13.fit(x13_train, y13_train,
          epochs=1000,
          validation_data = (x13_test, y13_test),
          batch_size=50,verbose=1,
          callbacks=[mcp_save])

In [None]:
plot_training_analysis(history)

In [None]:
model13 = models.load_model('.mdl13_wts.hdf5')
model13.evaluate(x13_test,y13_test)

In [None]:
y13_pred = model13.predict(x13_test)
angle_error(y13_test,y13_pred)

In [None]:
# mae min : 0.16
# angle_error : 0.34