# Hands-on MNIST using Machine Learning and Deep Learning

# 1. Data

## Data Exploration

In [None]:
from sklearn.datasets import fetch_openml
import matplotlib.pyplot as plt

In [None]:
# Download the MNIST dataset from OpenML.
# as_frame=True pins the return type to pandas objects, which the rest of this
# notebook relies on (X.head(), X.values, y.value_counts(), ...); without it the
# container type depends on the installed scikit-learn version.
mnist = fetch_openml('mnist_784', version=1, as_frame=True)
X, y = mnist['data'], mnist['target']

print(X.shape, y.shape)

In [None]:
# Preview the first rows of the feature table.
X.head()

In [None]:
# Summary statistics for every feature column.
X.describe()

In [None]:
# Number of samples per label.
print('Unique Labels:')
y.value_counts()

In [None]:
# Share of each label in the whole dataset, in percent.
print('Label-Balance (in %):')
y.value_counts()/len(y)*100

In [None]:
# Render one sample as a 28x28 image and print its label.
digit_index = 0

pixel_row = X.values[digit_index]
digit = pixel_row.reshape(28, 28)
plt.imshow(digit, cmap='binary')
plt.show()

print('Label:', y[digit_index])

## Data Preprocessing

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 10% of the samples as the test set; the fixed seed keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.1,
    random_state=42,
)

In [None]:
def normalize(data, max_value=255):
    """Scale ``data`` by dividing it by ``max_value``.

    Works on anything that supports division by a number (scalars, NumPy
    arrays, pandas objects). Alternatively, sklearn's MinMaxScaler could be
    used; for other feature scaling tasks you would normally use
    StandardScaler.

    Args:
        data: Values to scale.
        max_value: Divisor. Defaults to 255, so the existing calls in this
            notebook keep dividing by 255.

    Returns:
        ``data / max_value``.
    """
    return data / max_value

# Apply the same scaling to the training and the test split.
X_train_normalized, X_test_normalized = normalize(X_train), normalize(X_test)

# 2. Machine Learning

In [None]:
from tqdm import tqdm
from sklearn.model_selection import cross_val_score
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

## Model Selection using Cross-Validation

In [None]:
# Just a few Classifier, add more if you want!
# Candidate classifiers compared by cross-validation below (default
# hyperparameters). Add more classifiers here if you want!
estimators = [
    DecisionTreeClassifier(), 
    RandomForestClassifier()
]

In [None]:
CV = 5  # Number of cross-validation folds
SCORING = 'f1_macro'  # Validation metric (others: accuracy, precision, recall, ...)

# Mean cross-validated score per candidate, in the same order as `estimators`.
scores = []
for estimator in tqdm(estimators):
    fold_scores = cross_val_score(estimator=estimator, X=X_train_normalized, y=y_train, cv=CV, scoring=SCORING)
    scores.append(fold_scores.mean())

# The first candidate reaching the highest mean score wins.
best_score = max(scores)
best_estimator = estimators[scores.index(best_score)]

print(best_estimator, best_score)

## Hyperparameter-Tuning using GridSearchCV

In [None]:
# Example parameter grids for a Random Forest Classifier and a Decision Tree.
# The grid is chosen according to the model family that won the cross-validation above.
if isinstance(best_estimator, RandomForestClassifier):
    PARAM_GRID = [
        {'n_estimators': [3, 10, 30, 100], 'max_features': [2, 4]},
        {'bootstrap': [False], 'n_estimators': [3, 10, 30, 100], 'max_features': [2, 4]}
    ]
elif isinstance(best_estimator, DecisionTreeClassifier):
    PARAM_GRID = [
        {'criterion': ['gini', 'entropy'], 'max_depth': [2, 4, 6, 8, 10, 12]}
    ]
else:
    # Fail early with a clear message instead of handing GridSearchCV an empty grid.
    raise ValueError('No parameter grid defined for ' + type(best_estimator).__name__)

