<a href="https://colab.research.google.com/github/WebberMark02/machine-learning-project/blob/main/project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Importo le librerie necessarie e scelgo di utilizzare "tensorflow" come
backend per "Keras".

In [None]:
import numpy as np
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import keras
from matplotlib import pyplot as plt
import sklearn
from tensorflow.keras.datasets import mnist, fashion_mnist

Imposto le variabili globali.

In [None]:
seed = 142
training_set_size = 160000
testing_set_size = 5000
validation_set_size = 20000
image_shape = (32, 32, 1)
batch_size = 8192
epochs = 20
learning_rate = 0.01
early_stopping_patience = 50
reduce_lr_patience = 5

Definisco il generatore di immagini sulle quali il modello
verrà addestrato e testato.  
Il generatore restituisce "batchsize" immagini; ogni immagine è
la media di due immagini scelte casualmente rispettivamente da 'x1' e 'x2'.  
Il generatore restituisce, inoltre, per ogni media di immagini, la coppia delle immagini delle quali è stata calcolata la media stessa.

In [None]:
def datagenerator(x1, x2, batchsize):
    n1 = x1.shape[0]
    n2 = x2.shape[0]
    while True:
        num1 = np.random.randint(0, n1, batchsize)
        num2 = np.random.randint(0, n2, batchsize)

        x_data = (x1[num1] + x2[num2]) / 2.0
        y_data = (x1[num1], x2[num2])

        yield x_data, y_data

Definisco una funzione per il controllo del bilanciamento delle classi.  
Mi servirà per verificare che la divisione stratificata abbia avuto successo.

In [None]:
def stampa_percentuale(y, title = ""):
  if title:
    print(title)
  # Calcolo le occorrenze di ciascuna classe nel dataset.
  unique, counts = np.unique(y, return_counts = True)
  # Calcolo la % di occorrenze per ciascuna classe.
  percentuali = (counts / len(y)) * 100
  # Stampo le occorrenze e le percentuali.
  for classe, conteggio, percentuale in zip(unique, counts, percentuali):
      print(f"Classe {classe}: Occorrenze = {conteggio}, Percentuale {percentuale} %" )
  print(f"Totale occorrenze : {sum(counts)}")
  print()

Definisco un modello banale.
Mi servirà per valutare che la rete abbia prestazioni migliori di esso.

In [None]:
def ide_model(x):
   return((x,x))

Ora ha inizio la fase di caricamento e preparazione dei dataset che verranno utilizzati
per addestrare e esaminare le prestazioni della rete.

Prima di tutto, carico i training set e i testing set di "MNIST" e "Fashion MNIST".

In [None]:
(mnist_x_train, mnist_y_train), (mnist_x_test, mnist_y_test) = mnist.load_data()
(fashion_mnist_x_train, fashion_mnist_y_train), (fashion_mnist_x_test, fashion_mnist_y_test) = fashion_mnist.load_data()

print(np.shape(mnist_x_train))

Ridimensiono le immagini tramite padding, portando la loro risoluzione da 28x28 a 32x32.  
Inoltre, le normalizzo nell'intervallo [0, 1].

In [None]:
#normalize in and pad
mnist_x_train = np.pad(mnist_x_train,((0,0),(2,2),(2,2)))/255.
mnist_x_test = np.pad(mnist_x_test,((0,0),(2,2),(2,2)))/255.
fashion_mnist_x_train = np.pad(fashion_mnist_x_train,((0,0),(2,2),(2,2)))/255.
fashion_mnist_x_test = np.pad(fashion_mnist_x_test,((0,0),(2,2),(2,2)))/255.

print(np.shape(mnist_x_train))

Aggiungo una dimensione agli array numpy delle immagini (non modifico affatto
le immagini).  
Mi serve per rendere le immagini compatibili con le dimensioni
del layer di input della rete neurale.

In [None]:
print(np.shape(mnist_x_train))
print(np.shape(mnist_x_test))
print(np.shape(fashion_mnist_x_train))
print(np.shape(fashion_mnist_x_test))

