# ***Anomaly Detection:<br> Spinach Adulterated Pistachios***

---

# TODO: USE LOADIMAGESFROMDATASET MTA3 L KERAS

Les pistaches adultérées avec des épinards posent un problème de sécurité alimentaire. Cette fraude consiste à mélanger des feuilles d'épinards avec des pistaches pour augmenter le poids du produit et réaliser des profits illégaux. <br>Les consommateurs obtiennent un produit de qualité inférieure et potentiellement dangereux, pouvant contenir des contaminants ou des allergènes. <br>Les autorités sanitaires et les organismes de contrôle travaillent pour détecter et prévenir cette fraude, protégeant ainsi les consommateurs et la chaîne d'approvisionnement alimentaire. Pour attaquer ce problème, nous utiliserons un autoencodeur qui détectera ce type de fraude.

In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import load_img, img_to_array



In [None]:
import tensorflow as tf

# Définir les paramètres de prétraitement des images
batch_size = 32
image_size = (64, 64)

# Fonction pour charger et prétraiter les images
def load_images_from_folder(folder, target_size=(64, 64)):
    images = []
    for filename in os.listdir(folder):
        img = load_img(os.path.join(folder, filename), target_size=target_size)
        img_array = img_to_array(img) / 255.0  # Normalisation des valeurs des pixels
        images.append(img_array)
    return np.array(images)

# Charger les images de pistaches pures
pure_pistachios_folder = './dataset/pure/Pure Pistachios'
pure_pistachios = load_images_from_folder(pure_pistachios_folder)

In [None]:
from tensorflow.keras import layers, models

# Définir l'architecture du modèle
input_img = layers.Input(shape=(64, 64, 3))

# Encodeur
x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(input_img)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
encoded = layers.MaxPooling2D((2, 2), padding='same')(x)

# Décodeur
x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(encoded)
x = layers.UpSampling2D((2, 2))(x)
x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
x = layers.UpSampling2D((2, 2))(x)
decoded = layers.Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)

# Créer le modèle
autoencoder = models.Model(input_img, decoded)

# Compiler le modèle
autoencoder.compile(optimizer='adam', loss='mse')

# Afficher un résumé de l'architecture du modèle
autoencoder.summary()


In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Créer un générateur d'augmentation de données
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=0.1,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

# Prétraiter et augmenter les images
augmented_pure_pistachios = []
for batch in datagen.flow(pure_pistachios, batch_size=32):
    augmented_pure_pistachios.extend(batch)
    if len(augmented_pure_pistachios) >= len(pure_pistachios):
        break

# Convertir la liste en tableau numpy
augmented_pure_pistachios = np.array(augmented_pure_pistachios)

pure_pistachios_dataset_val = np.array(pure_pistachios_dataset_val)
print(pure_pistachios_dataset_val.shape)

# Entraîner l'autoencodeur sur les données augmentées
autoencoder.fit(augmented_pure_pistachios, validation_data= pure_pistachios_dataset_val, epochs=30, batch_size=32, shuffle=True)


In [None]:
# Fonction pour détecter la fraude
def detect_fraud(images, model, threshold=0.0038):
    fraud_images = []
    for img in images:
        reconstructed_img = model.predict(np.expand_dims(img, axis=0))
        reconstruction_error = np.mean((img - reconstructed_img[0]) ** 2)
        print(reconstruction_error)
        if reconstruction_error > threshold:
            fraud_images.append(img)
    return fraud_images

# Charger les images adultérées et détecter la fraude
# adulterated_pistachios_folder = './dataset/pure/Pure pistachios' # 0.0038 BEHI LEL PUR
adulterated_pistachios_folder = './dataset/all_adulterated' 

adulterated_pistachios = load_images_from_folder(adulterated_pistachios_folder)
fraudulent_images = detect_fraud(adulterated_pistachios, autoencoder)
print(f"Nombre d'images frauduleuses détectées : {len(fraudulent_images)}")

## **1. Importation des Bibliothèques :**
---
Nous commençons par importer les bibliothèques nécessaires pour ce projet. TensorFlow sera utilisé pour construire et entraîner notre modèle de réseau de neurones.

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.utils import plot_model
import numpy as np


