In [None]:
# Modèle basé sur un échentillon des 10 espèces les plus représentées.
# L'objectif est d'observer la qualité d'entrainement d'un modèle sur un echantillon réduit.
# La vitesse d'entrainnement du modèle permet d'ajuster facilement les paramètres et les callbacks.

# Lib

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_hub as hub
#import cv2
%matplotlib inline



from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# Fonctions

In [2]:
def import_df(chemin_images, chemin_csv, pourcentage_echantillon):
    '''Importe le fichier csv et construit 4 df :
        - df reprenant l'ensemble des données
        - df_ech est un echantillon de df
        - top10 ne reprend que les 10 labels les plus vus dans df
        - top10_ech est un echantillon de top10
        '''

    # import du df
    df = pd.read_csv(chemin_csv, low_memory=False)
    df['image_url'] = df['image_url'].str.replace('.../images/', chemin_images)
    print(f"Nombre d'images chargées pour df: {df.shape[0]}")
    print(f"Nb especes dans df: {df['label'].nunique()}")


    # Contruction de l'echantillon
    L = len(df)
    L_ech = int(pourcentage_echantillon * L)
    df_ech = df.sample(n=L_ech, random_state=10)
    df_ech.reset_index(inplace=True, drop=True)
    print(f"Nombre d'images chargées pour df_ech: {df_ech.shape[0]}")
    print(f"Nb especes dans df_ech: {df_ech['label'].nunique()}")




    return df, df_ech

In [3]:
from tensorflow.keras.applications.efficientnet import preprocess_input
#from tensorflow.keras.preprocessing.image import load_img, img_to_array

def augment_img(image_path, label):
    img = tf.io.read_file(image_path)
    img = tf.image.decode_png(img, channels=3)
    img = tf.image.resize(img, (224, 224))
    img = preprocess_input(img)

    img = tf.image.random_flip_left_right(img)
    img = tf.image.random_flip_up_down(img)
    img = tf.image.random_brightness(img, max_delta=0.2)
    img = tf.image.random_contrast(img, lower=0.8, upper=1.2)
    img = tf.image.convert_image_dtype(img, tf.float32)
    img = (img - tf.math.reduce_min(img)) / (tf.math.reduce_max(img) - tf.math.reduce_min(img))
    #img = tf.image.random_rotation(img)
    #img = tf.image.random_crop(img)

    return img, label

