# Machine Learning with PyTorch and Scikit-Learn  

#### CHANGES ####
Moved imports to a dedicated cell.
Moved magic nubmers to a dedicated constants cell

Imports

In [None]:
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.datasets import fetch_openml
import numpy as np
import numpy as np


Constants

In [None]:
RANDOM_SEED: int = 123
NUM_EPOCHS: int = 50
LEARNING_RATE: float = 0.1

## Obtaining and preparing the MNIST dataset

The MNIST dataset is publicly available at http://yann.lecun.com/exdb/mnist/ and consists of the following four parts:

- Training set images: train-images-idx3-ubyte.gz (9.9 MB, 47 MB unzipped, 60,000 examples)
- Training set labels: train-labels-idx1-ubyte.gz (29 KB, 60 KB unzipped, 60,000 labels)
- Test set images: t10k-images-idx3-ubyte.gz (1.6 MB, 7.8 MB, 10,000 examples)
- Test set labels: t10k-labels-idx1-ubyte.gz (5 KB, 10 KB unzipped, 10,000 labels)



In [None]:
X, y = fetch_openml('mnist_784', version=1, return_X_y=True)
X = X.values
y = y.astype(int).values

print(X.shape)
print(y.shape)

Normalize to [-1, 1] range:

In [None]:
X = ((X / 255.) - .5) * 2

In [None]:
X_temp, X_test, y_temp, y_test = train_test_split(
    X, y, test_size=10000, random_state=123, stratify=y)

X_train, X_valid, y_train, y_valid = train_test_split(
    X_temp, y_temp, test_size=5000, random_state=123, stratify=y_temp)


# optional to free up some memory by deleting non-used arrays:
del X_temp, y_temp, X, y

## Implementing a multi-layer perceptron

The model was changed to be modular to support an arbitrary number of layers.

In [None]:
##########################
### MODEL
##########################

def sigmoid(z):                                        
    return 1. / (1. + np.exp(-z))

def relu(z):
    return np.maximum(0, z)

def relu_derivative(z):
    return (z > 0).astype(float)

def int_to_onehot(y, num_labels):

    ary = np.zeros((y.shape[0], num_labels))
    for index, val in enumerate(y):
        ary[index, val] = 1

    return ary

class NeuralNetMLP:
    def __init__(self, 
                 num_features: int, 
                 hidden_layers_sizes: list[int],
                 num_classes: int,
                 random_seed=RANDOM_SEED
                 ):
        
        
        if len(hidden_layers_sizes) < 1:
            raise ValueError("hidden_layers_sizes must contain at least one layer.")
        rng = np.random.RandomState(random_seed)
        self.num_classes = num_classes
        self.layer_weights = [rng.normal(loc=0.0, scale=0.1, size=(hidden_layers_sizes[0], num_features))]
        self.biases = list()
        # hidden
        
        
        for index in range(1, len(hidden_layers_sizes)):
            self.layer_weights.append(
                rng.normal(
                    loc=0.0, 
                    scale=0.1, 
                    size=(hidden_layers_sizes[index], hidden_layers_sizes[index-1])
                )
            )
        for index in range(len(hidden_layers_sizes)):
            self.biases.append(np.zeros(hidden_layers_sizes[index]))

        # output
        self.layer_weights.append(rng.normal(loc=0.0, scale=0.1, size=(num_classes, hidden_layers_sizes[-1])))
        self.biases.append(np.zeros(num_classes))
        
    def forward(self, input: np.ndarray):
        layer_activations = []
        layer_pre_activations = []  # store z for ReLU derivative
        current_input = input

        num_of_layers = len(self.layer_weights)
        for layer_index in range(num_of_layers):
            z = np.dot(current_input, self.layer_weights[layer_index].T) + self.biases[layer_index]
            layer_pre_activations.append(z)

            # ReLU for hidden layers, sigmoid only at output
            if num_of_layers == 2: # Added a special case for single layer for reproducibility
                a = sigmoid(z)
            else:
                if layer_index < num_of_layers - 1:
                    a = relu(z)
                else:
                    a = sigmoid(z)

            layer_activations.append(a)
            current_input = a

        return layer_activations, layer_pre_activations, layer_activations[-1]

    def backward(self, input, layer_activations, layer_pre_activations, network_output, targets):
        N = input.shape[0]
        targets_onehot = int_to_onehot(targets, self.num_classes)

        num_layers = len(self.layer_weights)
        loss_weights_list = [None] * num_layers
        loss_biases_list  = [None] * num_layers

        # output delta (sigmoid + MSE)
        a_out = network_output
        d_loss__d_a = 2.0 * (a_out - targets_onehot) / N
        z_out = layer_pre_activations[-1]
        delta = d_loss__d_a * (sigmoid(z_out) * (1.0 - sigmoid(z_out)))  # == a_out*(1-a_out)
        use_sigmoid_hidden = (num_layers == 2)  # special case for single layer
    # backprop
        for layer_idx in range(num_layers - 1, -1, -1):
            a_prev = input if layer_idx == 0 else layer_activations[layer_idx - 1]

            loss_weights_list[layer_idx] = np.dot(delta.T, a_prev)
            loss_biases_list[layer_idx]  = np.sum(delta, axis=0)

            if layer_idx > 0:
                curr_W = self.layer_weights[layer_idx]      # (out, in)
                d_loss__d_a_prev = np.dot(delta, curr_W)    # (N, in)

                z_prev = layer_pre_activations[layer_idx - 1]

                if use_sigmoid_hidden:
                    # hidden activation is sigmoid -> derivative uses a_prev*(1-a_prev)
                    a_prev_act = layer_activations[layer_idx - 1]
                    delta = d_loss__d_a_prev * (a_prev_act * (1.0 - a_prev_act))
                else:
                    # hidden activation is ReLU
                    delta = d_loss__d_a_prev * relu_derivative(z_prev)

        return loss_weights_list, loss_biases_list