## **2. Définition des Paramètres :**
---
Nous définissons les paramètres de notre projet, notamment le chemin vers notre dataset, la taille des lots, et les dimensions des images.


In [None]:
# Chemin vers le répertoire parent contenant les sous-dossiers
dataset_dir = './dataset'

batch_size = 32
img_height = 256
img_width = 256


## **3. Chargement des données :**
---
Nous utilisons `image_dataset_from_directory` pour charger nos images depuis les sous-dossiers. Cette fonction divise automatiquement les données en ensembles d'entraînement et de validation.


### **Structure de notre dataset :**
dataset/ <br>
├── 10% spinach adulterated pistachios/ <br>
├── 20% spinach adulterated pistachios/ <br>
├── 30% spinach adulterated pistachios/ <br> 
├── 40% spinach adulterated pistachios/ <br>
├── 50% spinach adulterated pistachios/ <br>
└── Pure pistachios/ <br>

In [None]:
# Chargement des datasets d'entraînement 
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    dataset_dir,
    labels='inferred',
    label_mode='binary',  # Classification multiclasses
    batch_size=batch_size,
    image_size=(img_height, img_width),
    shuffle=True,
    seed=123,
    validation_split=0.2,
    subset="training"
)


In [None]:
# Chargement des datasets de validation
val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    dataset_dir,
    labels='inferred',
    label_mode='binary',  # Classification multiclasses
    batch_size=batch_size,
    image_size=(img_height, img_width),
    shuffle=True,
    seed=123,
    validation_split=0.2,
    subset="validation"
)


## **4. Normalisation des images:**
---
 Nous normalisons les images pour que les valeurs des pixels soient comprises entre 0 et 1, ce qui facilite l'entraînement du modèle.

La normalisation des données dans le cadre d'un autoencodeur est essentielle pour plusieurs raisons. 

D'une part, elle rend les données comparables et cohérentes, facilitant ainsi le processus d'apprentissage en ramenant les valeurs à une échelle commune.

D'autre part, elle évite que certaines caractéristiques ne dominent le processus d'apprentissage, en assurant une contribution équilibrée de chaque caractéristique à la représentation latente apprise par l'autoencodeur. 

Donc, la normalisation des données favorise un meilleur apprentissage et des représentations latentes plus robustes.

In [None]:
# Normalisation des images
normalization_layer = tf.keras.layers.Rescaling(1./255)
normalized_train_ds = train_ds.map(lambda x, y: (normalization_layer(x), y))
normalized_val_ds = val_ds.map(lambda x, y: (normalization_layer(x), y))


## **5. Définition du Modèle :**
---
Nous définissons un modèle de réseau de neurones convolutionnel (CNN) pour la classification d'images. Ce modèle comprend plusieurs couches convolutives et de pooling, suivies de couches denses.


In [None]:
# Définition du modèle de classification
num_classes = len(train_ds.class_names)

print('Classes :', train_ds.class_names)
print('Nombre de classes :', num_classes)

model = models.Sequential([
    layers.Conv2D(32, (3, 3), activation='relu', input_shape=(img_height, img_width, 3)),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(128, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dense(num_classes, activation='softmax')  # Multiclasses
])


## **6. Compilation du Modèle :**
---
Nous compilons le modèle en utilisant l'optimiseur Adam et la perte `SparseCategoricalCrossentropy`, qui est adaptée pour la classification multiclasses.<br>
Nous affichons le résumé du modèle pour visualiser les différentes couches et le nombre de paramètres entraînables.


In [None]:

model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['accuracy'])

model.summary()


In [None]:
# plot_model(model, to_file='model_schema.png', show_shapes=True, show_layer_names=True)

## **7. Entraînement du Modèle :**
Nous entraînons le modèle sur notre dataset d'entraînement et évaluons ses performances sur le dataset de validation. Le nombre d'époques d'entraînement est fixé à 50.


In [None]:
# Entraînement du modèle
history = model.fit(
    normalized_train_ds,
    validation_data=normalized_val_ds,
    epochs=10
)


