In [1]:
# Mount Google Drive in the Colab runtime so that files under /content/drive
# (dataset archive, model output folder) are reachable from the notebook.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Display the version string of the Python interpreter running this notebook.
import sys
sys.version

'3.7.10 (default, May  3 2021, 02:48:31) \n[GCC 7.5.0]'

In [3]:
# Copy the preprocessed dataset archive from Google Drive into the current
# working directory under the shorter name 'fiw-bs-kin.zip'.
!cp '/content/drive/MyDrive/fiw-bs-kin_processed.zip' 'fiw-bs-kin.zip'

In [4]:
from zipfile import ZipFile

# Extract the dataset archive into the local 'Data' folder.
# The archive is bound to `archive` rather than `zip`: the target of a
# top-level `with ... as` stays in the notebook's global namespace, so naming
# it `zip` would shadow the built-in zip() for every later cell.
with ZipFile('fiw-bs-kin.zip') as archive:
    archive.extractall('Data')
    print('File is unzipped in temp folder')

File is unzipped in temp folder


In [5]:
"""
  Import the required libraries

"""

import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from imageio import imread
from PIL import Image
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense,Input, Conv2D, MaxPooling2D, Flatten, Lambda,Dropout,Concatenate
from tensorflow.keras.optimizers import Adam, SGD
import tensorflow.keras.backend as bk
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping,ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.metrics import RootMeanSquaredError
from keras.models import load_model 
import cv2
from datetime import datetime
from tensorflow.keras import regularizers
from random import randint
from time import time
from tensorflow.keras import layers
from keras.models import load_model 


In [6]:
def ReadFile(file_path):
    """Read a file in binary mode and return its whole content.

    Args:
        file_path: Path of the file to read.

    Returns:
        The content of the file as ``bytes``.
    """
    with open(file_path, mode='rb') as stream:
        payload = stream.read()
    return payload

def ChargerDonneesRelation(kinShip):
  """Load the train and test image pairs of one kinship relation.

  The pair lists ``train-<kinShip>.txt`` and ``test-<kinShip>.txt`` are read
  as space-delimited tables, their rows are shuffled and their three columns
  are renamed ``parent``, ``child`` and ``Kinship``. Every referenced image is
  then read from the matching faces folder and kept as raw, still encoded,
  bytes.

  Args:
    kinShip: Relation code inserted into the pair-list file names.

  Returns:
    ``(train_setX, train_setY, test_setX, test_setY)``: each ``*_setX`` is a
    list of ``(parent_bytes, child_bytes)`` tuples and each ``*_setY`` is the
    list of the matching ``Kinship`` values.
  """
  # (pair-list prefix, faces folder) for the train split, then the test split.
  splits = (
      ("Data/Dataset/bs-kin/train-", "/content/Data/Dataset/bs-kin/train-faces/"),
      ("Data/Dataset/bs-kin/test-", "/content/Data/Dataset/bs-kin/test-faces/"),
  )

  # Phase 1: read and shuffle both pair lists before touching any image.
  # NOTE(review): read_csv runs with its default header inference, so the
  # first line of each list is consumed as a header before the columns are
  # renamed -- confirm the files really start with a header line.
  tables = []
  for list_prefix, _ in splits:
    table = pd.read_csv(list_prefix + kinShip + ".txt", delimiter=" ")
    table = table.sample(frac=1).reset_index(drop=True)
    table.columns = ['parent', 'child', 'Kinship']
    tables.append(table)

  # Phase 2: read the encoded images, parent first and child second.
  loaded = []
  for (_, faces_dir), table in ((splits[0], tables[0]), (splits[1], tables[1])):
    pairs, labels = [], []
    for row in range(len(table)):
      parent_bytes = ReadFile(faces_dir + table.parent[row])
      child_bytes = ReadFile(faces_dir + table.child[row])
      pairs.append((parent_bytes, child_bytes))
      labels.append(table.Kinship[row])
    loaded.append((pairs, labels))

  (train_setX, train_setY), (test_setX, test_setY) = loaded
  return train_setX, train_setY, test_setX, test_setY

In [9]:

