In [None]:
import os
import matplotlib.pyplot as plt
from collections import defaultdict
import numpy as np
from tensorflow.keras.models import Model
from tensorflow.keras import layers
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.applications import ResNet101, VGG19
import matplotlib.image as mpimg
from anytree import Node, RenderTree
from anytree.exporter import DotExporter
from tensorflow.keras.callbacks import EarlyStopping
import seaborn as sns
from tensorflow.keras.applications.vgg19 import preprocess_input
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc, f1_score


In [None]:
# Root folder of the extracted dataset; the split sub-folders are resolved relative to it.
# NOTE(review): absolute, machine-specific path - adjust before running elsewhere.
dataset_path = r"C:\Users\lperf\OneDrive\Desktop\Magistrale\2° Anno\Advanced Machine Learning\AdvancedML - Laboratory\Assignement-Advanced-ML\Project\TrainingFabio_Extracted_data"

In [None]:
# Build the label vocabularies from the class folders of the training split.
dataset_dir = os.path.join(dataset_path, "train")
class_folders = sorted(os.listdir(dataset_dir))

# Echo every class folder, in the same sorted order used for numbering.
for folder_name in class_folders:
    print(folder_name)

# Fine-grained label: folder name -> position in the sorted listing.
labels_2_int = {name: position for position, name in enumerate(class_folders)}

# Coarse label names: first space-separated word of each folder name,
# de-duplicated while keeping the first occurrence order.
labels_1_array = list(dict.fromkeys(name.split(' ')[0] for name in class_folders))

In [None]:
# Display the folder-name -> fine-grained class index mapping.
labels_2_int

In [None]:
# Display the ordered list of coarse label names.
labels_1_array

In [None]:
# Coarse label: name -> its position in labels_1_array.
labels_1_int = {fruit_name: position for position, fruit_name in enumerate(labels_1_array)}

In [None]:
# Display the coarse-label-name -> integer mapping.
labels_1_int

In [None]:
def preprocess_and_save(input_dir, output_file, show_images=False):
    """Load every image under ``input_dir``, preprocess it for VGG19 and save the arrays.

    ``input_dir`` must contain one sub-folder per class. Each folder name is looked up in
    the module-level ``labels_2_int`` (whole name) and ``labels_1_int`` (first
    space-separated word), so both mappings must exist before this function is called.

    Args:
        input_dir: Directory holding one sub-folder of images per class.
        output_file: Destination handed to ``np.savez``; the arrays are stored under the
            keys ``x`` (images), ``y1`` (coarse labels) and ``y2`` (fine-grained labels).
        show_images: When True, plot the first image loaded for each class and write the
            figure to ``plot.png`` in the current working directory.

    Raises:
        KeyError: If a class folder name is missing from the label mappings.
    """
    data = []
    labels_1 = []  # coarse category (e.g. apple)
    labels_2 = []  # specific category (e.g. golden apple)

    # One preview image per class, collected in class order.
    previewed_labels = set()
    images_to_plot = []
    labels_to_plot = []

    for class_name in sorted(os.listdir(input_dir)):  # sorted -> stable class order
        class_dir = os.path.join(input_dir, class_name)
        if not os.path.isdir(class_dir):
            # Stray files next to the class folders are not classes; listing them would fail.
            continue

        label_1 = labels_1_int[class_name.split(' ')[0]]
        label_2 = labels_2_int[class_name]

        for img_name in os.listdir(class_dir):
            img_path = os.path.join(class_dir, img_name)
            img = load_img(img_path, target_size=(128, 128))  # resize on load
            img_array = preprocess_input(img_to_array(img))   # VGG19 preprocessing
            data.append(img_array)
            labels_1.append(label_1)
            labels_2.append(label_2)

            # Keep the un-preprocessed image of the first sample of each class for the preview.
            if label_2 not in previewed_labels:
                previewed_labels.add(label_2)
                images_to_plot.append(img)
                labels_to_plot.append(class_name)

    if show_images and images_to_plot:
        num_cols = 10
        num_rows = (len(images_to_plot) + num_cols - 1) // num_cols  # ceiling division

        fig = plt.figure(figsize=(20, num_rows * 2))
        for position, (image, caption) in enumerate(zip(images_to_plot, labels_to_plot), start=1):
            ax = fig.add_subplot(num_rows, num_cols, position)
            ax.imshow(image)
            ax.set_title(caption)
            ax.axis('off')

        # Save BEFORE showing: once plt.show() returns the figure may already be closed
        # (it is with the notebook inline backend), and saving afterwards writes a blank image.
        fig.savefig('plot.png')
        plt.show()

    # Convert to numpy arrays
    data = np.array(data)
    labels_1 = np.array(labels_1)
    labels_2 = np.array(labels_2)

    # Only the training split is shuffled. The split is recognised by the substring
    # 'train' anywhere in input_dir (case-sensitive), exactly as callers rely on today.
    if 'train' in input_dir:
        indices = np.random.permutation(len(data))  # one permutation shared by all arrays
        data = data[indices]
        labels_1 = labels_1[indices]
        labels_2 = labels_2[indices]
        print("shuffle eseguito")
    else:
        print("shuffle solo per il train")

    # Persist images and both label vectors in a single archive.
    np.savez(output_file, x=data, y1=labels_1, y2=labels_2)

