## Init

### Imports

In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import os
import pathlib
import sys


### Paths


In [None]:
# Kaggle mounts the competition files in this folder; its presence is used to
# detect whether the notebook runs on Kaggle or on a local machine.
data_dir = "../input/digit-recognizer"
is_kaggle = os.path.isdir(data_dir)
if is_kaggle:
    ROOT_DIR = pathlib.Path("..")
    DATA_DIR = ROOT_DIR / "input/digit-recognizer"
    SUBMISSION_PATH = ROOT_DIR / "submissions.csv"
    MODELS_DIR = ROOT_DIR
    # NOTE(review): outputs are written one level above the working directory;
    # confirm this is where Kaggle is expected to pick them up.
else:
    ROOT_DIR = pathlib.Path(".")
    # Locally the images come from tf.keras.datasets, so there is no CSV folder.
    DATA_DIR = None
    # Kept as a Path (like the Kaggle branch) so both environments behave alike.
    SUBMISSION_PATH = ROOT_DIR / "submissions" / "mnist.csv"
    MODELS_DIR = ROOT_DIR / "models"

# Create the output folders up front so saving a checkpoint or a submission
# does not fail on a fresh checkout.
MODELS_DIR.mkdir(parents=True, exist_ok=True)
SUBMISSION_PATH.parent.mkdir(parents=True, exist_ok=True)

## Data

### Load MNIST data

In [None]:
def load_data(use_extra_data=False) -> tuple[tuple, tuple]:
    """Load the MNIST images and labels for the current environment.

    On Kaggle the competition CSV files under ``DATA_DIR`` are read and the
    flat pixel rows are reshaped to 28x28 images. Elsewhere the dataset
    bundled with ``tf.keras.datasets.mnist`` is returned unchanged.

    Args:
        use_extra_data: On Kaggle only, append the Keras MNIST train and test
            splits to the competition training data. Ignored elsewhere.

    Returns:
        ``((train_images, train_labels), (test_images, test_labels))``.
        On Kaggle the test labels are the one-element placeholder
        ``np.array([0])`` because ``test.csv`` carries pixel columns only.
    """
    if not is_kaggle:
        input_data, final_test_data = tf.keras.datasets.mnist.load_data()
        return input_data, final_test_data

    train_frame = pd.read_csv(DATA_DIR / "train.csv")
    input_X = train_frame.drop("label", axis=1).values.reshape(-1, 28, 28)
    input_labels = train_frame["label"].values

    final_test_X = pd.read_csv(DATA_DIR / "test.csv").values.reshape(-1, 28, 28)

    if use_extra_data:
        (train_X_tf, train_labels_tf), (test_X_tf, test_labels_tf) = tf.keras.datasets.mnist.load_data()

        input_X = np.concatenate([input_X, train_X_tf, test_X_tf])
        input_labels = np.concatenate([input_labels, train_labels_tf, test_labels_tf])

    # Build the return tuples only after the optional concatenation; building
    # them earlier would silently drop the extra data.
    input_data = (input_X, input_labels)
    final_test_data = (final_test_X, np.array([0]))

    return input_data, final_test_data

(train_X, train_y), (test_X, test_y) = load_data()

# ndarray.nbytes counts the pixel buffer itself; sys.getsizeof reports only the
# small object header when an array is a view (e.g. the result of a reshape).
data_size = sum(array.nbytes for array in (train_X, test_X)) // 1024 ** 2
print("Size of loaded data - ",data_size, "MB")

### Data preprocessing

In [None]:
from tensorflow.keras.utils import to_categorical

# Append a trailing channel axis of size 1 so each image is (height, width, 1).
train_X = train_X.reshape(*train_X.shape[:3], 1)
test_X = test_X.reshape(*test_X.shape[:3], 1)

# One-hot encode the labels. The width is pinned to the ten digit classes so it
# always matches the model's 10-unit softmax, even when a label array does not
# contain every digit (e.g. the one-element placeholder returned on Kaggle).
train_y_ohe = to_categorical(train_y, num_classes=10)
test_y_ohe = to_categorical(test_y, num_classes=10)

train_X.shape, train_y_ohe.shape, test_X.shape, test_y_ohe.shape

#### Image Augmentation

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Training generator: multiplies pixel values by 1/255 and applies small random
# rotations, shifts, shear, zoom and brightness changes. 20% of the data is
# reserved through validation_split.
train_image_genetator = ImageDataGenerator(
    rotation_range=1,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=0.1,
    zoom_range=0.1,
    brightness_range=[0.1, 1.1],
    rescale=1 / 255,
    validation_split=0.2,
)