In [4]:
def create_tf_dataset(image_path, labels, batch_size):
    image_path = image_path.tolist()  # Convertir les chemins d'images en liste
    labels = labels.tolist()  # Convertir les labels en liste
    
    dataset = tf.data.Dataset.from_tensor_slices((image_path, labels))
    dataset = dataset.map(augment_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.shuffle(buffer_size=len(image_path))
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    
    return dataset

In [5]:
import os
def controle_presence_fichiers(df, chemin_images):


    image_directory = chemin_images
    missing_files = []

# Parcourir chaque ligne du DataFrame
    for index, row in df.iterrows():
        image_path = os.path.join(image_directory, row['image_lien'])
    
        if not os.path.exists(image_path):
            missing_files.append(image_path)

    # Afficher les fichiers non trouvés
    if missing_files:
        print("\nFichiers non trouvés :")
        for file_path in missing_files:
            print(file_path)
    else:
        print("\nTous les fichiers sont présents.")

# Callbacks

In [6]:
from tensorflow.keras import callbacks
%load_ext tensorboard
log_dir = '/'
tensorboard = callbacks.TensorBoard(log_dir = log_dir)

In [7]:
from tensorflow.keras.callbacks import EarlyStopping
early_stopping = EarlyStopping(monitor = 'val_accuracy', 
                               min_delta = 0.01,
                               patience = 3,
                               verbose = 1,
                               mode = 'auto',
                               restore_best_weights = True)

In [8]:
from tensorflow.keras.callbacks import ReduceLROnPlateau
earlystop = ReduceLROnPlateau(monitor = 'val_loss',
                        min_delta = 0.01,
                        patience = 3,
                        factor = 0.15, 
                        cooldown = 3,
                        verbose = 1)

In [9]:
from tensorflow.keras.callbacks import ModelCheckpoint
checkpoint = ModelCheckpoint(filepath='../model/model_complet', monitor='val_accuracy', save_best_only=True, verbose=1)

In [10]:
from tensorflow.keras.callbacks import LearningRateScheduler
# Définition de la fonction pour ajuster le taux d'apprentissage
def lr_schedule(epoch):
    """
    Fonction pour ajuster le taux d'apprentissage en fonction de l'époque.
    """
    learning_rate = 0.1
    if epoch > 15:
        learning_rate = 0.01
    if epoch > 40:
        learning_rate = 0.001
    return learning_rate

lr_scheduler = LearningRateScheduler(lr_schedule, verbose=1)

In [11]:
from tensorflow.keras.callbacks import Callback
from timeit import default_timer as timer

class TimingCallback(Callback):
    def __init__(self, logs={}):
        self.logs=[]
    def on_epoch_begin(self, epoch, logs={}):
        self.starttime = timer()
    def on_epoch_end(self, epoch, logs={}):
        self.logs.append(timer()-self.starttime)

time_callback = TimingCallback()

In [12]:
from tensorflow.keras.callbacks import TerminateOnNaN
TON = TerminateOnNaN()

# Données

In [13]:
chemin_images = '../../images/'
chemin_csv = '../data/top10.csv'
pourcentage_echantillon = 0.1 # Si 0.1 : 10% du contenu


df, df_ech = import_df(chemin_images, chemin_csv, pourcentage_echantillon)

Nombre d'images chargées pour df: 64372
Nb especes dans df: 10
Nombre d'images chargées pour df_ech: 6437
Nb especes dans df_ech: 10


  df['image_url'] = df['image_url'].str.replace('.../images/', chemin_images)


In [14]:
df.head()

Unnamed: 0,label,image_lien,image_url
0,Agaricales,486562.jpg,../../images/486562.jpg
1,Agaricales,509189.jpg,../../images/509189.jpg
2,Agaricales,486561.jpg,../../images/486561.jpg
3,Agaricales,231418.jpg,../../images/231418.jpg
4,Agaricales,508881.jpg,../../images/508881.jpg


In [15]:
controle_presence_fichiers(df, chemin_images)
df.drop('image_lien', axis=1, inplace=True)
df_ech.drop('image_lien', axis=1, inplace=True)



Tous les fichiers sont présents.


# Modèle

In [16]:
from tensorflow.keras import layers, models

efficientnet_url = 'https://tfhub.dev/google/imagenet/efficientnet_v2_imagenet21k_ft1k_b0/classification/2'
efficientNet = hub.KerasLayer(efficientnet_url, trainable=False)


# Créer le modèle EfficientNet sans les couches de classification
input_layer = layers.Input(shape=(224, 224, 3))
x = efficientNet(input_layer)
efficientNet = models.Model(inputs=input_layer, outputs=x)



# Créer le modèle CNN
model = models.Sequential()
model.add(efficientNet)

# Couche entièrement connectée
model.add(layers.Flatten())
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dense(32, activation='relu'))
model.add(layers.Dense(10, activation='softmax'))  # 10 classes de sortie

# Compiler le modèle
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Afficher un résumé du modèle
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 model (Functional)          (None, 1000)              7200312   
                                                                 
 flatten (Flatten)           (None, 1000)              0         
                                                                 
 dense (Dense)               (None, 64)                64064     
                                                                 
 dense_1 (Dense)             (None, 32)                2080      
                                                                 
 dense_2 (Dense)             (None, 10)                330       
                                                                 
Total params: 7266786 (27.72 MB)
Trainable params: 66474 (259.66 KB)
Non-trainable params: 7200312 (27.47 MB)
_________________________________________________________________


## Jeux train, test & val

In [17]:
data = df.drop('label', axis=1)
target = df['label']

s = LabelEncoder()
target = s.fit_transform(target)


X_train, X_temp, y_train, y_temp = train_test_split(data, target, test_size=0.25, random_state=10)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=10)

## Datasets

In [18]:
batch_size = 32
ds_train= create_tf_dataset(X_train.image_url, y_train, batch_size)
ds_test = create_tf_dataset(X_test.image_url, y_test, batch_size)
ds_val = create_tf_dataset(X_val.image_url, y_val, batch_size)

## Fit