def L1_Shape(shape):
      """Return the first entry of ``shape``.

      Passed as the ``output_shape`` callback of the merge ``Lambda`` layer,
      which receives the shapes of its two inputs and outputs a tensor shaped
      like the first one.
      """
      first_entry = shape[0]
      return first_entry

def build_CNN(input_shape):
  """Build the convolutional feature extractor used by each siamese branch.

  The network stacks three ``Conv2D`` (3x3, ReLU, 'same' padding) +
  ``MaxPooling2D`` stages with 16, 32 and 64 filters, followed by a
  ``Flatten`` layer. Its summary is printed before it is returned.

  Args:
    input_shape: Shape of one input image, passed to the first ``Conv2D``.

  Returns:
    The ``Sequential`` model named ``'Feature_extraction'``.
  """
  ConvNet = Sequential(name='Feature_extraction')

  for stage, nb_filters in enumerate((16, 32, 64)):
    conv_options = dict(kernel_size=3, activation='relu', padding='same')
    if stage == 0:
      # Only the first layer declares the input shape.
      conv_options['input_shape'] = input_shape
    ConvNet.add(Conv2D(nb_filters, **conv_options))
    ConvNet.add(MaxPooling2D())

  ConvNet.add(Flatten())

  print("======================Description du modele CNN======================")
  ConvNet.summary()

  return ConvNet
  

def build_SiameseConNet(input_shape):
  """Build the siamese network that scores a pair of images.

  Both inputs go through the same feature extractor instance (so its weights
  are shared), the element-wise absolute difference of the two feature
  vectors is computed, and a single sigmoid unit turns it into the
  prediction. The model summary is printed before the model is returned.

  Args:
    input_shape: Shape of one input image.

  Returns:
    The two-input ``Model`` named ``'Siamese_CNN'``.
  """
  # One extractor, applied twice below.
  feature_extractor = build_CNN(input_shape)

  # The two image inputs (left first, right second).
  left_input = Input(shape=input_shape)
  right_input = Input(shape=input_shape)

  # Feature vector of each image.
  left_features = feature_extractor(left_input)
  right_features = feature_extractor(right_input)

  # Merge layer: |left - right|, element-wise. Kept as a lambda so the layer
  # is serialised the same way when the model is saved.
  l1_merge = Lambda(lambda tensor: bk.abs(tensor[0] - tensor[1]),
                    output_shape=L1_Shape)
  feature_gap = l1_merge([left_features, right_features])

  # Output layer.
  score = Dense(1, activation='sigmoid')(feature_gap)

  SiameseNet = Model(inputs=[left_input, right_input], outputs=score,
                     name='Siamese_CNN')
  print("====================== Description du modele ======================")
  SiameseNet.summary()

  return SiameseNet

In [10]:
def decode_jpeg(data, img_shape):
    """Decode an encoded image into a normalised RGB float array.

    Args:
        data: Encoded image bytes, in any format ``cv2.imdecode`` can read.
        img_shape: Target shape; ``img_shape[0]`` is the number of rows
            (height) and ``img_shape[1]`` the number of columns (width) of the
            returned array.

    Returns:
        ``np.float32`` array of shape ``(img_shape[0], img_shape[1], 3)`` with
        the channels in RGB order and the pixel values divided by 255.

    Raises:
        ValueError: If ``data`` cannot be decoded as an image.
    """
    src = cv2.imdecode(np.frombuffer(data, dtype=np.uint8), cv2.IMREAD_COLOR)
    if src is None:
        # imdecode signals an unreadable buffer by returning None.
        raise ValueError("decode_jpeg: the buffer could not be decoded as an image")

    # OpenCV decodes to BGR: reverse the channel axis to get RGB, and convert
    # to float32 before resizing, as the interpolation is done on floats.
    rgb = np.ascontiguousarray(src[:, :, ::-1], dtype=np.float32)

    # cv2.resize expects dsize as (width, height), i.e. (columns, rows), so the
    # two entries of img_shape are swapped. Square targets are unaffected.
    resized = cv2.resize(rgb, (img_shape[1], img_shape[0]))
    return (resized / 255).astype(np.float32)