# Exhaustive search over the grid, using the same folds and metric as the model selection.
estimator_cv = GridSearchCV(estimator=best_estimator, param_grid=PARAM_GRID, scoring=SCORING, cv=CV)
search_result = estimator_cv.fit(X=X_train_normalized, y=y_train)

print(search_result.best_estimator_, search_result.best_score_)

## Validate final Model based on Test-Set

In [None]:
# Take the best estimator found by the grid search and predict the held-out test set.
final_ML_model = search_result.best_estimator_
final_predictions = final_ML_model.predict(X_test_normalized)

In [None]:
# Show one test image together with its true label and the model's prediction.
digit_index = 5

pixel_row = X_test_normalized.values[digit_index]
digit = pixel_row.reshape(28, 28)
plt.imshow(digit, cmap='binary')
plt.show()

true_label = y_test.values[digit_index]
predicted_label = final_predictions[digit_index]
print('Label:', true_label)
print('Prediction:', predicted_label)

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import seaborn as sns

In [None]:
# scikit-learn metrics take (y_true, y_pred); keywords make the order explicit
# and match the neural-network sections further down.
accuracy_score(y_true=y_test, y_pred=final_predictions)

In [None]:
# Pass the ground truth as y_true: with swapped arguments precision and recall
# are exchanged per class. print() renders the report as a table instead of an
# escaped one-line string.
print(classification_report(y_true=y_test, y_pred=final_predictions))

In [None]:
# confusion_matrix(y_true, y_pred) puts the true labels on the rows and the
# predictions on the columns, which is what the axis labels below claim.
matrix = confusion_matrix(y_true=y_test, y_pred=final_predictions)
ax = sns.heatmap(matrix, annot=True, fmt='d', cmap='Blues')
ax.set_xlabel('Predicted Label')
ax.set_ylabel('True Label')
ax.set_title('Confusion Matrix')
plt.show()

In [None]:
'''
NOTE: Over- and Underfitting
Score of cross validation on train is good but score on test set is bad: Model probably overfits the data
Solution: Try a less complex model (or more regularization) to prevent overfitting

Score of cross validation on train AND score on test set are bad: Model probably underfits the data (it is too simple to capture the patterns) OR data quality is bad
Solution: Try a more complex model AND/OR collect more data/features
'''

## Save Model

In [None]:
import joblib
from datetime import datetime

In [None]:
import os

# Timestamp without ':' so the file name is also valid on Windows.
now = datetime.now().strftime("%Y-%m-%d_%H-%M")
filename = 'ML_model-{now}.pkl'.format(now=now)

# Create the target directory on first use; joblib.dump does not create it.
model_dir = 'models/machine_learning/'
os.makedirs(model_dir, exist_ok=True)

joblib.dump(final_ML_model, model_dir + filename)

# 3. Neural Networks (Deep Learning)

In [None]:
import numpy as np
from sklearn.model_selection import RandomizedSearchCV
from keras.wrappers.scikit_learn import KerasClassifier
from keras.models import Sequential
from keras.layers import InputLayer, Dense, Dropout, Conv2D, MaxPooling2D, Flatten
import tensorflow as tf

## Neural Network with Fully-Connected (FC) Layers

In [None]:
# Network dimensions derived from the data.
n_features = X_train_normalized.shape[1]  # Number of features (columns) per sample
n_classes = len(y.unique())  # Number of distinct labels

INPUT_SHAPE = [n_features]
OUTPUT_NEURONS = n_classes

