# Chapter 3: Classification
## Ex. 1 & 2: MNIST Classification

In [None]:
import tarfile
import urllib.request
from pathlib import Path

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.calibration import cross_val_predict
from sklearn.datasets import fetch_openml
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Apply matplotlib's "dark_background" style sheet to every figure drawn below.
plt.style.use("dark_background")

## 1. Building a classifier for the MNIST dataset that achieves over 97% accuracy

In [None]:
# Download MNIST as plain arrays (not a DataFrame) and split it by position:
# the first 60,000 samples are used for training, the remainder for testing.
mnist = fetch_openml("mnist_784", as_frame=False, parser="auto")
X = mnist.data
y = mnist.target
X_train, X_test = X[:60_000], X[60_000:]
y_train, y_test = y[:60_000], y[60_000:]

In [None]:
def plot_digit(image_data):
    """Draw one flattened 784-value image as a 28x28 picture on the current axes.

    The image is rendered with the "binary_r" colormap and the axis ticks and
    frame are hidden.
    """
    pixels = image_data.reshape(28, 28)
    plt.imshow(pixels, cmap="binary_r")
    plt.axis("off")


# Preview the first 100 samples on a 10x10 grid (subplot indices are 1-based).
plt.figure()
for position, digit in enumerate(X[:100], start=1):
    plt.subplot(10, 10, position)
    plot_digit(digit)

plt.show()

In [None]:
# Two-step pipeline: a StandardScaler followed by a k-nearest-neighbours classifier.
model = make_pipeline(StandardScaler(), KNeighborsClassifier())
# Show the pipeline's parameters; the "<step>__<param>" names are the keys
# used in the search grids below.
model.get_params()

In [None]:
# First search space: toggle variance scaling and sweep the neighbour count,
# the Minkowski exponent ``p`` and the neighbour weighting scheme.
param_grid = dict(
    standardscaler__with_mean=[False],  # maintains sparsity
    standardscaler__with_std=[True, False],
    kneighborsclassifier__n_neighbors=[3, 4, 5, 6],
    kneighborsclassifier__p=[1, 2, 3],
    kneighborsclassifier__weights=["uniform", "distance"],
)

# 3-fold cross-validated grid search scored on accuracy.
grid_search = GridSearchCV(
    estimator=model,
    param_grid=param_grid,
    scoring="accuracy",
    cv=3,
    verbose=3,
)

In [None]:
# Fit the first grid search on a 10,000-sample subset, caching the fitted
# search object on disk so that re-running the notebook skips the slow fit.
model_name = "03_mnist_grid_search"
model_path = Path(f"models/{model_name}.pkl")

if model_path.is_file():
    # NOTE: joblib.load unpickles arbitrary objects -- only load cache files
    # that this notebook produced itself.
    grid_search = joblib.load(model_path)
else:
    # ``n_jobs`` is a GridSearchCV parameter, not a ``fit`` argument: extra
    # keyword arguments to ``fit`` are forwarded to the pipeline, which
    # rejects ``n_jobs``. Configure the parallelism on the search itself.
    grid_search.set_params(n_jobs=-1)
    grid_search.fit(X_train[:10_000], y_train[:10_000])
    # Create the cache directory on demand so the dump cannot fail (and lose
    # the fitted search) just because "models/" does not exist yet.
    model_path.parent.mkdir(parents=True, exist_ok=True)
    joblib.dump(grid_search, model_path)

grid_search.best_score_

In [None]:
# Show the parameter combination selected by the first grid search.
grid_search.best_params_

In [None]:
# Tabulate the cross-validation results and shorten every column name to the
# part after the last "__" (e.g. "param_kneighborsclassifier__p" -> "p").
grid_search_cv_results = pd.DataFrame(grid_search.cv_results_)
grid_search_cv_results.columns = [
    column.split("__")[-1] for column in grid_search_cv_results.columns
]

# Keep only the runs with distance weighting and with both scaler options
# switched off, so the remaining rows differ only in n_neighbors and p.
selected_rows = (
    (grid_search_cv_results["weights"] == "distance")
    & (grid_search_cv_results["with_mean"] == False)
    & (grid_search_cv_results["with_std"] == False)
)

# Columns that are constant after the filter or not needed for the plot.
unneeded_columns = [
    "mean_score_time",
    "std_score_time",
    "params",
    "split0_test_score",
    "split1_test_score",
    "split2_test_score",
    "weights",
    "with_mean",
    "with_std",
]

grid_search_cv_results = grid_search_cv_results[selected_rows].drop(
    columns=unneeded_columns
)

