# Extended MNIST

L'obiettivo del progetto è il riconoscimento e la conseguente classificazione di immagini rappresentanti lettere scritte a mano. Il dataset fornito è composto da 80000 immagini della stessa dimensione (28x28), rappresentate in scala di grigi. Non viene fatta distinzione tra lettere maiuscole e minuscole, quindi le classi da individuare sono 26.

# Approccio alla soluzione

Verranno utilizzati più modelli per la soluzione del problema, partendo da un modello di base (Logistic Regression) fino ad arrivare a modelli più complessi (Neural Network). Le prestazioni di ogni modello verranno giudicate in base all'accuratezza e al tempo di esecuzione.  

In [2]:
#import gc

import pandas as pd
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import keras_tuner as kt

from time import time
from sklearn.model_selection import train_test_split
from sklearn.model_selection import PredefinedSplit
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score, ConfusionMatrixDisplay
from sklearn.ensemble import RandomForestClassifier

In [None]:
#utility functions

# Training-time reporter
def elapsed_time(start, end):
    """Print the interval between two timestamps as zero-padded HH:MM:SS.

    Args:
        start: Timestamp, in seconds, taken at the beginning of the interval.
        end: Timestamp, in seconds, taken at the end of the interval.
    """
    duration = end - start
    hours = duration // 3600
    leftover = duration % 3600
    minutes = leftover // 60
    seconds = leftover % 60
    # Fractions of a second are truncated, not rounded.
    clock = (int(hours), int(minutes), int(seconds))
    print("Time spent training: {:0>2}:{:0>2}:{:0>2}".format(*clock))

# Accuracy reporter
def print_accuracy_scores(train_labels, test_labels, predictions_train_model, predictions_test_model):
    """Print the accuracy on the training set and on the test set.

    Args:
        train_labels: True labels of the training samples.
        test_labels: True labels of the test samples.
        predictions_train_model: Labels predicted for the training samples.
        predictions_test_model: Labels predicted for the test samples.
    """
    pairs = (
        (train_labels, predictions_train_model),
        (test_labels, predictions_test_model),
    )
    # Both scores are computed before anything is printed.
    scores = [accuracy_score(truth, predicted) for truth, predicted in pairs]
    templates = (
        'Training set accuracy:   {:.3f}',
        'Test set accuracy:       {:.3f}',
    )
    for template, score in zip(templates, scores):
        print(template.format(score))

def plot_measure(history_train, history_test, title='', xticks=None):
    """Draw a training curve and a validation curve on a new 12x8 figure.

    Args:
        history_train: Sequence of values plotted with the label 'Train'.
        history_test: Sequence of values plotted with the label 'Validation'.
        title: Text used as the plot title.
        xticks: Optional sequence of x tick positions. When omitted the x
            axis shows no ticks; otherwise the axis spans from the first
            to the last tick and shows exactly those ticks.
    """
    plt.figure(figsize=(12, 8))
    curves = ((history_train, 'Train'), (history_test, 'Validation'))
    for values, curve_label in curves:
        plt.plot(values, marker='o', markersize=5, label=curve_label)
    plt.legend()
    if xticks is not None:
        axes = plt.subplot(111)
        axes.set_xlim(xticks[0], xticks[-1])
        plt.xticks(xticks)
    else:
        # No tick positions requested: hide the x ticks altogether.
        plt.gca().xaxis.set_major_locator(plt.NullLocator())
    plt.title(title)
    plt.show()
    
# Accuracy curves
def plot_accuracy(history_df):
    """Plot the ``accuracy`` and ``val_accuracy`` columns of *history_df*."""
    train_curve = history_df.accuracy
    validation_curve = history_df.val_accuracy
    plot_measure(train_curve, validation_curve, title='Accuracy')
    
# Loss curves
def plot_loss(history_df):
    """Plot the ``loss`` and ``val_loss`` columns of *history_df*."""
    train_curve = history_df.loss
    validation_curve = history_df.val_loss
    plot_measure(train_curve, validation_curve, title='Loss')