In [None]:
def build_model(n_hidden=4, n_neurons=8, dropout_rate=0.5):
    """Build and compile a fully-connected classifier.

    Architecture: input -> dropout -> ``n_hidden`` x (dense relu -> dropout)
    -> dense softmax with ``OUTPUT_NEURONS`` units. The input shape is read
    from the global ``INPUT_SHAPE`` when the function is called.

    Args:
        n_hidden: Number of hidden dense layers.
        n_neurons: Units per hidden dense layer.
        dropout_rate: Rate used for every dropout layer.

    Returns:
        The compiled Sequential model.
    """
    layers = [InputLayer(input_shape=INPUT_SHAPE), Dropout(rate=dropout_rate)]
    for _ in range(n_hidden):
        layers.append(Dense(n_neurons, activation='relu'))
        layers.append(Dropout(rate=dropout_rate))
    layers.append(Dense(OUTPUT_NEURONS, activation='softmax'))

    model = Sequential(layers)
    # categorical_crossentropy works for any classification task; for binary
    # classification, activation='sigmoid' with loss='binary_crossentropy' is an alternative.
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# Wrap the model factory in KerasClassifier so it can be passed to RandomizedSearchCV below.
FC_neural_net = KerasClassifier(build_fn=build_model)

In [None]:
# Hyperparameter search space for build_model.
param_grid = dict(
    n_hidden=[2, 4, 6],
    n_neurons=[50, 100, 150, 200, 300, 400],
    dropout_rate=[0.0, 0.2, 0.5, 0.7],
)

N_ITER = 5   # Number of sampled parameter combinations
EPOCHS = 50  # Upper bound of training epochs per fit
CV = 5       # Number of cross-validation folds

# Note: GridSearchCV would take a long time, so RandomizedSearchCV is used to tune
# the hyperparameters of the neural net. In the author's experience, 2 hidden layers,
# 150 neurons and a dropout of 0.2 do a great job (98% accuracy).
FC_neural_net_cv = RandomizedSearchCV(
    estimator=FC_neural_net,
    param_distributions=param_grid,
    n_iter=N_ITER,
    cv=CV,
)

# Monitor the validation loss and stop training once it has not improved for <patience> epochs.
early_stopping = tf.keras.callbacks.EarlyStopping(patience=5)

# Keyword arguments forwarded to every fit call made during the search.
fit_options = dict(validation_split=0.1, callbacks=[early_stopping], epochs=EPOCHS, verbose=2)
FC_neural_net_cv_result = FC_neural_net_cv.fit(X=X_train_normalized, y=y_train, **fit_options)

In [None]:
# Keep the best estimator of the search and report its parameters and CV score.
final_FC_model = FC_neural_net_cv_result.best_estimator_
print(
    final_FC_model,
    FC_neural_net_cv_result.best_params_,
    FC_neural_net_cv_result.best_score_,
)

In [None]:
# Predict the held-out test set with the tuned fully-connected network.
FC_final_predictions = final_FC_model.predict(X_test_normalized)

In [None]:
# Show one test image together with its true label and the network's prediction.
digit_index = 5

pixel_row = X_test_normalized.values[digit_index]
digit = pixel_row.reshape(28, 28)
plt.imshow(digit, cmap='binary')
plt.show()

true_label = y_test.values[digit_index]
predicted_label = FC_final_predictions[digit_index]
print('Label:', true_label)
print('Prediction:', predicted_label)

### Validate final Model based on Test-Set

In [None]:
# Fraction of test samples the fully-connected network classifies correctly.
accuracy_score(y_pred=FC_final_predictions, y_true=y_test)

In [None]:
# classification_report returns a string; print() renders it as a table
# instead of an escaped one-line repr.
print(classification_report(y_pred=FC_final_predictions, y_true=y_test))

In [None]:
# confusion_matrix(y_true, y_pred) puts the true labels on the rows and the
# predictions on the columns, which is what the axis labels below claim.
matrix = confusion_matrix(y_true=y_test, y_pred=FC_final_predictions)
ax = sns.heatmap(matrix, annot=True, fmt='d', cmap='Blues')
ax.set_xlabel('Predicted Label')
ax.set_ylabel('True Label')
ax.set_title('Confusion Matrix')
plt.show()

## Save Model

In [None]:
import os

# Timestamp without ':' so the file name is also valid on Windows.
now = datetime.now().strftime("%Y-%m-%d_%H-%M")
filename = 'FC-{now}'.format(now=now)