In [None]:
multi_layer_model = NeuralNetMLP(num_features=28*28,
                     hidden_layers_sizes=[50, 50],
                     num_classes=10)
single_layer_model = NeuralNetMLP(num_features=28*28,
                     hidden_layers_sizes=[50],
                     num_classes=10)

## Coding the neural network training loop

Defining data loaders:

In [None]:
num_epochs = 50
minibatch_size = 100


def minibatch_generator(X, y, minibatch_size):
    indices = np.arange(X.shape[0])
    np.random.shuffle(indices)

    for start_idx in range(0, indices.shape[0] - minibatch_size 
                           + 1, minibatch_size):
        batch_idx = indices[start_idx:start_idx + minibatch_size]
        
        yield X[batch_idx], y[batch_idx]

        
# iterate over training epochs
for i in range(num_epochs):

    # iterate over minibatches
    minibatch_gen = minibatch_generator(
        X_train, y_train, minibatch_size)
    
    for X_train_mini, y_train_mini in minibatch_gen:

        break
        
    break
print(X_train_mini.shape)
print(y_train_mini.shape)

Defining a function to compute the loss and accuracy

In [None]:
def mse_loss(targets, probas, num_labels=10):
    onehot_targets = int_to_onehot(targets, num_labels=num_labels)
    return np.mean((onehot_targets - probas)**2)


def accuracy(targets, predicted_labels):
    return np.mean(predicted_labels == targets) 

def evaluate_model(model, features, true_labels):
    _, _, probas = model.forward(features)
    mse = mse_loss(true_labels, probas)
    predicted_labels = np.argmax(probas, axis=1)
    acc = accuracy(true_labels, predicted_labels)
    return acc, mse

In [None]:
acc, mse = evaluate_model(single_layer_model, X_valid, y_valid)
print('Evaluating single layer model...')
print(f'Initial validation MSE: {mse:.1f}')
print(f'Initial validation accuracy: {acc*100:.1f}%')
acc, mse = evaluate_model(multi_layer_model, X_valid, y_valid)
print('Evaluating double layer model...')
print(f'Initial validation MSE: {mse:.1f}')
print(f'Initial validation accuracy: {acc*100:.1f}%')

In [None]:
def compute_mse_and_acc(nnet, X, y, num_labels=10, minibatch_size=100):
    mse, correct_pred, num_examples = 0., 0, 0
    minibatch_gen = minibatch_generator(X, y, minibatch_size)
        
    for i, (features, targets) in enumerate(minibatch_gen):

        _, _, probas = nnet.forward(features)
        predicted_labels = np.argmax(probas, axis=1)
        
        onehot_targets = int_to_onehot(targets, num_labels=num_labels)
        loss = np.mean((onehot_targets - probas)**2)
        correct_pred += (predicted_labels == targets).sum()
        
        num_examples += targets.shape[0]
        mse += loss

    mse = mse/(i+1)
    acc = correct_pred/num_examples
    return mse, acc

In [None]:
mse, acc = compute_mse_and_acc(single_layer_model, X_valid, y_valid)
print('Evaluating single layer model...')
print(f'Initial valid MSE: {mse:.1f}')
print(f'Initial valid accuracy: {acc*100:.1f}%')
mse, acc = compute_mse_and_acc(multi_layer_model, X_valid, y_valid)
print('Evaluating double layer model...')
print(f'Initial valid MSE: {mse:.1f}')
print(f'Initial valid accuracy: {acc*100:.1f}%')

In [None]:
from sklearn.metrics import roc_auc_score

In [None]:
def compute_auc(model, X, y):
    _, _, probas = model.forward(X)
    y_onehot = int_to_onehot(y, num_labels=model.num_classes)
    macro_auc = roc_auc_score(y_onehot, probas, multi_class="ovr", average="macro")
    return macro_auc