mnist_x_train = np.reshape(mnist_x_train, (mnist_x_train.shape[0], mnist_x_train.shape[1], mnist_x_train.shape[2], 1))
mnist_x_test = np.reshape(mnist_x_test, (mnist_x_test.shape[0], mnist_x_test.shape[1], mnist_x_test.shape[2], 1))
fashion_mnist_x_train = np.reshape(fashion_mnist_x_train, (fashion_mnist_x_train.shape[0], fashion_mnist_x_train.shape[1], fashion_mnist_x_train.shape[2], 1))
fashion_mnist_x_test = np.reshape(fashion_mnist_x_test, (fashion_mnist_x_test.shape[0], fashion_mnist_x_test.shape[1], fashion_mnist_x_test.shape[2], 1))

print(np.shape(mnist_x_train))
print(np.shape(mnist_x_test))
print(np.shape(fashion_mnist_x_train))
print(np.shape(fashion_mnist_x_test))

Visualizzo qualche immagine per accertarmi che l'operazione di reshaping non le abbia modificate.

In [None]:
plt.imshow(mnist_x_train[0], cmap='gray')
plt.show()
plt.imshow(mnist_x_test[0], cmap='gray')
plt.show()
plt.imshow(fashion_mnist_x_train[0], cmap='gray')
plt.show()
plt.imshow(fashion_mnist_x_test[0], cmap='gray')
plt.show()

print(mnist_x_train[0].shape)

Controllo il bilanciamento delle classi nei training set prima della divisione stratificata.

In [None]:
stampa_percentuale(mnist_y_train, 'MNIST training set completo')
stampa_percentuale(fashion_mnist_y_train, 'Fashion MNIST training set completo')

Divido ogni training set in due insiemi: il training set e il validation set.
Le immagini dei validation set verranno usate per l'ottimizzazione degli iper-parametri della rete.
Ogni validation set conterrà il 20% delle immagini del training set di partenza.
Uso la stratificazione per mantenere le classi nelle stesse proporzioni.

In [None]:
mnist_x_train, mnist_x_val, mnist_y_train, mnist_y_val = sklearn.model_selection.train_test_split(mnist_x_train, mnist_y_train, test_size=0.2, stratify=mnist_y_train, random_state=seed)
fashion_mnist_x_train, fashion_mnist_x_val, fashion_mnist_y_train, fashion_mnist_y_val = sklearn.model_selection.train_test_split(fashion_mnist_x_train, fashion_mnist_y_train, test_size=0.2, stratify=fashion_mnist_y_train, random_state=seed)

Controllo il bilanciamento delle classi nei training set e nei validation set ottenuti dalla divisione stratificata.

In [None]:
stampa_percentuale(mnist_y_train, 'MNIST training set risultante')
stampa_percentuale(mnist_y_val, 'MNIST validation set risultante')

stampa_percentuale(fashion_mnist_y_train, 'Fashion MNIST training set risultante')
stampa_percentuale(fashion_mnist_y_val, 'Fashion MNIST validation set risultante')

Creo tre istanze del generatore per generare il training set, il validation set e il testing set finali.

In [None]:
traingen = datagenerator(mnist_x_train, fashion_mnist_x_train, training_set_size)
valgen = datagenerator(mnist_x_val, fashion_mnist_x_val, validation_set_size)
testgen = datagenerator(mnist_x_test, fashion_mnist_x_test, testing_set_size)

Creo il training set e il validation set.

In [None]:
x_train, y_train = next(traingen)
x_val, y_val = next(valgen)

Verifico che le dimensioni dei due dataset siano corrette.

In [None]:
print(x_train.shape)
print(x_val.shape)

print(len(y_train))
print(len(y_val))

print(y_train[0].shape)
print(y_train[1].shape)
print(y_val[0].shape)
print(y_val[1].shape)

Controllo che i valori dei pixel delle immagini appartengano all'intervallo [0,1].

In [None]:
print(np.min(x_train[0]), np.max(x_train[0]))
print(np.min(x_val[0]), np.max(x_val[0]))

La fase di preparazione dei dataset è completa.
Ora viene definita e compilata la rete neurale.  
Essa è formata da due autoencoder che condividono lo stesso layer di input.
Ogni autoencoder restituisce una delle due immagini la cui media è l'immagine di partenza, ricostruendola il più fedelmente possibile.