# Create the target directory on first use.
model_dir = 'models/fully_connected/'
os.makedirs(model_dir, exist_ok=True)

# Saving Architecture (the with-block closes the file handle deterministically)
with open(model_dir + filename + '.json', 'w') as architecture_file:
    architecture_file.write(final_FC_model.model.to_json())
# Saving Weights
final_FC_model.model.save_weights(model_dir + filename + '.h5', overwrite=True)

## Convolutional Neural Network (CNN)

In [None]:
# Conv2D expects (height, width, channels) per sample, so turn every flat
# 784-pixel row into a 28x28x1 image. A single vectorized reshape replaces the
# per-row Python loop and yields the same (n_samples, 28, 28, 1) array.
X_train_normalized_reshaped = np.asarray(X_train_normalized.values).reshape(-1, 28, 28, 1)
X_test_normalized_reshaped = np.asarray(X_test_normalized.values).reshape(-1, 28, 28, 1)

# Shape of a single sample; overrides the INPUT_SHAPE used by the fully-connected network above.
INPUT_SHAPE = X_train_normalized_reshaped[0].shape

In [None]:
def build_cnn(n_neurons_dense=100, kernel_size=(3,3), filter_size=32, pooling_size=(2,2), dropout_rate=0.5):
    """Build and compile a small convolutional classifier.

    Architecture: two (Conv2D relu -> MaxPooling2D) stages, dropout, flatten,
    one dense relu layer, dropout, and a dense softmax layer with
    ``OUTPUT_NEURONS`` units. The input shape is read from the global
    ``INPUT_SHAPE`` when the function is called.

    Args:
        n_neurons_dense: Units of the dense layer after flattening.
        kernel_size: Kernel size of both convolution layers.
        filter_size: Number of filters of both convolution layers.
        pooling_size: Pool size of both max-pooling layers.
        dropout_rate: Rate used for both dropout layers.

    Returns:
        The compiled Sequential model.
    """
    feature_extractor = [
        Conv2D(filter_size, kernel_size=kernel_size, activation='relu', input_shape=INPUT_SHAPE),
        MaxPooling2D(pool_size=pooling_size),
        Conv2D(filter_size, kernel_size=kernel_size, activation='relu'),
        MaxPooling2D(pool_size=pooling_size),
        Dropout(rate=dropout_rate),
    ]
    classifier_head = [
        Flatten(),
        Dense(n_neurons_dense, activation='relu'),
        Dropout(rate=dropout_rate),
        Dense(OUTPUT_NEURONS, activation='softmax'),
    ]

    model = Sequential(feature_extractor + classifier_head)
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# Wrap the model factory in KerasClassifier so it can be passed to RandomizedSearchCV below.
CNN_neural_net = KerasClassifier(build_fn=build_cnn)

In [None]:
# Hyperparameter search space for build_cnn.
param_grid = dict(
    n_neurons_dense=[50, 100, 150, 200, 300, 400],
    kernel_size=[(2, 2), (3, 3), (4, 4)],
    filter_size=[10, 32, 50, 100, 200],
    pooling_size=[(2, 2), (3, 3), (4, 4)],
    dropout_rate=[0.0, 0.2, 0.5, 0.7],
)

N_ITER = 5   # Number of sampled parameter combinations
EPOCHS = 50  # Upper bound of training epochs per fit
CV = 5       # Number of cross-validation folds

# Note: GridSearchCV would take a long time, so RandomizedSearchCV is used to tune
# the hyperparameters of the CNN. In the author's experience, 'pooling_size': (2, 2),
# 'n_neurons_dense': 300, 'kernel_size': (3, 3), 'filter_size': 50 and
# 'dropout_rate': 0.5 do a great job (99.11% accuracy). The original note also listed
# 'n_hidden': 6, which is not a parameter of build_cnn.
CNN_neural_net_cv = RandomizedSearchCV(
    estimator=CNN_neural_net,
    param_distributions=param_grid,
    n_iter=N_ITER,
    cv=CV,
)