def _fill_random_batch(batch_x, batch_y, set_x, set_y, img_shape):
    """Overwrite the batch buffers in place with randomly drawn image pairs."""
    for row in range(len(batch_y)):
        # One random index per batch row, drawn with replacement.
        sample = randint(0, len(set_x) - 1)
        batch_x[0][row, :, :, :] = decode_jpeg(set_x[sample][0], img_shape)
        batch_x[1][row, :, :, :] = decode_jpeg(set_x[sample][1], img_shape)
        batch_y[row] = set_y[sample]


def Train(model, trainset_x, trainset_y, nb_epochs, size_batch, path, save_each_epoch=False):
    """Compile and train the siamese model with a hand-written batch loop.

    The last 20% of the data is held out for validation. The model is compiled
    with SGD (learning rate 0.01) and binary cross-entropy. Each epoch runs
    ``train_size // size_batch`` training batches, then
    ``valid_size // size_batch`` validation batches. All batches are built by
    drawing pairs at random with replacement and decoding them on the fly.

    Args:
        model: Two-input Keras model to train.
        trainset_x: List of ``(image_bytes, image_bytes)`` pairs.
        trainset_y: Labels matching ``trainset_x``.
        nb_epochs: Number of epochs to run.
        size_batch: Number of pairs per batch.
        path: Where the model is saved, without extension (``.h5`` is appended).
        save_each_epoch: When true the model is saved after every epoch; when
            false only when ``val_accuracy`` improves.

    Returns:
        A list with one dict per epoch, with the keys ``epoch`` (0-based),
        ``loss``, ``accu``, ``val_loss`` and ``val_accu``.

    Raises:
        ValueError: If ``size_batch`` is smaller than 1, or if the training or
            validation split holds fewer samples than one batch.
    """
    assert len(trainset_x) == len(trainset_y)
    if size_batch < 1:
        raise ValueError("size_batch must be at least 1")

    #================= Validation split =================
    validation_split = 0.2
    data_size = len(trainset_x)
    valid_size = int(validation_split * data_size)
    train_size = data_size - valid_size
    train_x, train_y = trainset_x[:train_size], trainset_y[:train_size]
    valid_x, valid_y = trainset_x[train_size:], trainset_y[train_size:]

    #================= Number of iterations per epoch =================
    nb_iterations_t = train_size // size_batch
    nb_iterations_v = valid_size // size_batch
    if nb_iterations_t == 0 or nb_iterations_v == 0:
        # Without this guard the epoch averages below divide by zero.
        raise ValueError(
            "not enough data for size_batch=%d: %d training and %d validation samples"
            % (size_batch, train_size, valid_size))

    resultats = []
    filepath = path + ".h5"
    # Make sure the folder that receives the checkpoints exists.
    target_dir = os.path.dirname(filepath)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    if save_each_epoch:
        checkpoint = ModelCheckpoint(filepath, monitor='val_accuracy', verbose=1,
                                     save_best_only=False, save_weights_only=False,
                                     mode='auto', save_freq='epoch')
    else:
        checkpoint = ModelCheckpoint(filepath, monitor='val_accuracy',
                                     verbose=1, save_best_only=True,
                                     mode='max')
    reduce_on_plateau = ReduceLROnPlateau(monitor="val_accuracy", mode="max",
                                          factor=0.1, patience=2, verbose=1)
    calls = [checkpoint, reduce_on_plateau]

    model.compile(optimizer=SGD(0.01), loss='binary_crossentropy', metrics=["accuracy"])
    img_shape = model.input_shape[0][1:]

    # Buffers reused for every batch, one per model input.
    batch_x = [np.empty((size_batch, img_shape[0], img_shape[1], img_shape[2]), np.float32),
               np.empty((size_batch, img_shape[0], img_shape[1], img_shape[2]), np.float32)]
    batch_y = np.empty((size_batch, 1), np.float32)

    # The callbacks are driven by hand: attach the model first, then start them.
    for call in calls:
        call.set_model(model)
        call.on_train_begin()

    for epochs in range(nb_epochs):
        start = time()
        print('Epoch %d/%d'%(epochs+1, nb_epochs))

        # ---- training pass ----
        loss = 0
        accu = 0
        for iterations in range(nb_iterations_t):
            _fill_random_batch(batch_x, batch_y, train_x, train_y, img_shape)
            result = model.train_on_batch(batch_x, batch_y)
            loss = loss + result[0]
            accu = accu + result[1]
        # Mean over the batches of this epoch.
        loss = loss / nb_iterations_t
        accu = accu / nb_iterations_t

        # ---- validation pass ----
        # NOTE(review): validation pairs are also drawn at random with
        # replacement, so val_accuracy is a noisy estimate that does not cover
        # every held-out pair -- confirm this is intended.
        val_loss = 0
        val_accu = 0
        for iterations in range(nb_iterations_v):
            _fill_random_batch(batch_x, batch_y, valid_x, valid_y, img_shape)
            result = model.test_on_batch(batch_x, batch_y)
            val_loss = val_loss + result[0]
            val_accu = val_accu + result[1]
        val_loss = val_loss / nb_iterations_v
        val_accu = val_accu / nb_iterations_v

        resultats.append({"epoch":epochs, "loss":loss,"accu":accu,"val_loss":val_loss,"val_accu":val_accu})
        print('%ds - loss: %.04f - accuracy: %.04f - val_loss: %.04f - val_accuracy: %.04f'%(int(time()-start), loss, accu, val_loss, val_accu))
        for call in calls:
            # Keras callbacks expect the 0-based epoch index and add 1
            # themselves when they print a message.
            call.on_epoch_end(epochs, logs={'val_accuracy': val_accu})
    return resultats