In [None]:
train_path = 'train_data_BIG.npz'
if not os.path.exists(train_path):
    # No cached archive yet: build it (this also draws and saves the per-class preview).
    # os.path.join keeps the path handling consistent with how dataset_dir is built.
    preprocess_and_save(os.path.join(dataset_path, 'train'), train_path, show_images=True)
else:
    # The archive is cached, so only re-display a previously saved preview figure.
    # NOTE(review): this folder differs from the working directory where
    # preprocess_and_save writes 'plot.png' - confirm which location is intended.
    image_path = r"C:\Users\lperf\OneDrive\Desktop\Magistrale\2° Anno\Advanced Machine Learning\Assignement\Project\TrainingFabio_Extracted_data"
    preview_file = os.path.join(image_path, 'plot.png')

    if os.path.exists(preview_file):
        img = mpimg.imread(preview_file)
        fig, ax = plt.subplots(figsize=(12, 12))  # large canvas; adjust as needed
        ax.imshow(img)

        # Strip ticks and frame so only the saved picture is visible.
        ax.set_xticks([])
        ax.set_yticks([])
        for side in ('top', 'bottom', 'left', 'right'):
            ax.spines[side].set_visible(False)

        plt.show()
    else:
        # A missing preview must not abort the notebook: the data itself is already cached.
        print(f"Immagine di anteprima non trovata: {preview_file}")

In [None]:
val_path = 'val_data_BIG.npz'
# Build the validation archive only when it is not cached yet.
if not os.path.exists(val_path):
    # os.path.join instead of a hand-written '\\' keeps this consistent with dataset_dir.
    preprocess_and_save(os.path.join(dataset_path, 'val'), val_path)

In [None]:
test_path = 'test_data_BIG.npz'
# Build the test archive only when it is not cached yet.
if not os.path.exists(test_path):
    # os.path.join instead of a hand-written '\\' keeps this consistent with dataset_dir.
    preprocess_and_save(os.path.join(dataset_path, 'test'), test_path)

In [None]:
# Absolute locations of the cached archives (these override the relative paths set above).
# NOTE(review): machine-specific paths - confirm they point at the files written earlier.
train_path = r"C:\Users\lperf\OneDrive\Desktop\Magistrale\2° Anno\Advanced Machine Learning\AdvancedML - Laboratory\Assignement-Advanced-ML\Project\train_data_BIG.npz"
test_path = r"C:\Users\lperf\OneDrive\Desktop\Magistrale\2° Anno\Advanced Machine Learning\AdvancedML - Laboratory\Assignement-Advanced-ML\Project\test_data_BIG.npz"
val_path = r"C:\Users\lperf\OneDrive\Desktop\Magistrale\2° Anno\Advanced Machine Learning\AdvancedML - Laboratory\Assignement-Advanced-ML\Project\val_data_BIG.npz"

# Every archive is read through the same three keys: images, coarse labels, fine labels.
archive_keys = ('x', 'y1', 'y2')