# Monitor the validation loss and stop training once it has not improved for <patience> epochs.
early_stopping = tf.keras.callbacks.EarlyStopping(patience=5)

# Keyword arguments forwarded to every fit call made during the search.
fit_options = dict(validation_split=0.1, callbacks=[early_stopping], epochs=EPOCHS, verbose=2)
CNN_neural_net_cv_result = CNN_neural_net_cv.fit(X=X_train_normalized_reshaped, y=y_train, **fit_options)

In [None]:
# Keep the best estimator of the search and report its parameters and CV score.
CNN_final_model = CNN_neural_net_cv_result.best_estimator_
print(
    CNN_final_model,
    CNN_neural_net_cv_result.best_params_,
    CNN_neural_net_cv_result.best_score_,
)

In [None]:
# Predict the held-out test set with the tuned CNN (expects the 28x28x1 image layout).
CNN_final_predictions = CNN_final_model.predict(X_test_normalized_reshaped)

In [None]:
# Show one test image together with its true label and the CNN's prediction.
digit_index = 5

# Drop the trailing channel axis: imshow wants a 2-D array for a single-channel
# image, and some matplotlib versions reject a (28, 28, 1) input.
digit = X_test_normalized_reshaped[digit_index].reshape(28, 28)
plt.imshow(digit, cmap='binary')
plt.show()

print('Label:', y_test.values[digit_index])
print('Prediction:', CNN_final_predictions[digit_index])

### Validate final Model based on Test-Set

In [None]:
# Fraction of test samples the CNN classifies correctly.
accuracy_score(y_pred=CNN_final_predictions, y_true=y_test)

In [None]:
# classification_report returns a string; print() renders it as a table
# instead of an escaped one-line repr.
print(classification_report(y_pred=CNN_final_predictions, y_true=y_test))

In [None]:
# confusion_matrix(y_true, y_pred) puts the true labels on the rows and the
# predictions on the columns, which is what the axis labels below claim.
matrix = confusion_matrix(y_true=y_test, y_pred=CNN_final_predictions)
ax = sns.heatmap(matrix, annot=True, fmt='d', cmap='Blues')
ax.set_xlabel('Predicted Label')
ax.set_ylabel('True Label')
ax.set_title('Confusion Matrix')
plt.show()

## Save Model

In [None]:
import os

# Timestamp without ':' so the file name is also valid on Windows.
now = datetime.now().strftime("%Y-%m-%d_%H-%M")
filename = 'CNN-{now}'.format(now=now)

# Create the target directory on first use.
model_dir = 'models/cnn/'
os.makedirs(model_dir, exist_ok=True)

# Saving Architecture (the with-block closes the file handle deterministically)
with open(model_dir + filename + '.json', 'w') as architecture_file:
    architecture_file.write(CNN_final_model.model.to_json())
# Saving Weights
CNN_final_model.model.save_weights(model_dir + filename + '.h5', overwrite=True)

# Predict new Images

## Load latest Models

In [None]:
from glob import glob
import os
import joblib
from keras.models import model_from_json

In [None]:
def _latest_file(pattern):
    """Return the path matching ``pattern`` with the greatest ``os.path.getctime``."""
    candidates = glob(pattern)
    if not candidates:
        # max() on an empty list would only report "empty sequence"; name the pattern instead.
        raise FileNotFoundError('No saved model found matching: ' + pattern)
    return max(candidates, key=os.path.getctime)


def _load_latest_keras_model(directory):
    """Rebuild the newest Keras model stored in ``directory`` (JSON architecture + h5 weights)."""
    architecture_path = _latest_file(directory + '*json')
    weights_path = _latest_file(directory + '*h5')

    # The with-block closes the architecture file as soon as it has been read.
    with open(architecture_path) as architecture_file:
        model = model_from_json(architecture_file.read())
    model.load_weights(weights_path)
    model.compile(optimizer='adam', loss='categorical_crossentropy')
    return model