## **8. Évaluation du Modèle :**
---
Nous évaluons les performances du modèle sur le dataset de validation pour obtenir la précision finale.


In [None]:

# Évaluation du modèle
loss, accuracy = model.evaluate(normalized_val_ds)
print(f'Validation accuracy: {accuracy:.2f}')


## **9. Prédiction avec le Modèle :**
---
Nous définissons une fonction de prédiction qui prend en entrée une image, la prétraite et utilise le modèle pour prédire sa classe. Un exemple de prédiction est fourni.


In [None]:

# Faire des prédictions
class_names = train_ds.class_names

def predict(model, img_path):
    img = tf.keras.preprocessing.image.load_img(img_path, target_size=(img_height, img_width))
    img_array = tf.keras.preprocessing.image.img_to_array(img)
    img_array = tf.expand_dims(img_array, 0)  # Create batch axis

    predictions = model.predict(img_array)
    predicted_class = class_names[np.argmax(predictions)]
    return predicted_class


In [None]:

# Exemple de prédiction
img_path = 'dataset/pure/Pure pistachios/IMG_2203.JPG'
predicted_class = predict(model, img_path)
print(f'The predicted class is: {predicted_class}')


In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np


In [None]:

# Chemin vers le répertoire parent contenant les sous-dossiers
dataset_dir = './dataset'

batch_size = 32
img_height = 256
img_width = 256


In [None]:

# Chargement des datasets d'entraînement et de validation
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    dataset_dir,
    labels='inferred',
    label_mode='binary',  # Classification binaire
    batch_size=batch_size,
    image_size=(img_height, img_width),
    shuffle=True,
    seed=123,
    validation_split=0.2,
    subset="training"
)


In [None]:

val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    dataset_dir,
    labels='inferred',
    label_mode='binary',  # Classification binaire
    batch_size=batch_size,
    image_size=(img_height, img_width),
    shuffle=True,
    seed=123,
    validation_split=0.2,
    subset="validation"
)


In [None]:

# Normalisation des images
normalization_layer = tf.keras.layers.Rescaling(1./255)
normalized_train_ds = train_ds.map(lambda x, y: (normalization_layer(x), y))
normalized_val_ds = val_ds.map(lambda x, y: (normalization_layer(x), y))


In [None]:

# Définition de l'autoencodeur
input_img = layers.Input(shape=(img_height, img_width, 3))


In [None]:

# Encoder
x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(input_img)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
encoded = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(x)


In [None]:

# Decoder
x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(encoded)
x = layers.UpSampling2D((2, 2))(x)
x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
x = layers.UpSampling2D((2, 2))(x)
decoded = layers.Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)


In [None]:

autoencoder = models.Model(input_img, decoded)
autoencoder.compile(optimizer='adam', loss='binary_crossentropy')

autoencoder.summary()


In [None]:
# Filtrage des images pures pour l'entraînement de l'autoencodeur
pure_pistachios_ds = normalized_train_ds.filter(lambda x, y: tf.equal(y, 0))

In [None]:

# Entraînement de l'autoencodeur
autoencoder.fit(
    pure_pistachios_ds,
    epochs=10,
    validation_data=normalized_val_ds
)


In [None]:

# Détection des anomalies
def calculate_reconstruction_error(original, reconstructed):
    return np.mean(np.abs(original - reconstructed), axis=(1, 2, 3))

threshold = 0.02  # Ce seuil doit être déterminé empiriquement


In [None]:

def detect_anomalies(autoencoder, dataset):
    anomalies = []
    for images, labels in dataset:
        reconstructed = autoencoder.predict(images)
        errors = calculate_reconstruction_error(images.numpy(), reconstructed)
        anomalies.extend(errors > threshold)
    return anomalies


In [None]:

anomalies_train = detect_anomalies(autoencoder, normalized_train_ds)
anomalies_val = detect_anomalies(autoencoder, normalized_val_ds)

print(f"Anomalies in training set: {np.sum(anomalies_train)}")
print(f"Anomalies in validation set: {np.sum(anomalies_val)}")