# Training split
data_train = np.load(train_path)
x_train, y1_train, y2_train = [data_train[key] for key in archive_keys]

# Validation split
data_val = np.load(val_path)
x_val, y1_val, y2_val = [data_val[key] for key in archive_keys]

# Test split
data_test = np.load(test_path)
x_test, y1_test, y2_test = [data_test[key] for key in archive_keys]

In [None]:
# One-hot encode the coarse labels of every split with 70 columns.
# NOTE(review): the integer labels are overwritten in place, so reload the data before
# running this cell a second time.
y1_train, y1_val, y1_test = [
    to_categorical(split_labels, num_classes=70)
    for split_labels in (y1_train, y1_val, y1_test)
]

In [None]:
# One-hot encode the fine-grained labels of every split with 123 columns.
# NOTE(review): the integer labels are overwritten in place, so reload the data before
# running this cell a second time.
y2_train, y2_val, y2_test = [
    to_categorical(split_labels, num_classes=123)
    for split_labels in (y2_train, y2_val, y2_test)
]

In [None]:
# The second axis of each label matrix is taken as the number of classes at that level.
num_object_label_1, num_object_label_2 = y1_train.shape[1], y2_train.shape[1]
print(f"Numero di classi per label_1: {num_object_label_1}")
print(f"Numero di classi per label_2: {num_object_label_2}")

In [None]:
# Column sums of the one-hot matrix = number of training samples per fine-grained class.
y2_counts = np.sum(y2_train, axis=0)

# Two-level hierarchy: root -> coarse label -> fine-grained class.
root = Node("Frutti")
primary_nodes = {
    primary_label: Node(f"{primary_label} (0)", parent=root)
    for primary_label in labels_1_int
}

# Hang every fine-grained class, annotated with its sample count, under its coarse label.
for class_name, index in labels_2_int.items():
    parent_node = primary_nodes.get(class_name.split(' ')[0])
    if parent_node is not None:
        Node(f"{class_name} ({int(y2_counts[index])})", parent=parent_node)

# Replace the "(0)" placeholder of each coarse label with the sum of its children,
# read back from the "(count)" suffix of each child's name.
for primary_label, node in primary_nodes.items():
    child_counts = [
        int(child.name.rsplit("(", 1)[-1].strip(")"))
        for child in node.children
    ]
    node.name = f"{primary_label} ({sum(child_counts)})"

# Print the annotated tree.
for row in RenderTree(root):
    print(f"{row.pre}{row.node.name}")

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter

def plot_label_distribution(y_train, label_names, title="Distribuzione delle etichette"):
    """
    Draw a pie chart and a bar chart of how often each class occurs.

    :param y_train: 2-D label array; the class of a row is the index of its maximum
    :param label_names: sequence mapping a class index to its display name
    :param title: prefix used for the titles of both subplots
    """
    # Tally rows per class index; classes appear in first-seen order.
    occurrences = Counter(np.argmax(y_train, axis=1))

    labels = []
    values = []
    for class_idx, n_rows in occurrences.items():
        labels.append(label_names[class_idx])
        values.append(n_rows)

    palette = plt.cm.tab20.colors
    fig, (pie_ax, bar_ax) = plt.subplots(1, 2, figsize=(16, 8))

    # Left panel: share of each class.
    pie_ax.pie(values, labels=labels, startangle=90, colors=palette, autopct='%1.1f%%')
    pie_ax.set_title(f"{title} (Pie Chart)")
    pie_ax.axis('equal')  # keep the pie circular

    # Right panel: absolute frequency of each class; x tick labels are hidden.
    bar_ax.bar(labels, values, color=palette[:len(labels)])
    bar_ax.set_title(f"{title} (Bar Chart)")
    bar_ax.set_xlabel("Etichette")
    bar_ax.set_ylabel("Frequenza")
    bar_ax.tick_params(axis='x', which='both', bottom=False, top=False, labelbottom=False)

    plt.tight_layout()
    plt.show()

In [None]:
# Distribution of the fine-grained labels; names are listed in mapping (index) order.
label_names_y2 = list(labels_2_int)
plot_label_distribution(
    y2_train,
    label_names_y2,
    title="Distribuzione di Y2 (Sottoclassi)",
)