grid_search_cv_results.head()

In [None]:
# Plot mean CV accuracy against n_neighbors, one line per value of p
# (iterating the distinct p values in reverse order of first appearance).
fig, ax = plt.subplots()

distinct_p = grid_search_cv_results["p"].unique()
for p_value in distinct_p[::-1]:
    rows_for_p = grid_search_cv_results["p"] == p_value
    grid_search_cv_results[rows_for_p].plot(
        x="n_neighbors",
        y="mean_test_score",
        ax=ax,
        label=f"p = {p_value}",
    )

ax.set_xlabel("n_neighbors")
ax.set_ylabel("mean_test_score")
ax.legend()
ax.grid(True)
plt.show()

In [None]:
# Re-display the selected parameters as the starting point for the next search.
grid_search.best_params_

In [None]:
# Second, narrower search: scaler options, neighbour count and weighting are
# pinned to single values and only larger Minkowski exponents ``p`` are swept.
param_grid2 = {
    "standardscaler__with_mean": [False],  # maintains sparsity
    "standardscaler__with_std": [False],
    "kneighborsclassifier__n_neighbors": [4],
    "kneighborsclassifier__p": [10, 11, 12, 13, 14],
    "kneighborsclassifier__weights": ["distance"],
}

grid_search = GridSearchCV(model, param_grid2, scoring="accuracy", cv=3, verbose=3)

# Cache the fitted search on disk so re-running the notebook skips the fit.
model_name = "03_mnist_grid_search2"
model_path = Path(f"models/{model_name}.pkl")

if model_path.is_file():
    # NOTE: joblib.load unpickles arbitrary objects -- only load cache files
    # that this notebook produced itself.
    grid_search = joblib.load(model_path)
else:
    grid_search.fit(X_train[:10_000], y_train[:10_000])
    # Create the cache directory on demand so the dump cannot fail (and lose
    # the fitted search) just because "models/" does not exist yet.
    model_path.parent.mkdir(parents=True, exist_ok=True)
    joblib.dump(grid_search, model_path)

grid_search.best_score_

In [None]:
# Show the parameter combination selected by the second grid search.
grid_search.best_params_

In [None]:
# For the second search, draw one figure per metric as a function of p,
# then list the available result columns.
grid_search_cv_results = pd.DataFrame(grid_search.cv_results_)

for metric in ("mean_test_score", "std_test_score", "mean_fit_time"):
    grid_search_cv_results.plot(x="param_kneighborsclassifier__p", y=metric)

print(grid_search_cv_results.columns)

In [None]:
# Baseline test-set predictions: refit the selected pipeline on the full
# training set and cache the predictions so re-runs skip the slow predict.
model_name = "03_y_test_predicted"
model_path = Path(f"models/{model_name}.pkl")

if model_path.is_file():
    # NOTE: joblib.load unpickles arbitrary objects -- only load cache files
    # that this notebook produced itself.
    y_test_predicted = joblib.load(model_path)
else:
    grid_search.best_estimator_.fit(X_train, y_train)
    # Predict with the very object that was just refitted, rather than
    # relying on the search object delegating to it.
    y_test_predicted = grid_search.best_estimator_.predict(X_test)
    # Create the cache directory on demand so the dump cannot fail (and lose
    # the predictions) just because "models/" does not exist yet.
    model_path.parent.mkdir(parents=True, exist_ok=True)
    joblib.dump(y_test_predicted, model_path)

In [None]:
# Report accuracy plus macro-averaged precision, recall and F1 for the
# baseline predictions, one metric per line.
baseline_report = [
    f"accuracy_score: {accuracy_score(y_test, y_test_predicted)}",
    f"precision_score: {precision_score(y_test, y_test_predicted, average='macro')}",
    f"recall_score: {recall_score(y_test, y_test_predicted, average='macro')}",
    f"f1_score: {f1_score(y_test, y_test_predicted, average='macro')}",
]
print("\n".join(baseline_report))

## 2. Data Augmentation