# Confusion matrix plotter
def plot_confusion_matrix(train_labels, predictions_train):
    """Show the row-normalised confusion matrix of a set of predictions.

    Despite the parameter names, the function works for any split
    (train, validation or test).

    Args:
        train_labels: True labels.
        predictions_train: Predicted labels, aligned with ``train_labels``.
    """
    # Pass an explicit Axes: without it from_predictions() opens a figure of
    # its own and the figsize requested here would end up on an empty figure.
    _, ax = plt.subplots(figsize=(12, 6))
    disp = ConfusionMatrixDisplay.from_predictions(
        train_labels, predictions_train, normalize='true', cmap='Blues', ax=ax)
    disp.figure_.suptitle("Confusion Matrix")
    # Explicitly disable the grid (grid(None) would toggle it instead).
    ax.grid(False)
    plt.show()

I dati sono stati divisi in 3 set: train (70%), validation (15%) e test (15%).

In [6]:
#reading data from csv and split the data in train, validation and test sets

df = pd.read_csv("emnist-letters.csv")
# Column 0 is the label, the remaining 784 columns are the pixels of one image.
# order="F" fills each 28x28 image column by column, i.e. it transposes the
# row-major pixel layout (presumably to undo EMNIST's transposed storage -- TODO confirm).
X = df.iloc[:,1:].to_numpy().reshape(-1, 28, 28, order="F")
# Shift the labels by one so they start from 0 (assumes the CSV stores 1..26 -- TODO confirm).
y = df.iloc[:,0].to_numpy()-1
print(y)

# 70% train; the remaining 30% is halved into validation and test (15% each).
train_images, test_val_images, train_labels, test_val_labels = train_test_split(X,y, test_size=0.3, random_state=42)
val_images, test_images, val_labels, test_labels = train_test_split(test_val_images, test_val_labels, test_size=0.5, random_state=42)

# Train + validation samples for the scikit-learn searches. scikit-learn
# estimators only accept 2-D (n_samples, n_features) input, so every image is
# flattened here; the 3-D *_images arrays are left untouched for the Keras models.
X = np.concatenate((train_images, val_images), axis=0).reshape(-1, 28 * 28)
y = np.concatenate((train_labels, val_labels), axis=0)
# -1 keeps a sample in the training part of every split, 0 assigns it to the
# single validation fold.
split_index = [-1]*len(train_images) + [0]*len(val_images)
pds = PredefinedSplit(test_fold = split_index)


[ 6 15 14 ...  0 22 11]


# Softmax Regression

Prima di applicare ogni modello, attraverso una GridSearch si individua la migliore combinazione dei parametri del modello scelto. A causa dei tempi di esecuzione troppo elevati, non è stato possibile usare una cross-validation completa, ma solo un singolo split predefinito train/validation ("1fold").

Nel caso della Softmax Regression, i parametri soggetti a tuning sono:
- C (l'inverso dell'intensità di regolarizzazione: valori più piccoli corrispondono a una regolarizzazione più forte)
- max_iter (il numero massimo di iterazioni del solver).

In [None]:
# Grid of candidate values for the softmax-regression search.
params = {"C":[0.1, 0.01, 0.001],
         "max_iter":[100, 200, 500, 1000]}
# cv=pds evaluates each combination on the single predefined validation fold.
clf = GridSearchCV(LogisticRegression(multi_class='multinomial', n_jobs=-1), params, scoring='accuracy', return_train_score=True, cv=pds)

# scikit-learn estimators reject inputs with more than two dimensions: make
# sure every sample is a flat feature vector (no-op when X is already 2-D).
X_flat = X.reshape(len(X), -1)

time_start = time()

clf.fit(X_flat, y)

time_end = time()
elapsed_time(time_start, time_end)

In [None]:
# Display the estimator selected by the grid search
clf.best_estimator_

In [None]:
# Display the best score found by the search (scoring='accuracy', evaluated on the predefined validation fold)
clf.best_score_