# Validation generator: same rescaling and split fraction, no augmentation.
val_image_genetator = ImageDataGenerator(rescale=1 / 255, validation_split=0.2)

# Test generator: rescaling only.
test_image_genetator = ImageDataGenerator(rescale=1 / 255)

In [None]:
from random import shuffle


# Augmented, shuffled batches from the 'training' share of the arrays.
train_images = train_image_genetator.flow(
    x=train_X,
    y=train_y_ohe,
    batch_size=64,
    shuffle=True,
    seed=42,
    subset="training",
)

# Un-augmented batches from the 'validation' share of the same arrays.
val_images = val_image_genetator.flow(
    x=train_X,
    y=train_y_ohe,
    batch_size=64,
    seed=42,
    subset="validation",
)

# On Kaggle the iterator is built without labels; elsewhere the one-hot test
# labels are attached. shuffle=False keeps batches in the order of test_X.
test_images = test_image_genetator.flow(
    x=test_X,
    y=None if is_kaggle else test_y_ohe,
    batch_size=64,
    seed=42,
    shuffle=False,
)

### Displaying sample images from the training set

In [None]:
# display images in a grid function
def display_image_grid(images, labels, pred_val=None, shape=(5, 5), figsize=(10, 10)):
    """Show the first ``shape[0] * shape[1]`` images in a grid of titled subplots.

    Args:
        images: Indexable collection with a ``.shape`` attribute; ``images[i]``
            is passed to ``plt.imshow``.
        labels: True label of each image, indexed like ``images``.
        pred_val: Optional predicted labels. When given, each title shows the
            prediction and, on a mismatch, the true label on a second line in
            the form ``*(label)*``.
        shape: ``(rows, columns)`` of the grid.
        figsize: Figure size forwarded to ``plt.figure``.

    Raises:
        ValueError: If fewer than ``shape[0] * shape[1]`` images are supplied.
    """
    m = shape[0] * shape[1]

    # Validate before creating the figure so a bad call leaves no empty figure behind.
    if images.shape[0] < m:
        raise ValueError("images.shape[0] must be at least shape[0] * shape[1]")

    plt.figure(figsize=figsize)
    for i in range(m):
        plt.subplot(shape[0], shape[1], i + 1)
        plt.imshow(images[i], cmap="gray", interpolation="none")
        title = labels[i]
        if pred_val is not None:
            title = f"{pred_val[i]}"
            if pred_val[i] != labels[i]:
                title += f"\n*({labels[i]})*"
        plt.title(title)
        plt.axis("off")

    # Spacing is a figure-level setting, so it is applied once after the loop;
    # the extra vertical space leaves room for two-line titles.
    plt.subplots_adjust(wspace=0.1, hspace=0.4)
    plt.show()


#### Unaugmented images

In [None]:
# Pick 64 random training indices (repeats are possible) and show those digits
# exactly as stored, in an 8x8 grid.
rnd_pts = np.random.randint(low=0, high=train_X.shape[0], size=64)
images, labels = train_X[rnd_pts], train_y[rnd_pts]

display_image_grid(images, labels, shape=(8, 8))

#### Augmented images

In [None]:
# Pull one batch from the augmenting iterator. Its labels are one-hot encoded,
# so argmax turns them back into digit classes for the titles.
images, labels = next(train_images)
display_image_grid(images, labels.argmax(axis=1), shape=(8, 8))

## MODELS

In [None]:
from tensorflow.keras.layers import Dense, Flatten, Conv2D, MaxPooling2D, Dropout, Normalization

In [None]:
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, log_loss

# plotting the model training history

def plot_model_performance(history, figsize=(10, 10)):
    """Plot the series recorded in a Keras ``History`` object on two figures.

    Series whose name contains ``"loss"`` are drawn on the 'Model Losses'
    figure; every other recorded series is drawn on the 'Model Performance'
    figure. Both figures use the epoch index as the x axis.

    Args:
        history: Object exposing a ``history`` dict of name -> per-epoch values.
        figsize: Size used for each of the two figures.
    """
    loss_curves, metric_curves = {}, {}
    for series_name, values in history.history.items():
        bucket = loss_curves if "loss" in series_name else metric_curves
        bucket[series_name] = values

    figures = (
        ('Model Performance', metric_curves),
        ('Model Losses', loss_curves),
    )
    for heading, curves in figures:
        plt.figure(figsize=figsize)
        plt.title(heading)
        for series_name, values in curves.items():
            plt.plot(values, label=series_name)
        plt.legend(curves.keys())
        plt.xlabel('Epoch')

    plt.show()