In [None]:
#steps_per_epoch = X_train.shape[0] // batch_size


history_ech = model.fit(ds_train,
                        validation_data = ds_val,
                        epochs=30,
                        callbacks = [tensorboard, early_stopping, earlystop, checkpoint, lr_scheduler, time_callback, TON],
                        verbose=True)

## Evaluation

In [None]:
%reload_ext tensorboard
test_loss_ech, test_accuracy_ech = model.evaluate(ds_test_ech)
print("Test accuracy:", test_accuracy_ech)

In [None]:
import matplotlib.pyplot as plt

plt.plot(history_ech.history['accuracy'], label='accuracy')
plt.plot(history_ech.history['val_accuracy'], label='val_accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

## Interprétabilité (Grad Cam)

In [None]:
from IPython.display import Image, display
import matplotlib.cm as cm

In [None]:
model_builder = keras.applications.xception.Xception
img_size = (224, 224)
preprocess_input = keras.applications.xception.preprocess_input
decode_predictions = keras.applications.xception.decode_predictions

last_conv_layer_name = "block14_sepconv2_act"

# Chemin local vers image
img_path = keras.utils.get_file('../../images/28.jpg')
display(Image(img_path))

In [None]:
def get_img_array(img_path, size):
    # `img` is a PIL image of size 224x224
    img = keras.utils.load_img(img_path, target_size=size)
    # `array` is a float32 Numpy array of shape (224, 292249, 3)
    array = keras.utils.img_to_array(img)
    # We add a dimension to transform our array into a "batch"
    # of size (1, 224, 224, 3)
    array = np.expand_dims(array, axis=0)
    return array


def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    # First, we create a model that maps the input image to the activations
    # of the last conv layer as well as the output predictions
    grad_model = keras.models.Model(
        model.inputs, [model.get_layer(last_conv_layer_name).output, model.output]
    )

    # Then, we compute the gradient of the top predicted class for our input image
    # with respect to the activations of the last conv layer
    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(preds[0])
        class_channel = preds[:, pred_index]

    # This is the gradient of the output neuron (top predicted or chosen)
    # with regard to the output feature map of the last conv layer
    grads = tape.gradient(class_channel, last_conv_layer_output)

    # This is a vector where each entry is the mean intensity of the gradient
    # over a specific feature map channel
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # We multiply each channel in the feature map array
    # by "how important this channel is" with regard to the top predicted class
    # then sum all the channels to obtain the heatmap class activation
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    # For visualization purpose, we will also normalize the heatmap between 0 & 1
    heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap)
    return heatmap.numpy()

In [None]:
# Prepare image
img_array = preprocess_input(get_img_array(img_path, size=img_size))

# Make model
grad = model_builder(weights="imagenet")

# Remove last layer's softmax
grad.layers[-1].activation = None

# Print what the top predicted class is
preds = grad.predict(img_array)
print("Predicted:", decode_predictions(preds, top=1)[0])

# Generate class activation heatmap
heatmap = make_gradcam_heatmap(img_array, model, last_conv_layer_name)

# Display heatmap
plt.matshow(heatmap)
plt.show()

In [None]:
def save_and_display_gradcam(img_path, heatmap, cam_path="cam.jpg", alpha=0.4):
    # Load the original image
    img = keras.utils.load_img(img_path)
    img = keras.utils.img_to_array(img)

    # Rescale heatmap to a range 0-255
    heatmap = np.uint8(255 * heatmap)

    # Use jet colormap to colorize heatmap
    jet = cm.get_cmap("jet")

    # Use RGB values of the colormap
    jet_colors = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap]

    # Create an image with RGB colorized heatmap
    jet_heatmap = keras.utils.array_to_img(jet_heatmap)
    jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0]))
    jet_heatmap = keras.utils.img_to_array(jet_heatmap)

    # Superimpose the heatmap on original image
    superimposed_img = jet_heatmap * alpha + img
    superimposed_img = keras.utils.array_to_img(superimposed_img)

    # Save the superimposed image
    superimposed_img.save(cam_path)

    # Display Grad CAM
    display(Image(cam_path))


save_and_display_gradcam(img_path, heatmap)

## Sauvegarde modèle

In [None]:
model.save('./model/modele_complet')