In [None]:
# Final softmax-regression model with hard-coded C and max_iter values.
log_reg = LogisticRegression(multi_class='multinomial', n_jobs=-1, C=0.01, max_iter=1000)

# scikit-learn needs 2-D (n_samples, n_features) input, while the image arrays
# are (n_samples, 28, 28): flatten each image into one feature vector.
train_features = train_images.reshape(len(train_images), -1)
test_features = test_images.reshape(len(test_images), -1)

time_start = time()
log_reg.fit(train_features, train_labels)
time_end = time()
elapsed_time(time_start, time_end)

predictions_test = log_reg.predict(test_features)
predictions_train = log_reg.predict(train_features)

print_accuracy_scores(train_labels, test_labels, predictions_train, predictions_test)
plot_confusion_matrix(train_labels, predictions_train)

# Random forest

Nella Random Forest, i parametri soggetti a tuning sono:
- n_estimators (il numero di alberi)
- criterion (funzione di impurità)
- min_samples_split (il numero minimo di elementi di un nodo affinché possa essere partizionato)
- max_depth (profondità massima dell'albero).

La suddivisione del dataset in train, validation e test set rimane la stessa: 70% train set, 15% validation set e 15% test set.


In [None]:
# Candidate values explored by the random-forest search
n_estimators = [50, 100, 200, 500, 1000]
criterion = ["gini", "entropy"]
min_samples_split = [2, 4, 8, 16, 32, 64, 128, 256, 1024, 2048]
max_depth = [1, 5, 10, 20, 45, 50]

def build_random_forest(hp):
    """Return a RandomForestClassifier configured from the tuner's choices.

    Args:
        hp: Hyperparameter container exposing ``Choice(name, values)``.

    Returns:
        An unfitted RandomForestClassifier with ``n_jobs=-1`` and
        ``random_state=42``.
    """
    # The dict entries are evaluated top to bottom, so the hyperparameters are
    # registered in this order: n_estimators, criterion, min_samples_split, max_depth.
    sampled = {
        "n_estimators": hp.Choice("n_estimators", n_estimators),
        "criterion": hp.Choice("criterion", criterion),
        "min_samples_split": hp.Choice("min_samples_split", min_samples_split),
        "max_depth": hp.Choice('max_depth', max_depth),
    }
    return RandomForestClassifier(n_jobs=-1, random_state=42, **sampled)

In [None]:
# Random search over the random-forest hyperparameters, scored by accuracy on
# the single predefined validation fold (cv=pds).
rf_trials=80
tuner = kt.tuners.SklearnTuner(
    oracle=kt.oracles.RandomSearchOracle(objective=kt.Objective('score', 'max'),max_trials=rf_trials, seed=42),
    scoring='accuracy',
    hypermodel= build_random_forest,
    cv=pds,
    project_name='tuners/random_forest')

# scikit-learn estimators only accept 2-D (n_samples, n_features) input:
# flatten every sample (no-op when X is already 2-D).
tuner.search(X.reshape(len(X), -1), y)
#gc.collect()
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

print("\nBest criterion:        ", best_hps.get("criterion"))
print("Best max_depth:          ", best_hps.get("max_depth"))
print("Best n_estimators:       ", best_hps.get("n_estimators"))
print("Best min_samples_split:  ", best_hps.get("min_samples_split"))

In [None]:
# Final random forest built from the best hyperparameters found by the tuner.
# random_state=42 matches the value used while tuning, so the run is
# reproducible and comparable with the tuned trials.
model = RandomForestClassifier(n_jobs=-1, random_state=42, criterion=best_hps.get("criterion"), 
                               max_depth=best_hps.get("max_depth"), n_estimators=best_hps.get("n_estimators"), min_samples_split=best_hps.get("min_samples_split"))

# scikit-learn needs 2-D (n_samples, n_features) input, while the image arrays
# are (n_samples, 28, 28): flatten each image into one feature vector.
train_features = train_images.reshape(len(train_images), -1)
test_features = test_images.reshape(len(test_images), -1)

time_start = time()
model.fit(train_features, train_labels)
time_end = time()
elapsed_time(time_start, time_end) 
print("")

predictions_test = model.predict(test_features)
predictions_train = model.predict(train_features)

print_accuracy_scores(train_labels, test_labels, predictions_train, predictions_test)
plot_confusion_matrix(train_labels, predictions_train)

# Reti neurali convoluzionali (CNN)

Come ultima tipologia di modello si è scelto di utilizzare una CNN. Verranno analizzati 3 modelli di complessità crescente e a ognuno di essi verrà applicato il tuning degli iperparametri.

In [None]:
# Search spaces shared by the neural-network builders

# Filter counts for a single convolutional layer.
# NOTE(review): this name shadows the builtin filter(); it is kept because
# model2_builder reads it under this name.
filter = [16, 32, 64, 128, 256]
# Alternative filter sets, selected by index (model3_builder reads the first
# two entries of the chosen set).
filters = [
    [16, 32, 64],
    [32, 64, 128],
]
# Dropout rates.
dropouts_rate = [0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
# L2 regularisation factors for the Dense layers.
l2 = [1e-2, 1e-3, 1e-4]
# Unit counts for the hidden Dense layer.
dense_units = [32, 64, 128, 256]

# Modello 1

Questo modello rappresenta la base per i modelli successivi più complessi. Non presenta alcun layer convoluzionale, quindi non è classificabile come CNN; è composto da:
- un layer per il rescaling delle immagini
- un layer per applicare il flatten dei dati
- un layer Dense con funzione di attivazione relu, il cui numero di unità è soggetto a tuning
- un layer di output, che sarà lo stesso per tutti i modelli, con 26 nodi e la softmax come funzione di attivazione

In [None]:
def model1_builder(hp): 
    """Build and compile model 1, a fully connected baseline without convolutions.

    Args:
        hp: Hyperparameter container; the width of the hidden Dense layer is
            picked with ``hp.Choice("dense_units", dense_units)``.

    Returns:
        A compiled Sequential model taking 28x28 inputs and producing 26
        softmax outputs.
    """
    hidden_units = hp.Choice("dense_units", dense_units)
    layers = tf.keras.layers
    model1 = tf.keras.Sequential([
        layers.Rescaling(1./255, input_shape=(28, 28)),  # pixel values scaled by 1/255
        layers.Flatten(),
        layers.Dense(hidden_units, activation="relu"),
        layers.Dense(26, activation="softmax"),
    ])
    model1.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model1

In [None]:
# Grid search over the hyperparameters of model 1.
# The models are compiled with metrics=['accuracy'], so Keras reports the
# validation accuracy under the key "val_accuracy": the tuner objective and the
# early-stopping monitor must both use that exact name.
tuner = kt.GridSearch(model1_builder, kt.Objective("val_accuracy", direction="max"), project_name='tuners/nn1')
early_stopping = tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", mode="max", patience=5)
# use_multiprocessing is not passed: it only affects generator/Sequence inputs
# (the data here are NumPy arrays) and newer Keras releases reject it.
tuner.search(train_images, train_labels, epochs=20, validation_data=(val_images, val_labels), callbacks=[early_stopping], batch_size=128)
best_hps=tuner.get_best_hyperparameters(num_trials=1)[0]
#gc.collect()

In [None]:
# Report the hidden-layer width selected by the tuner
print("Best Dense units: ", best_hps.get("dense_units"))

In [None]:
# Build a fresh model 1 from the best hyperparameters and show its architecture
model1 = tuner.hypermodel.build(best_hps)
model1.summary()

In [None]:
time_start = time()
# Optional checkpoint callback: add it to `callbacks` below to save the best
# weights seen so far during training.
#model_file = "./best_model/model0.ckpt"
#checkpoint = tf.keras.callbacks.ModelCheckpoint(model_file, monitor="val_accuracy", mode="max", save_weights_only=True, save_best_only=True, verbose=1)
# The model is compiled with metrics=['accuracy'], so the validation accuracy is
# logged as "val_accuracy"; monitoring any other name disables early stopping.
history_model0 = model1.fit(train_images, train_labels, batch_size=128, epochs=20, verbose=1, validation_data=(val_images, val_labels), callbacks=[tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", mode="max", patience=5)])
time_end = time()
elapsed_time(time_start, time_end)
#gc.collect()

In [None]:
# Turn the training history of model 1 into a DataFrame and plot its curves
history_model1_df = pd.DataFrame(data=history_model0.history)
for draw_curves in (plot_loss, plot_accuracy):
    draw_curves(history_model1_df)

In [None]:
# Predicted class = index of the largest softmax output of each sample
predictions_train_model1 = model1.predict(train_images).argmax(axis=-1)
predictions_test_model1 = model1.predict(test_images).argmax(axis=-1)

In [None]:
# Confusion matrices of model 1: training set first, then test set
for true_labels, predicted_labels in (
    (train_labels, predictions_train_model1),
    (test_labels, predictions_test_model1),
):
    plot_confusion_matrix(true_labels, predicted_labels)

In [None]:
# Train/test accuracy of model 1
print_accuracy_scores(
    train_labels=train_labels,
    test_labels=test_labels,
    predictions_train_model=predictions_train_model1,
    predictions_test_model=predictions_test_model1,
)

# Modello 2

Questo modello rappresenta la prima rete CNN. Rispetto al modello 1 vengono aggiunti:
- un layer convoluzionale (viene applicato il tuning sul numero di filtri del layer)
- un fattore di regolarizzazione L2 da applicare ai layer Dense

In [None]:
def model2_builder(hp): 
    """Build and compile model 2: one convolutional block plus a dense head.

    Args:
        hp: Hyperparameter container. The L2 factor ("L2"), the number of
            convolution filters ("filter") and the hidden Dense width
            ("dense_units") are picked with ``hp.Choice``.

    Returns:
        A compiled Sequential model taking 28x28 grayscale inputs and
        producing 26 softmax outputs.
    """
    k = hp.Choice("L2", l2)
    model2 = tf.keras.Sequential()
    model2.add(tf.keras.layers.Rescaling(1./255, input_shape=(28, 28)))
    # Conv2D requires (height, width, channels) samples: add the single
    # grayscale channel, otherwise the layer rejects the rank-3 input.
    model2.add(tf.keras.layers.Reshape((28, 28, 1)))
    model2.add(tf.keras.layers.Conv2D(hp.Choice("filter", filter), 3, padding="same", activation="relu"))
    model2.add(tf.keras.layers.MaxPooling2D())
    model2.add(tf.keras.layers.Flatten())
    # The same L2 factor regularises both Dense layers.
    model2.add(tf.keras.layers.Dense(hp.Choice("dense_units", dense_units), activation="relu", kernel_regularizer=tf.keras.regularizers.L2(k)))
    model2.add(tf.keras.layers.Dense(26, activation="softmax", kernel_regularizer=tf.keras.regularizers.L2(k)))
    
    model2.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])

    return model2

#gc.collect()

In [None]:
#tuning
# Random search (10 trials, seed 42) over the hyperparameters of model 2.
# The models are compiled with metrics=["accuracy"], so Keras reports the
# validation accuracy under the key "val_accuracy": the tuner objective and the
# early-stopping monitor must both use that exact name.
tuner = kt.RandomSearch(model2_builder, kt.Objective("val_accuracy", direction="max"), max_trials=10, seed=42, project_name='tuners/nn2')
early_stopping = tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", mode="max", patience=5)
# use_multiprocessing is not passed: it only affects generator/Sequence inputs
# (the data here are NumPy arrays) and newer Keras releases reject it.
tuner.search(train_images, train_labels, epochs=20, validation_data=(val_images, val_labels), callbacks=[early_stopping], batch_size=128)
best_hps=tuner.get_best_hyperparameters(num_trials=1)[0]
#gc.collect()

In [None]:
# Report the hyperparameters selected for model 2
for caption, hp_name in (
    ("Best Dense units: ", "dense_units"),
    ("Best L2 value: ", "L2"),
    ("Best Filter: ", "filter"),
):
    print(caption, best_hps.get(hp_name))

In [None]:
# Build a fresh model 2 from the best hyperparameters and show its architecture
model2 = tuner.hypermodel.build(best_hps)
model2.summary()

In [None]:
# Optional checkpoint callback: add it to `callbacks` below to save the best
# weights seen so far during training.
#model_file = "./best_model/model2.ckpt"
#checkpoint = tf.keras.callbacks.ModelCheckpoint(model_file, monitor="val_accuracy", mode="max", save_weights_only=True, save_best_only=True, verbose=1)
# Training time is measured like for model 1, so the models can be compared.
time_start = time()
# The model is compiled with metrics=["accuracy"], so the validation accuracy is
# logged as "val_accuracy"; monitoring any other name disables early stopping.
history_model2 = model2.fit(train_images, train_labels, batch_size=128, epochs=20, verbose=1, validation_data=(val_images, val_labels), callbacks=[tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", mode="max", patience=5)])
time_end = time()
elapsed_time(time_start, time_end)
#gc.collect()

In [None]:
# Turn the training history of model 2 into a DataFrame and plot its curves
history_model2_df = pd.DataFrame(data=history_model2.history)
for draw_curves in (plot_loss, plot_accuracy):
    draw_curves(history_model2_df)

In [None]:
# Predicted class = index of the largest softmax output of each sample
predictions_train_model2 = model2.predict(train_images).argmax(axis=-1)
predictions_test_model2 = model2.predict(test_images).argmax(axis=-1)
#gc.collect()

In [None]:
# Confusion matrices of model 2: training set first, then test set
for true_labels, predicted_labels in (
    (train_labels, predictions_train_model2),
    (test_labels, predictions_test_model2),
):
    plot_confusion_matrix(true_labels, predicted_labels)

In [None]:
# Train/test accuracy of model 2
print_accuracy_scores(
    train_labels=train_labels,
    test_labels=test_labels,
    predictions_train_model=predictions_train_model2,
    predictions_test_model=predictions_test_model2,
)

# Modello 3

Per il terzo modello vengono introdotti un ulteriore layer convoluzionale e un layer di Dropout per arginare l'overfitting. Sono disponibili due set di filtri alternativi da applicare ai layer convoluzionali, e quello da usare viene scelto attraverso il tuning. Anche il rate di dropout è soggetto a tuning.

In [None]:
def model3_builder(hp): 
    """Build and compile model 3: two convolutional blocks, dropout and a dense head.

    Args:
        hp: Hyperparameter container. The filter set ("filters_index"), the
            L2 factor ("L2"), the dropout rate ("dropout_rate") and the
            hidden Dense width ("dense_units") are picked with ``hp.Choice``.

    Returns:
        A compiled Sequential model taking 28x28 grayscale inputs and
        producing 26 softmax outputs.
    """
    filters_index = hp.Choice("filters_index", [0, 1])
    k = hp.Choice("L2", l2)
    model3 = tf.keras.Sequential()
    model3.add(tf.keras.layers.Rescaling(1./255, input_shape=(28,28)))
    # Conv2D requires (height, width, channels) samples: add the single
    # grayscale channel, otherwise the layer rejects the rank-3 input.
    model3.add(tf.keras.layers.Reshape((28, 28, 1)))
    # Only the first two entries of the selected filter set are used, one per
    # convolutional layer.
    model3.add(tf.keras.layers.Conv2D(filters[filters_index][0], 3, padding="same", activation="relu"))
    model3.add(tf.keras.layers.MaxPooling2D())
    model3.add(tf.keras.layers.Conv2D(filters[filters_index][1], 3, padding="same", activation="relu"))
    model3.add(tf.keras.layers.MaxPooling2D())
    model3.add(tf.keras.layers.Dropout(hp.Choice("dropout_rate", dropouts_rate)))
    model3.add(tf.keras.layers.Flatten())
    # The same L2 factor regularises both Dense layers.
    model3.add(tf.keras.layers.Dense(hp.Choice("dense_units", dense_units), activation="relu", kernel_regularizer=tf.keras.regularizers.L2(k)))
    model3.add(tf.keras.layers.Dense(26, activation="softmax", kernel_regularizer=tf.keras.regularizers.L2(k)))
    
    model3.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])

    return model3

In [None]:
# Random search (10 trials, seed 42) over the hyperparameters of model 3.
# The models are compiled with metrics=["accuracy"], so Keras reports the
# validation accuracy under the key "val_accuracy": the tuner objective and the
# early-stopping monitor must both use that exact name.
tuner = kt.RandomSearch(model3_builder, kt.Objective("val_accuracy", direction="max"), max_trials=10, seed=42, project_name='tuners/nn3')
early_stopping = tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", mode="max", patience=5)
# use_multiprocessing is not passed: it only affects generator/Sequence inputs
# (the data here are NumPy arrays) and newer Keras releases reject it.
tuner.search(train_images, train_labels, epochs=20, validation_data=(val_images, val_labels), callbacks=[early_stopping], batch_size=128)
best_hps=tuner.get_best_hyperparameters(num_trials=1)[0]
#gc.collect()

In [None]:
# Report the hyperparameters selected for model 3
# The filter set is stored as an index into `filters`, so it is resolved first.
print("Best Filter: ", filters[best_hps.get("filters_index")])
for caption, hp_name in (
    ("Best Dropout Rate: ", "dropout_rate"),
    ("Best Dense units: ", "dense_units"),
    ("Best L2 value: ", "L2"),
):
    print(caption, best_hps.get(hp_name))

In [None]:
# Build a fresh model 3 from the best hyperparameters and show its architecture
model3 = tuner.hypermodel.build(best_hps)
model3.summary()
#gc.collect()

In [None]:
# Optional checkpoint callback: add it to `callbacks` below to save the best
# weights seen so far during training.
#model_file = "./best_model/model3.ckpt"
#checkpoint = tf.keras.callbacks.ModelCheckpoint(model_file, monitor="val_accuracy", mode="max", save_weights_only=True, save_best_only=True, verbose=1)
# Training time is measured like for model 1, so the models can be compared.
time_start = time()
# The model is compiled with metrics=["accuracy"], so the validation accuracy is
# logged as "val_accuracy"; monitoring any other name disables early stopping.
history_model3 = model3.fit(train_images, train_labels, batch_size=128, epochs=20, verbose=1, validation_data=(val_images, val_labels), callbacks=[tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", mode="max", patience=5)])
time_end = time()
elapsed_time(time_start, time_end)
#gc.collect()

In [None]:
# Turn the training history of model 3 into a DataFrame and plot its curves
history_model3_df = pd.DataFrame(data=history_model3.history)
for draw_curves in (plot_loss, plot_accuracy):
    draw_curves(history_model3_df)

In [None]:
# Predicted class = index of the largest softmax output of each sample
predictions_train_model3 = model3.predict(train_images).argmax(axis=-1)
predictions_test_model3 = model3.predict(test_images).argmax(axis=-1)
#gc.collect()

In [None]:
# Confusion matrices of model 3: training set first, then test set
for true_labels, predicted_labels in (
    (train_labels, predictions_train_model3),
    (test_labels, predictions_test_model3),
):
    plot_confusion_matrix(true_labels, predicted_labels)

In [None]:
# Train/test accuracy of model 3
print_accuracy_scores(
    train_labels=train_labels,
    test_labels=test_labels,
    predictions_train_model=predictions_train_model3,
    predictions_test_model=predictions_test_model3,
)