In [None]:
# Per-class image counts, keyed by class index (argmax of each one-hot row).
class_indices = np.argmax(y2_train, axis=1)
class_counts = Counter(class_indices)

# Pick out the classes with more than 350 images and those with fewer than 250.
class_over_350 = {}
class_under_250 = {}
for idx, count in class_counts.items():
    if count > 350:
        class_over_350[label_names_y2[idx]] = count
    elif count < 250:
        class_under_250[label_names_y2[idx]] = count

# Two side-by-side bar charts sharing the y axis.
fig, axes = plt.subplots(1, 2, figsize=(18, 6), sharey=True)
panels = (
    (axes[0], class_over_350, 'green', 'Classi con più di 350 immagini'),
    (axes[1], class_under_250, 'red', 'Classi con meno di 250 immagini'),
)
for panel_ax, subset, bar_color, panel_title in panels:
    panel_ax.bar(subset.keys(), subset.values(), color=bar_color)
    panel_ax.set_xlabel('Classi')
    panel_ax.set_title(panel_title)
    panel_ax.tick_params(axis='x', rotation=90)

# The y label is set on the left panel only.
axes[0].set_ylabel('Numero di immagini')

plt.tight_layout()
plt.show()

In [None]:
# Distribution of the coarse labels; names are listed in mapping (index) order.
label_names_y1 = list(labels_1_int)
plot_label_distribution(
    y1_train,
    label_names_y1,
    title="Distribuzione di Y1 (Classi Principali)",
)

In [None]:
def compute_class_weights(y_train):
    """Return inverse-frequency weights for the classes present in ``y_train``.

    The class of each row is the index of its maximum. A class with ``count`` rows gets
    the weight ``total_rows / (number_of_present_classes * count)``, so rarer classes
    receive larger weights. Classes with no rows do not appear in the result.

    :param y_train: 2-D label array (one row per sample)
    :return: dict mapping class index -> weight
    """
    occurrences = Counter(np.argmax(y_train, axis=1))

    n_samples = sum(occurrences.values())
    n_classes = len(occurrences)

    weights = {}
    for label, n_label in occurrences.items():
        weights[label] = n_samples / (n_classes * n_label)
    return weights

In [None]:
# Compute the class weights for y2
class_weights_y2 = compute_class_weights(y2_train)

# Show the computed weights
print("Pesi per y2_train:", class_weights_y2)

In [None]:
# Compute the class weights for y1
class_weights_y1 = compute_class_weights(y1_train)

# Show the computed weights
print("Pesi per y1_train:", class_weights_y1)

In [None]:
def compute_sample_weights(y_train, class_weights):
    """Expand per-class weights into one weight per sample.

    :param y_train: 2-D label array; the class of a row is the index of its maximum
    :param class_weights: mapping class index -> weight (must cover every class present)
    :return: numpy array with one weight per row of ``y_train``
    """
    per_sample = []
    for label in np.argmax(y_train, axis=1):
        per_sample.append(class_weights[label])
    return np.array(per_sample)

In [None]:
# One weight per training sample, derived from the y2 class weights.
sample_weights_y2 = np.asarray(compute_sample_weights(y2_train, class_weights_y2))

In [None]:
# One weight per training sample, derived from the y1 class weights.
sample_weights_y1 = np.asarray(compute_sample_weights(y1_train, class_weights_y1))

In [None]:
# Per-sample average of the y1 and y2 weights.
# NOTE(review): no later cell in this notebook section reads this variable - confirm
# whether it was meant to be passed to model.fit.
combined_sample_weights = 0.5 * (sample_weights_y1 + sample_weights_y2)