def get_latest_models():
    """Load the most recently saved model of each kind from the ``models/`` folder.

    Returns:
        Tuple ``(MLmodel, FCmodel, CNNmodel)``: the scikit-learn model loaded
        with joblib, the fully-connected network and the CNN.

    Raises:
        FileNotFoundError: If one of the model folders contains no matching file.
    """
    # Load latest ML Model
    # NOTE: joblib.load unpickles the file; only load model files you created yourself.
    ml_model_path = _latest_file('models/machine_learning/*pkl')
    MLmodel = joblib.load(ml_model_path)

    # Load latest Fully Connected Neural Network
    FCmodel = _load_latest_keras_model('models/fully_connected/')

    # Load latest CNN
    CNNmodel = _load_latest_keras_model('models/cnn/')

    return MLmodel, FCmodel, CNNmodel

# Replace the in-memory models with the latest ones stored on disk.
# Note: the CNN is bound to final_CNN_model here (the training section used CNN_final_model).
final_ML_model, final_FC_model, final_CNN_model = get_latest_models()

## Prepare new Image

In [None]:
import cv2

In [None]:
def prepare_new_image(file_path, print_digit=True):
    """Load an image file and convert it into a flat, normalized 784-value array.

    The image is read as grayscale, inverted, resized to 28x28, flattened and
    scaled with ``normalize``.

    Args:
        file_path: Path of the image file to load.
        print_digit: If True, plot the inverted image next to its resized version.

    Returns:
        Array with 784 normalized pixel values.

    Raises:
        FileNotFoundError: If the image cannot be read from ``file_path``.
    """
    digit_array = cv2.imread(file_path, cv2.IMREAD_GRAYSCALE)
    if digit_array is None:
        # cv2.imread returns None instead of raising when the file is missing or unreadable
        raise FileNotFoundError('Could not read image: {}'.format(file_path))

    # Invert the intensities; presumably the input is dark-on-light while the
    # training images are light-on-dark -- confirm for your own images.
    digit_array = cv2.bitwise_not(digit_array)
    digit_array_resized = cv2.resize(digit_array, (28,28))

    digit_reshaped = digit_array_resized.reshape(784)
    digit_normalized = normalize(digit_reshaped)

    if print_digit:
        _, (ax1, ax2) = plt.subplots(1, 2)
        ax1.imshow(digit_array, cmap=plt.cm.binary)
        ax1.set_title('Original')
        ax2.imshow(digit_array_resized, cmap=plt.cm.binary)
        ax2.set_title('Resized')

    return digit_normalized

# Prepare a sample image from the digits/ folder for the predictions below.
digit_normalized = prepare_new_image('digits/3.png', print_digit=True)

## Predict

In [None]:
# Every model expects a batch, so wrap the single image into a batch of size 1.
flat_batch = digit_normalized.reshape(1, 784)
image_batch = digit_normalized.reshape(1, 28, 28, 1)

# np.argmax(..., axis=1) returns a one-element array; take element [0] before
# int(), because converting an array with ndim > 0 to a scalar is deprecated in NumPy.

# Prediction using trained Machine Learning Model
ML_predicted_instance = final_ML_model.predict_proba(flat_batch)
print('ML Model (', type(final_ML_model), '):')
print('Predicted Class:', int(np.argmax(ML_predicted_instance, axis=1)[0]))
print('Probabilities:', ML_predicted_instance)

# Prediction using trained Neural Network
FC_predicted_instance = final_FC_model.predict(flat_batch)
print('Fully Connected Neural Network:')
print('Predicted Class', int(np.argmax(FC_predicted_instance, axis=1)[0]))
print('Probabilities:', FC_predicted_instance)

# Prediction using trained CNN
CNN_predicted_instance = final_CNN_model.predict(image_batch)
print('Convolutional Neural Network:')
print('Predicted Class', int(np.argmax(CNN_predicted_instance, axis=1)[0]))
print('Probabilities:', CNN_predicted_instance)