In [12]:
def test(model, test_x, test_y,size_batch):
  """Evaluate the model on the whole test set.

  The test pairs are visited once each, in order, in batches of at most
  ``size_batch`` pairs; the last batch may be smaller. The per-batch loss and
  accuracy are weighted by the batch size, so the returned values are the
  means over all test pairs and are reproducible from one call to the next.

  Args:
    model: Compiled two-input Keras model to evaluate.
    test_x: List of ``(image_bytes, image_bytes)`` pairs.
    test_y: Labels matching ``test_x``.
    size_batch: Maximum number of pairs per batch.

  Returns:
    ``(test_loss, test_accu)``, the mean loss and accuracy over the test set.

  Raises:
    ValueError: If the test set is empty, if ``test_x`` and ``test_y`` differ
      in length, or if ``size_batch`` is smaller than 1.
  """
  test_size = len(test_x)
  if test_size != len(test_y):
    raise ValueError("test_x and test_y must have the same length")
  if test_size == 0:
    raise ValueError("the test set is empty")
  if size_batch < 1:
    raise ValueError("size_batch must be at least 1")

  img_shape = model.input_shape[0][1:]
  # Buffers allocated once; the last, possibly shorter, batch uses a slice.
  batch_x = [np.empty((size_batch, img_shape[0], img_shape[1], img_shape[2]), np.float32),
             np.empty((size_batch, img_shape[0], img_shape[1], img_shape[2]), np.float32)]
  batch_y = np.empty((size_batch, 1), np.float32)

  test_loss = 0.0
  test_accu = 0.0
  for first in range(0, test_size, size_batch):
    count = min(size_batch, test_size - first)
    for row in range(count):
      sample = first + row
      batch_x[0][row, :, :, :] = decode_jpeg(test_x[sample][0], img_shape)
      batch_x[1][row, :, :, :] = decode_jpeg(test_x[sample][1], img_shape)
      batch_y[row] = test_y[sample]
    # Evaluate on the filled part of the buffers only.
    result = model.test_on_batch([batch_x[0][:count], batch_x[1][:count]],
                                 batch_y[:count])
    # Weight by the batch size so that a shorter last batch counts correctly.
    test_loss = test_loss + result[0] * count
    test_accu = test_accu + result[1] * count

  # Mean over all test pairs.
  test_loss = test_loss / test_size
  test_accu = test_accu / test_size

  return (test_loss, test_accu)

In [13]:

"""
    The two functions below plot the recorded results as graphs.
      path_file: location of the CSV file
  """