In [None]:
def check_data_conformity(x, y1, y2, sample_weights_y1=None, sample_weights_y2=None, dataset_name="train"):
    """Print a consistency report for one dataset split.

    Checks, in this order: that ``y1`` and ``y2`` have as many rows as ``x``; that both
    label arrays are 2-D; and, for each sample-weight array that is provided, that it has
    as many entries as ``x``. Every violation is printed on its own line; if there is
    none, a single success line is printed. Nothing is returned.

    :param x: array of samples (first axis = sample count)
    :param y1: coarse label array
    :param y2: fine-grained label array
    :param sample_weights_y1: optional per-sample weights for y1
    :param sample_weights_y2: optional per-sample weights for y2
    :param dataset_name: label used as prefix in the printed messages
    """
    num_samples = x.shape[0]
    problems = []

    # Row counts must match the number of samples.
    if y1.shape[0] != num_samples:
        problems.append(f"❌ {dataset_name}: y1 ha {y1.shape[0]} campioni, ma x ne ha {num_samples}")
    if y2.shape[0] != num_samples:
        problems.append(f"❌ {dataset_name}: y2 ha {y2.shape[0]} campioni, ma x ne ha {num_samples}")

    # One-hot labels are expected to be 2-D.
    if len(y1.shape) != 2:
        problems.append(f"❌ {dataset_name}: y1 dovrebbe essere one-hot encoded (shape 2D), ma ha shape {y1.shape}")
    if len(y2.shape) != 2:
        problems.append(f"❌ {dataset_name}: y2 dovrebbe essere one-hot encoded (shape 2D), ma ha shape {y2.shape}")

    # Optional sample weights must cover every sample.
    if sample_weights_y1 is not None and sample_weights_y1.shape[0] != num_samples:
        problems.append(f"❌ {dataset_name}: sample_weights_y1 ha {sample_weights_y1.shape[0]} campioni, ma x ne ha {num_samples}")
    if sample_weights_y2 is not None and sample_weights_y2.shape[0] != num_samples:
        problems.append(f"❌ {dataset_name}: sample_weights_y2 ha {sample_weights_y2.shape[0]} campioni, ma x ne ha {num_samples}")

    if problems:
        print(*problems, sep="\n")
        return

    print(f"✅ {dataset_name}: Tutti i dati sono conformi!")

In [None]:
# Run the consistency report on each split; only the training split has sample weights.
check_data_conformity(x_train, y1_train, y2_train, sample_weights_y1, sample_weights_y2, dataset_name="Train")
check_data_conformity(x_val, y1_val, y2_val, dataset_name="Validation")
check_data_conformity(x_test, y1_test, y2_test, dataset_name="Test")

In [None]:
# Small CNN with a shared trunk and two classification heads
def build_model(input_shape, num_classes, num_object_names):
    """Build a two-output CNN: a shared convolutional trunk followed by two softmax heads.

    :param input_shape: shape of one input image
    :param num_classes: number of units of the 'y1' head
    :param num_object_names: number of units of the 'y2' head
    :return: uncompiled Keras ``Model`` with outputs ``[y1, y2]``
    """
    img_input = Input(shape=input_shape)

    # Shared trunk: three Conv(3x3, relu) + MaxPool(2x2) stages with growing filter counts.
    features = img_input
    for n_filters in (32, 64, 128):
        features = Conv2D(n_filters, (3, 3), activation='relu')(features)
        features = MaxPooling2D((2, 2))(features)
    features = Flatten()(features)

    # Both heads read the same flattened features.
    class_output = Dense(num_classes, activation='softmax', name='y1')(features)
    object_output = Dense(num_object_names, activation='softmax', name='y2')(features)

    return Model(inputs=img_input, outputs=[class_output, object_output])

In [None]:
# Shape of a single sample (first axis of x_train dropped).
input_shape = x_train.shape[1:]
# Length of the first label row of each label array, used as the size of each output head.
num_classes_1 = len(y1_train[0])
num_classes_2 = len(y2_train[0])

### Model 5

In [None]:
# Load the ImageNet-pretrained VGG19 convolutional base (no classifier head).
# Arguments left out here (input_tensor, pooling, classifier_activation) keep their defaults.
VGG_model5 = VGG19(include_top=False, weights='imagenet', input_shape=(128, 128, 3))

In [None]:
VGG_model5.trainable = False  # Freeze the weights of the pretrained model

In [None]:
# Add the custom layers on top of the VGG19 output:
# global average pooling -> Dense(128, relu) -> Dropout(0.2), shared by both heads.
x = layers.GlobalAveragePooling2D()(VGG_model5.output)
x = layers.Dense(128, activation="relu")(x)
x = layers.Dropout(0.2)(x)