def compute_performance_metrics(y, y_pred, verbose=1, model_name=None):
    """Score predictions and store the result as a row of ``performance_df``.

    Args:
        y: One-hot encoded true labels, shape ``(n_samples, n_classes)``.
        y_pred: Predicted class probabilities with the same shape as ``y``.
        verbose: When truthy, return the stored row; otherwise return ``None``.
        model_name: Row label in ``performance_df``. Defaults to the ``name``
            of the notebook-level ``model`` so existing calls keep working.

    Returns:
        The ``performance_df`` row for ``model_name`` when ``verbose`` is
        truthy, else ``None``. Precision, recall and F1 are macro-averaged and
        every value is rounded to four decimals.
    """
    if model_name is None:
        # Fall back to the model currently defined at notebook level.
        model_name = model.name

    true_classes = y.argmax(axis=1)
    pred_classes = y_pred.argmax(axis=1)

    performance_metrics = {
        "accuracy": round(accuracy_score(true_classes, pred_classes), 4),
        "f1_score": round(f1_score(true_classes, pred_classes, average="macro"), 4),
        "precision": round(precision_score(true_classes, pred_classes, average="macro"), 4),
        "recall": round(recall_score(true_classes, pred_classes, average="macro"), 4),
        # log loss is computed on the probabilities, not on the arg-maxed classes
        "loss": round(log_loss(y, y_pred), 4),
    }

    # A Series is aligned to the columns by name, for both new and existing rows.
    performance_df.loc[model_name] = pd.Series(performance_metrics)
    if verbose:
        return performance_df.loc[model_name]
    return None

# Results table: one row per model name, filled by compute_performance_metrics.
performance_df = pd.DataFrame(columns=["accuracy", "precision", "recall", "f1_score", "loss"])


In [None]:
# callbacks
from tensorflow.keras.callbacks import EarlyStopping, LearningRateScheduler, ModelCheckpoint

# Metric watched by both early stopping and checkpointing.
monitor_metric = 'val_loss'
# Coefficient in the exponent of the learning-rate schedule (see step_decay).
learning_rate_decay_rate = 0.1

def get_callbacks(model_name=None):
    """Build the training callbacks, keyed by callback class name.

    Args:
        model_name: Base name of the checkpoint file written under
            ``MODELS_DIR``. Defaults to the ``name`` of the notebook-level
            ``model`` so existing ``get_callbacks()`` calls keep working.

    Returns:
        dict with the keys ``"EarlyStopping"``, ``"LearningRateScheduler"``
        and ``"ModelCheckpoint"``, in that order.
    """
    if model_name is None:
        # Fall back to the model currently defined at notebook level.
        model_name = model.name

    return {
        # Stop once the monitored metric has not improved for 5 epochs.
        "EarlyStopping": EarlyStopping(
            monitor=monitor_metric,
            patience=5,
            mode="auto",
            verbose=1,
        ),
        "LearningRateScheduler": LearningRateScheduler(step_decay),
        # Keep only the best model seen so far. The path is passed as a string
        # so it also works where path-like objects are not accepted.
        "ModelCheckpoint": ModelCheckpoint(
            str(MODELS_DIR / f"{model_name}.h5"),
            monitor=monitor_metric,
            save_best_only=True,
            mode='auto',
            verbose=1,
        ),
    }

def step_decay(epoch):
    """Return the learning rate to use for ``epoch``.

    Computes ``LEARNING_RATE * exp(-learning_rate_decay_rate * epoch)``, both
    read from notebook-level variables. Despite the name, the decay is
    exponential in the epoch index rather than stepwise.
    """
    decay_factor = np.exp(-learning_rate_decay_rate * epoch)
    return LEARNING_RATE * decay_factor


#### Simple Sequential Model

In [None]:
from tensorflow.keras.models import Sequential