In [None]:
def shift(image_data, direction=(0, 0)):
    """Translate a flattened 28x28 image and return it flattened again.

    Parameters
    ----------
    image_data : array-like with a ``reshape`` method
        The 784 pixel values of one image.
    direction : tuple of two ints ``(dx, dy)``
        ``dx > 0`` moves the content right and ``dx < 0`` moves it left;
        ``dy > 0`` moves the content up and ``dy < 0`` moves it down.
        Pixels pushed past the border are dropped and the vacated area is
        filled with zeros. Shifts larger than the image are clamped to the
        image size, so the result is a blank image instead of an error.

    Returns
    -------
    numpy.ndarray
        The shifted image as a flat array of 784 values. With a zero shift
        the input is only reshaped (no padding is applied).
    """
    side = 28
    image = image_data.reshape(side, side)

    # Clamp each component to [-side, side]: beyond that the padding block
    # would be wider than the image and the final reshape would fail.
    dx = max(-side, min(side, direction[0]))
    dy = max(-side, min(side, direction[1]))

    # Horizontal shift: pad with zero columns on the side the content leaves.
    if dx > 0:
        image = np.hstack((np.zeros((side, dx)), image[:, :-dx]))
    elif dx < 0:
        image = np.hstack((image[:, -dx:], np.zeros((side, -dx))))

    # Vertical shift: positive dy drops the top rows (content moves up).
    if dy > 0:
        image = np.vstack((image[dy:, :], np.zeros((dy, side))))
    elif dy < 0:
        image = np.vstack((np.zeros((-dy, side)), image[:dy, :]))

    return image.reshape(side * side)


# Show the first test digit next to two shifted copies of it.
plt.figure()

examples = [
    X_test[0],
    shift(X_test[0], direction=(0, -3)),
    shift(X_test[0], direction=(3, 0)),
]
for position, example in enumerate(examples, start=1):
    plt.subplot(2, 2, position)
    plot_digit(example)

In [None]:
# Build the augmented training set: the original images followed by four
# one-pixel-shifted copies of every image, each keeping its original label.
X_train_augmented = list(X_train)
y_train_augmented = list(y_train)

for direction in [(-1, 0), (1, 0), (0, 1), (0, -1)]:
    X_train_augmented.extend(shift(image, direction) for image in X_train)
    y_train_augmented.extend(y_train)

X_train_augmented = np.array(X_train_augmented)
y_train_augmented = np.array(y_train_augmented)

# Shuffle images and labels with one shared permutation so pairs stay aligned.
shuffle_idx = np.random.permutation(len(X_train_augmented))
X_train_augmented = X_train_augmented[shuffle_idx]
y_train_augmented = y_train_augmented[shuffle_idx]

In [None]:
# Test-set predictions of the pipeline refitted on the augmented training
# set, cached on disk so re-runs skip the slow fit/predict.
# NOTE: a cache file written by an earlier version of this cell may contain
# the baseline predictions instead -- delete it once to regenerate it.
model_name = "03_y_test_predicted_augmented"
model_path = Path(f"models/{model_name}.pkl")

if model_path.is_file():
    # NOTE: joblib.load unpickles arbitrary objects -- only load cache files
    # that this notebook produced itself.
    y_test_predicted_aug = joblib.load(model_path)
else:
    grid_search.best_estimator_.fit(X_train_augmented, y_train_augmented)
    # Predict with the very object that was just refitted.
    y_test_predicted_aug = grid_search.best_estimator_.predict(X_test)
    # Create the cache directory on demand so the dump cannot fail (and lose
    # the predictions) just because "models/" does not exist yet.
    model_path.parent.mkdir(parents=True, exist_ok=True)
    # Persist the augmented predictions -- not the baseline y_test_predicted --
    # so a cached re-run reports the augmented model's metrics.
    joblib.dump(y_test_predicted_aug, model_path)

In [None]:
# Report accuracy plus macro-averaged precision, recall and F1 for the
# predictions of the augmented model, one metric per line.
augmented_report = [
    f"accuracy_score: {accuracy_score(y_test, y_test_predicted_aug)}",
    f"precision_score: {precision_score(y_test, y_test_predicted_aug, average='macro')}",
    f"recall_score: {recall_score(y_test, y_test_predicted_aug, average='macro')}",
    f"f1_score: {f1_score(y_test, y_test_predicted_aug, average='macro')}",
]
print("\n".join(augmented_report))

In [None]:
# Absolute accuracy gain of the augmented model over the baseline.
accuracy_gain = accuracy_score(y_test, y_test_predicted_aug) - accuracy_score(
    y_test, y_test_predicted
)
print(f"Accuracy improvement: {accuracy_gain:.2%}")

In [None]:
# Relative change of the error rate (1 - accuracy) from baseline to augmented;
# a negative value means the augmented model makes fewer mistakes.
augmented_error_rate = 1 - accuracy_score(y_test, y_test_predicted_aug)
baseline_error_rate = 1 - accuracy_score(y_test, y_test_predicted)
error_rate_change = augmented_error_rate / baseline_error_rate - 1
print(f"error rate change: {error_rate_change:.0%}")