# Branch 1 - fruit prediction (output 'y1', num_classes_1 units)
fruit_output = layers.Dense(num_classes_1, activation="softmax", name="y1")(x)

# Branch 2 - "quality" prediction (output 'y2', num_classes_2 units)
quality_output = layers.Dense(num_classes_2, activation="softmax", name="y2")(x)

In [None]:
# Create the two-output model: VGG19 input -> [y1, y2] heads, then print its summary
model5 = Model(inputs=VGG_model5.input, outputs=[fruit_output, quality_output])
model5.summary()

In [None]:
# Size of each output head, taken from the first row of the label arrays
num_classes_1 = len(y1_train[0])  # fruit classes
num_classes_2 = len(y2_train[0])  # quality classes

# Both outputs use the same loss and the same metric, keyed by output-layer name.
output_names = ('y1', 'y2')
model5.compile(
    optimizer='adam',
    loss=dict.fromkeys(output_names, 'categorical_crossentropy'),
    metrics=dict.fromkeys(output_names, 'accuracy'),
)

In [None]:
# Stop training once the total validation loss has not improved by at least `min_delta`
# for `patience` consecutive epochs.
early_stopping = EarlyStopping(
    monitor="val_loss",
    patience=2,
    min_delta=0.000001,
    # Without this the model keeps the weights of the last epoch run, i.e. up to
    # `patience` epochs past the best validation loss, and those are what gets saved later.
    restore_best_weights=True,
)

# Train both heads jointly; targets are matched to the outputs by layer name.
# NOTE(review): the sample weights computed earlier in the notebook are not passed
# to fit here - confirm this is intentional.
history5 = model5.fit(
    x_train,
    {'y1': y1_train,
     'y2': y2_train},
    validation_data=(
        x_val, {
            'y1': y1_val,
            'y2': y2_val}),
    epochs=50,
    batch_size=256,
    callbacks=[early_stopping]
)

In [None]:
def plot_training_history(history):
    """
    Plot the loss and accuracy curves of a model with the two outputs 'y1' and 'y2'.

    Args:
        history: Training history object (the return value of model.fit()); its
            ``history`` dict must contain the total, y1 and y2 loss/accuracy series
            for both training and validation.
    """
    series = history.history
    epochs = range(1, len(series['loss']) + 1)

    # (history key, line format, legend label) for each curve, in drawing order.
    loss_curves = (
        ('loss', 'r-', 'Training Loss'),
        ('val_loss', 'r--', 'Validation Loss'),
        ('y1_loss', 'b-', 'Y1 (Fruit) Loss'),
        ('val_y1_loss', 'b--', 'Val Y1 Loss'),
        ('y2_loss', 'g-', 'Y2 (Quality) Loss'),
        ('val_y2_loss', 'g--', 'Val Y2 Loss'),
    )
    accuracy_curves = (
        ('y1_accuracy', 'b-', 'Y1 (Fruit) Accuracy'),
        ('val_y1_accuracy', 'b--', 'Val Y1 Accuracy'),
        ('y2_accuracy', 'g-', 'Y2 (Quality) Accuracy'),
        ('val_y2_accuracy', 'g--', 'Val Y2 Accuracy'),
    )

    # (subplot position, curves, y-axis label, title): losses left, accuracies right.
    panels = (
        (1, loss_curves, 'Loss', 'Training & Validation Loss'),
        (2, accuracy_curves, 'Accuracy', 'Training & Validation Accuracy'),
    )

    plt.figure(figsize=(12, 5))
    for position, curves, y_label, panel_title in panels:
        plt.subplot(1, 2, position)
        for key, line_format, legend_label in curves:
            plt.plot(epochs, series[key], line_format, label=legend_label)
        plt.xlabel('Epochs')
        plt.ylabel(y_label)
        plt.legend()
        plt.title(panel_title)

    plt.tight_layout()
    plt.show()

In [None]:
# After training: plot the loss/accuracy curves recorded in history5
plot_training_history(history5)

In [None]:
# Persist the trained model to the working directory.
model5.save("modelVGG19Keras5.keras")