def simple_sequential_model(input_shape, name="SimpleSequentialModel"):
    """Build and compile a small CNN classifier with a 10-way softmax output.

    Architecture: two 3x3 convolutions with 32 filters, 2x2 max pooling, two
    3x3 convolutions with 64 filters, 2x2 max pooling, then a 128-unit dense
    layer with 25% dropout and a 10-unit softmax layer.

    Args:
        input_shape: Shape of one input sample, e.g. ``(28, 28, 1)``.
        name: Name given to the Keras model.

    Returns:
        The model compiled with categorical cross-entropy loss, the
        ``'rmsprop'`` optimizer and the accuracy metric.
    """
    model = Sequential(name=name)

    # Only the first layer declares the input shape; later layers infer theirs.
    model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=input_shape, padding="same"))
    model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', padding="same"))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(64, kernel_size=(3, 3), activation='relu', padding="same"))
    model.add(Conv2D(64, kernel_size=(3, 3), activation='relu', padding="same"))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Flatten())
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.25))
    model.add(Dense(10, activation='softmax'))

    model.compile(loss='categorical_crossentropy', optimizer='rmsprop', metrics=['accuracy'])

    return model


#### Build and train the Simple Sequential Model

In [None]:
# Instantiate the CNN for 28x28 single-channel inputs and print its layer table.
model = simple_sequential_model(
    input_shape=(28, 28, 1),
    name="mnist-digits-SimpleSequentialModel",
)
model.summary()

In [None]:
from tensorflow.keras.models import load_model

model_path = MODELS_DIR / f"{model.name}.h5"

# Training is forced for now; set FORCE_RETRAIN to False to reuse a saved
# checkpoint when one exists at model_path.
FORCE_RETRAIN = True
train_model = FORCE_RETRAIN or not os.path.exists(model_path)

# Initial learning rate read by the step_decay schedule.
LEARNING_RATE = 5e-4

callbacks = list(get_callbacks().values())

if train_model:
    history = model.fit(
        train_images,
        validation_data=val_images,
        epochs=5,
        callbacks=callbacks,
    )
else:
    model = load_model(model_path)
    # Nothing was trained in this session, so there is no history to plot.
    history = None
    print(f"{model.name} model loaded from {model_path}")
    

In [None]:
# A training history only exists when the model was fitted in this session;
# when a saved model was loaded instead there is nothing to plot.
if train_model:
    plot_model_performance(history)

In [None]:
# Evaluation needs labelled test data, so it only runs outside Kaggle.
if not is_kaggle:
    model.evaluate(test_images)

    # One-hot labels attached to the test iterator, and the matching predictions.
    test_labels = test_images.y
    test_labels_pred_ohe = model.predict(test_images)
    test_labels_pred = test_labels_pred_ohe.argmax(axis=1)

    compute_performance_metrics(y=test_labels, y_pred=test_labels_pred_ohe, verbose=True)


performance_df

In [None]:
# True digit classes of the first 25 test samples. Labels are only defined
# outside Kaggle, and the indices must come from the test set: rnd_pts was
# drawn from the training-set range and can exceed the number of test samples.
None if is_kaggle else test_labels.argmax(axis=1)[:25]

In [None]:
# Display the test batch iterator object itself (quick sanity check).
test_images

In [None]:
# On Kaggle: predict the unlabelled test set and write the submission file.
# Elsewhere: plot a random sample of the misclassified test digits.
if is_kaggle:
    # Predictions are computed here because the evaluation cell is skipped on Kaggle.
    test_labels_pred_ohe = model.predict(test_images)
    test_labels_pred = test_labels_pred_ohe.argmax(axis=1)

    # ImageId is 1-based and follows the row order of the test set.
    submission_df = pd.DataFrame(columns=["ImageId", "Label"])
    submission_df["ImageId"] = range(1, len(test_labels_pred) + 1)
    submission_df["Label"] = test_labels_pred
    submission_df.to_csv(SUBMISSION_PATH, index=False)
else:
    true_labels = test_labels.argmax(axis=1)
    incorrect_pred = np.not_equal(true_labels, test_labels_pred)
    incorrect_pred_index = np.where(incorrect_pred)[0]

    if incorrect_pred_index.size == 0:
        print("No misclassified test images to display")
    else:
        grid_cells = 25
        # Sample without repeats when enough mistakes exist; otherwise allow
        # repeats so the 5x5 grid can still be filled.
        rnd_pts = np.random.choice(
            incorrect_pred_index,
            grid_cells,
            replace=incorrect_pred_index.size < grid_cells,
        )

        images = test_X[rnd_pts]
        labels = true_labels[rnd_pts]
        labels_pred = test_labels_pred[rnd_pts]

        # Pass the sampled images (not the whole test set) so pictures and titles match.
        display_image_grid(images, labels, labels_pred, shape=(5, 5), figsize=(12, 12))

In [None]:
# ImageId
# Label