def plot_loss_resultat(path_file,nom_image):
  """Plot the training and validation loss curves stored in a results CSV.

  Args:
    path_file: Path of the header-less results CSV whose five columns are, in
      order: epoch, loss, accuracy, validation loss, validation accuracy.
    nom_image: File name of the figure, saved in the folder of ``path_file``.
  """
  columns = ['epochs','loss','accu','val_loss','val_accu']
  # The results file has no header line: with the default header inference the
  # first epoch would be consumed as column names and vanish from the plot.
  resultat = pd.read_csv(path_file, header=None, names=columns)
  fig, ax = plt.subplots()
  fig.text(0.5,0.9,'Loss',fontsize=15,horizontalalignment='center')
  ax.plot(resultat.epochs,resultat.loss,c='r',label='train')
  ax.plot(resultat.epochs,resultat.val_loss,c="green",label='valid')
  ax.set_xlabel('Epoch')
  ax.set_ylabel('Loss')
  ax.legend()
  # Save next to the CSV whatever its file name is (the previous fixed
  # 12-character slice only worked for names as long as 'rs_model.csv').
  fig.savefig(os.path.join(os.path.dirname(path_file), nom_image))

def plot_accuracy_resultat(path_file,nom_image):
  """Plot the training and validation accuracy curves stored in a results CSV.

  Args:
    path_file: Path of the header-less results CSV whose five columns are, in
      order: epoch, loss, accuracy, validation loss, validation accuracy.
    nom_image: File name of the figure, saved in the folder of ``path_file``.
  """
  columns = ['epochs','loss','accu','val_loss','val_accu']
  # The results file has no header line: with the default header inference the
  # first epoch would be consumed as column names and vanish from the plot.
  resultat = pd.read_csv(path_file, header=None, names=columns)

  fig, ax = plt.subplots()
  fig.text(0.5,0.9,'Accuracy',fontsize=15,horizontalalignment='center')
  ax.plot(resultat.epochs,resultat.accu,c='r',label='train')
  ax.plot(resultat.epochs,resultat.val_accu,c="green",label='valid')
  ax.set_xlabel('Epoch')
  ax.set_ylabel('Accuracy')
  ax.legend()
  # Save next to the CSV whatever its file name is (the previous fixed
  # 12-character slice only worked for names as long as 'rs_model.csv').
  fig.savefig(os.path.join(os.path.dirname(path_file), nom_image))
  

In [15]:
def main():
  """Train, evaluate and save one siamese model per kinship relation.

  For each relation code a fresh model is built for 128x128x3 images, the
  relation's data is loaded, the model is trained for 15 epochs with batches
  of 100, the per-epoch results are written to ``rs_model.csv`` (no header
  line), the model is evaluated on the test set and finally saved as
  ``model_final.h5``. All files of a relation go to one folder on Google
  Drive, named after the start time of the run and the relation code.
  """
  relations = ['fs','fd','ms','md']

  # Same timestamp for every relation of this run.
  dtime = datetime.now().strftime("%m-%d-%Y_%H:%M:%S")

  for relation in relations:
    out_dir = "/content/drive/MyDrive/Model/Best_/"+dtime+relation

    print("Creation du modele...")
    modele =  build_SiameseConNet((128, 128, 3))

    print("Chargement des donnees...")
    TrainX, TrainY, TestX, TestY = ChargerDonneesRelation(relation)

    # Create the output folder explicitly instead of relying on a checkpoint
    # having been written there before the CSV is opened.
    os.makedirs(out_dir, exist_ok=True)

    print("Debut de l'entrainemtn...")
    resultat = Train(modele, TrainX, TrainY, 15, 100,
                     out_dir+"/model",save_each_epoch=False)

    # Record the results of every epoch for this model.
    with open(out_dir+"/rs_model.csv", 'w') as f:
      for epoch in resultat:
        f.write(f"{epoch['epoch']},{epoch['loss']},{epoch['accu']},{epoch['val_loss']},{epoch['val_accu']}\n")

    # Test phase.
    loss,acc = test(modele,TestX,TestY,100)
    print("Resultat des tests",loss,acc)
    modele.save(out_dir+"/model_final.h5")
  

In [16]:
# Run the whole pipeline: one model is built, trained, tested and saved for
# each relation listed in main().
main()

Creation du modele...
Model: "Feature_extraction"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 128, 128, 16)      448       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 64, 64, 16)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 64, 64, 32)        4640      
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 32, 32, 32)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 32, 32, 64)        18496     
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 16, 16, 64)        0         
_________________________________________________________________
flatten (Flatten)         