In [None]:
auc1 = compute_auc(single_layer_model, X_valid, y_valid)
auc2 = compute_auc(multi_layer_model, X_valid, y_valid)
print('Evaluating single layer model...')
print(f'Initial valid AUC1: {auc1:.1f}')
print('Evaluating double layer model...')
print(f'Initial valid AUC2: {auc2:.1f}')

In [None]:
def train(model, X_train, y_train, X_valid, y_valid, num_epochs,
          learning_rate=0.1):

    epoch_loss = []
    epoch_train_acc = []
    epoch_valid_acc = []

    for e in range(num_epochs):

        minibatch_gen = minibatch_generator(X_train, y_train, minibatch_size)

        for X_train_mini, y_train_mini in minibatch_gen:

            #### Forward ####
            activations_list, pre_activations, a_out = model.forward(X_train_mini)
            #### Backward ####
            weight_loss_list, bias_loss_list = model.backward(
                X_train_mini,
                activations_list,
                pre_activations,
                a_out,
                y_train_mini
            )

            #### Update ####
            for layer_idx in range(len(model.layer_weights)):
                model.layer_weights[layer_idx] -= learning_rate * weight_loss_list[layer_idx]
                model.biases[layer_idx]        -= learning_rate * bias_loss_list[layer_idx]


        #### Epoch Logging ####
        train_mse, train_acc = compute_mse_and_acc(model, X_train, y_train)
        valid_mse, valid_acc = compute_mse_and_acc(model, X_valid, y_valid)

        train_acc *= 100.0
        valid_acc *= 100.0

        epoch_train_acc.append(train_acc)
        epoch_valid_acc.append(valid_acc)
        epoch_loss.append(train_mse)

        print(f'Epoch: {e+1:03d}/{num_epochs:03d} '
              f'| Train MSE: {train_mse:.2f} '
              f'| Train Acc: {train_acc:.2f}% '
              f'| Valid Acc: {valid_acc:.2f}%')
    return epoch_loss, epoch_train_acc, epoch_valid_acc


In [None]:
np.random.seed(RANDOM_SEED) # for the training set shuffling

In [None]:
print('Training single layer model...')
single_layer_epoch_loss, single_layer_epoch_train_acc, single_layer_epoch_valid_acc = train(
    single_layer_model, X_train, y_train, X_valid, y_valid,
    num_epochs=NUM_EPOCHS, learning_rate=LEARNING_RATE)

In [None]:
print('Training double layer model...')
double_layer_epoch_loss, double_layer_epoch_train_acc, double_layer_epoch_valid_acc = train(
    multi_layer_model, X_train, y_train, X_valid, y_valid,
    num_epochs=NUM_EPOCHS, learning_rate=LEARNING_RATE)

## Evaluating the neural network performance

In [None]:
plt.plot(range(len(epoch_loss)), epoch_loss)
plt.ylabel('Mean squared error')
plt.xlabel('Epoch')
plt.show()

In [None]:
plt.plot(range(len(epoch_train_acc)), epoch_train_acc,
         label='Training')
plt.plot(range(len(epoch_valid_acc)), epoch_valid_acc,
         label='Validation')
plt.ylabel('Accuracy')
plt.xlabel('Epochs')
plt.legend(loc='lower right')
plt.show()

In [None]:
single_layer_test_mse, single_layer_test_acc = compute_mse_and_acc(single_layer_model, X_test, y_test)
double_layer_test_mse, double_layer_test_acc = compute_mse_and_acc(multi_layer_model, X_test, y_test)
print(f'Test accuracy, single layer: {single_layer_test_acc*100:.2f}%')
print(f'Test accuracy, double layer: {double_layer_test_acc*100:.2f}%')

Plot failure cases:

In [None]:
X_test_subset = X_test[:1000, :]
y_test_subset = y_test[:1000]

_, _, probas = model.forward(X_test_subset)
test_pred = np.argmax(probas, axis=1)

misclassified_images = X_test_subset[y_test_subset != test_pred][:25]
misclassified_labels = test_pred[y_test_subset != test_pred][:25]
correct_labels = y_test_subset[y_test_subset != test_pred][:25]

In [None]:
fig, ax = plt.subplots(nrows=5, ncols=5, 
                       sharex=True, sharey=True, figsize=(8, 8))
ax = ax.flatten()
for i in range(25):
    img = misclassified_images[i].reshape(28, 28)
    ax[i].imshow(img, cmap='Greys', interpolation='nearest')
    ax[i].set_title(f'{i+1}) '
                    f'True: {correct_labels[i]}\n'
                    f' Predicted: {misclassified_labels[i]}')

ax[0].set_xticks([])
ax[0].set_yticks([])
plt.tight_layout()
#plt.savefig('figures/11_09.png', dpi=300)
plt.show()