In [None]:
def build_neural_network():
    # Input Layer
    inputs = keras.Input(shape = image_shape, name = 'InputImage')

    # Encoder 1
    enc1_conv1 = keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same', name = 'Enc1_Conv1', kernel_initializer='random_normal', bias_initializer='zeros')(inputs)
    enc1_pool = keras.layers.MaxPooling2D((2, 2), padding='same', name = 'Enc1_Pool')(enc1_conv1)
    enc1_conv2 = keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same', name = 'Enc1_Conv2', kernel_initializer='random_normal', bias_initializer='zeros')(enc1_pool)
    encoded1 = keras.layers.MaxPooling2D((2, 2), padding='same', name = 'Encoded1')(enc1_conv2)

    # Encoder 2
    enc2_conv1 = keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same', name = 'Enc2_Conv1', kernel_initializer='random_normal', bias_initializer='zeros')(inputs)
    enc2_pool = keras.layers.MaxPooling2D((2, 2), padding='same', name = 'Enc2_Pool')(enc2_conv1)
    enc2_conv2 = keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same', name = 'Enc2_Conv2', kernel_initializer='random_normal', bias_initializer='zeros')(enc2_pool)
    encoded2 = keras.layers.MaxPooling2D((2, 2), padding='same', name = 'Encoded2')(enc2_conv2)

    # Decoder 1
    dec1_conv1 = keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same', name = 'Dec1_Conv1', kernel_initializer='random_normal', bias_initializer='zeros')(encoded1)
    dec1_upsampling1 = keras.layers.UpSampling2D((2, 2), name = 'Dec1_Upsampling1')(dec1_conv1)
    dec1_conv2 = keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same', name = 'Dec1_Conv2', kernel_initializer='random_normal', bias_initializer='zeros')(dec1_upsampling1)
    dec1_upsampling2 = keras.layers.UpSampling2D((2, 2), name = 'Dec1_Upsampling2')(dec1_conv2)
    decoded1 = keras.layers.Conv2D(1, (3, 3), activation='sigmoid', padding='same', name = 'MNIST_Image', kernel_initializer='random_normal', bias_initializer='zeros')(dec1_upsampling2)

    # Decoder 2
    dec2_conv1 = keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same', name = 'Dec2_Conv1', kernel_initializer='random_normal', bias_initializer='zeros')(encoded2)
    dec2_upsampling1 = keras.layers.UpSampling2D((2, 2), name = 'Dec2_Upsampling1')(dec2_conv1)
    dec2_conv2 = keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same', name = 'Dec2_Conv2', kernel_initializer='random_normal', bias_initializer='zeros')(dec2_upsampling1)
    dec2_upsampling2 = keras.layers.UpSampling2D((2, 2), name = 'Dec2_Upsampling2')(dec2_conv2)
    decoded2 = keras.layers.Conv2D(1, (3, 3), activation='sigmoid', padding='same', name = 'Fashion_MNIST_Image', kernel_initializer='random_normal', bias_initializer='zeros')(dec2_upsampling2)

    model = keras.Model(inputs = inputs, outputs = [decoded1, decoded2], name = 'MNIST_Reconstruction_Model')
    return model

Istanzio la rete e mostro i suoi dettagli.

In [None]:
model = build_neural_network()
model.summary()

Visualizzo un plot della rete.

In [None]:
keras.utils.plot_model(model, "model.png", show_shapes = True, show_layer_names = True)

Definisco la funzione di costo che l'addestramento minimizzerà il più possibile.

In [None]:
mse_loss = keras.losses.MeanSquaredError(
    reduction = "sum_over_batch_size",
    name = "mean_squared_error"
)

Compilo la rete.

In [None]:
model.compile(
    loss = [mse_loss, mse_loss],
    optimizer = keras.optimizers.AdamW(learning_rate = learning_rate),
)

Definisco una callback EarlyStopping. Essa valuta alla fine di ogni epoca la funzione di costo sul validation set e decide se fermare l'addestramento oppure no. Utile per stabilire automaticamente un buon numero di epoche di addestramento della rete.

In [None]:
early_stopping = keras.callbacks.EarlyStopping(
    monitor = 'val_loss',   # Monitora la loss sulla metrica indicata
    min_delta = 0.001,      # Variazione minima da considerare come miglioramento
    patience = early_stopping_patience,          # Numero di epoche senza miglioramenti prima di fermare l'addestramento
    mode = 'auto',          # oppure prende "min", "max", seleziona la direzione in automatico
    restore_best_weights = True,  # Ripristina i pesi migliori quando l'addestramento si ferma
    start_from_epoch = 5    # Inizia il monitoraggio dall'epoca 5
)

Definisco una callback "BackupAndRestore".  
Alla fine di ogni epoca di addestramento, essa salva la rete in un file di backup temporaneo.  
Se il notebook dovesse bloccarsi a tempo di esecuzione, sarà possibile riavviare l'addestramento ripristinando l'ultimo stato salvato nel file di backup.

In [None]:
backup_and_restore = keras.callbacks.BackupAndRestore(backup_dir = "/backup")

Definisco una callback per il controllo del learning rate.

In [None]:
reduce_lr = keras.callbacks.ReduceLROnPlateau(
    monitor = 'val_loss',
    min_delta = 0.001,
    factor = 0.2,
    patience = reduce_lr_patience,
    min_lr = 0.001
)

Addestro la rete.

In [None]:
history = model.fit(x_train, y_train, batch_size = batch_size, epochs = epochs, validation_data = (x_val, y_val), callbacks = [early_stopping, backup_and_restore, reduce_lr])

Ora definisco una funzione per la creazione di grafici della storia del training e visualizzo due grafici che la mostrano.

In [None]:
print(history.history.keys())

def plot_training_history(history):

    # Estrai la loss di training e le due accuracy di validation, una per ogni output
    training_loss = history.history['loss']
    validation_loss = history.history['val_loss']

    # Crea un grafico
    epochs = range(1, len(training_loss) + 1)
    plt.figure(figsize=(12, 4))

    plt.plot(epochs, training_loss, label='Training Loss')
    plt.plot(epochs, validation_loss, label='Validation Loss')
    plt.title('Loss')
    plt.xlabel('Epochs')
    plt.legend()

    plt.tight_layout()
    plt.show()

plot_training_history(history)

Definisco due funzioni per la valutazione finale del modello.

In [None]:
def eval_model(model):
  x, (y1, y2) = next(testgen)
  if isinstance(model, keras.Model):
    pred1, pred2 = model.predict(x)
  else:
    pred1, pred2 = model(x)

  return (np.mean((pred1-y1)**2) + np.mean((pred2-y2)**2) / 2)

def multiple_eval_model(model, repeat_eval = 10):
  eval_results = []
  for i in range(repeat_eval):
    eval_results.append(eval_model(model))
  print("mse mean = ", np.mean(eval_results))
  print("mse standard deviation = ", np.std(eval_results))

Valuto la rete e il modello casuale e confronto le loro prestazioni.  
Più questi valori sono vicini a zero, più la rete è accurata.

In [None]:
print('Valutazione modello banale')
multiple_eval_model(ide_model)

print('Valutazione rete neurale')
multiple_eval_model(model)

Mostriamo alcuni esempi

In [None]:
onegen = datagenerator(mnist_x_test, fashion_mnist_x_test, 1)

def show_images(x, y1, y2):
    fig, ax = plt.subplots(1, 3, figsize=(12,4))
    ax[0].imshow(x,cmap='gray')
    ax[0].title.set_text('Input')
    ax[0].axis('off')
    ax[1].imshow(y1,cmap='gray')
    ax[1].title.set_text('mnist')
    ax[1].axis('off')
    ax[2].imshow(y2,cmap='gray')
    ax[2].title.set_text('fashion_mnist')
    ax[2].axis('off')
    plt.show()

print('Primo campione di esempio')
x, (y1, y2) = next(onegen)
show_images(x[0], y1[0], y2[0])

print('Previsione del modello banale')
y1_pred, y2_pred = ide_model(x)
show_images(x[0], y1_pred[0], y2_pred[0])

print('Previsione della rete neurale')
y1_pred, y2_pred = model.predict(x)
show_images(x[0], y1_pred[0], y2_pred[0])

print()
print()

print('Secondo campione di esempio')
x, (y1, y2) = next(onegen)
show_images(x[0], y1[0], y2[0])

print('Previsione del modello banale')
y1_pred, y2_pred = ide_model(x)
show_images(x[0], y1_pred[0], y2_pred[0])

print('Previsione della rete neurale')
y1_pred, y2_pred = model.predict(x)
show_images(x[0], y1_pred[0], y2_pred[0])

print()
print()

print('Terzo campione di esempio')
x, (y1, y2) = next(onegen)
show_images(x[0], y1[0], y2[0])

print('Previsione del modello banale')
y1_pred, y2_pred = ide_model(x)
show_images(x[0], y1_pred[0], y2_pred[0])

print('Previsione della rete neurale')
y1_pred, y2_pred = model.predict(x)
show_images(x[0], y1_pred[0], y2_pred[0])