In [1]:
# import libraries
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader,TensorDataset
from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt

# A function that returns a dataset with a specified size

In [None]:
# Load the full CSV once. Passing the path (instead of an open() handle that is
# never closed) lets np.loadtxt open and close the file itself.
data_full = np.loadtxt('sample_data/mnist_train_small.csv', delimiter=',')

# Now for the function
def make_the_dataset(N, double_the_data=False, double_train_only=False):
    """Build train/devset DataLoaders from the first N rows of `data_full`.

    Reads the module-level array `data_full`; column 0 is used as the label
    and the remaining columns as the features.

    Parameters
    ----------
    N : int
        Number of rows taken from the top of `data_full`.
    double_the_data : bool, default False
        If True, every sample is duplicated exactly once (oversampling).
    double_train_only : bool, default False
        Only consulted when `double_the_data` is True.
        * False (default, original behaviour): ALL samples are duplicated
          BEFORE the train/devset split, so a sample and its exact copy can end
          up on opposite sides of the split. That leaks training data into the
          devset and inflates devset accuracy.
        * True: the split is done first and only the TRAINING partition is
          duplicated afterwards, so the devset stays independent.

    Returns
    -------
    train_loader : DataLoader
        Shuffled mini-batches of 20 samples; the last incomplete batch is dropped.
    test_loader : DataLoader
        The whole devset delivered as one single batch.
    """

    # Extract labels (number IDs) and remove from data
    labels = data_full[:N, 0]
    data   = data_full[:N, 1:]

    # Scale by the largest value in the selected rows (maps non-negative data to [0, 1])
    data_norm = data / np.max(data)

    # Decide where (if anywhere) the duplication takes place
    duplicate_before_split = double_the_data and not double_train_only
    duplicate_after_split  = double_the_data and double_train_only

    # Make an exact copy of ALL the data (copies may leak across the split)
    if duplicate_before_split:
        data_norm = np.concatenate((data_norm, data_norm), axis=0)
        labels    = np.concatenate((labels, labels), axis=0)

    # Convert to tensor
    data_tensor   = torch.tensor(data_norm).float()
    labels_tensor = torch.tensor(labels).long()

    # Use scikitlearn to split the data (90% train / 10% devset)
    train_data, test_data, train_labels, test_labels = train_test_split(
        data_tensor, labels_tensor, train_size=0.9)

    # Make an exact copy of the TRAIN data only (devset is left untouched)
    if duplicate_after_split:
        train_data   = torch.cat((train_data, train_data), dim=0)
        train_labels = torch.cat((train_labels, train_labels), dim=0)

    # Convert into PyTorch Datasets
    train_data_set = TensorDataset(train_data, train_labels)
    test_data_set  = TensorDataset(test_data, test_labels)

    # Translate into Dataloader objects
    batch_size   = 20
    train_loader = DataLoader(dataset=train_data_set, batch_size=batch_size,
                              shuffle=True, drop_last=True)
    test_loader  = DataLoader(dataset=test_data_set,
                              batch_size=test_data_set.tensors[0].shape[0])

    return train_loader, test_loader

In [None]:
# Check the sizes: first the plain dataset, then the doubled one.
# For each, print the shape of the train tensor followed by the devset tensor.
for double_flag in (False, True):
    r, t = make_the_dataset(N=200, double_the_data=double_flag)
    for loader in (r, t):
        print(loader.dataset.tensors[0].shape)

# Create the DL model

In [None]:
def create_the_MNIST_net():
    """Build a small feed-forward classifier plus its loss and optimizer.

    Architecture: 784 -> 64 -> 32 -> 32 -> 10, with ReLU after every layer
    except the last one (the output is left as raw scores).

    Returns
    -------
    (net, loss_func, optimizer)
        A new `mnist_net` instance, an `nn.CrossEntropyLoss`, and a
        `torch.optim.SGD` over the net's parameters with lr=0.01.

    NOTE(review): the previous docstring only listed the tags
    "FFN_SCRAMBLEDMNIST | FFN_SHIFTEDMNIST | DATA_ DATA_OVERSAMPLING" --
    presumably other notebooks that share this function; confirm.
    """
    class mnist_net(nn.Module):
        def __init__(self):
            super().__init__()

            # Layers are created in input -> output order
            self.input  = nn.Linear(784, 64)   # input layer
            self.fc1    = nn.Linear(64, 32)    # first hidden layer
            self.fc2    = nn.Linear(32, 32)    # second hidden layer
            self.output = nn.Linear(32, 10)    # output layer (10 classes)

        def forward(self, x):
            # ReLU after each of the first three layers
            for layer in (self.input, self.fc1, self.fc2):
                x = F.relu(layer(x))

            # No activation on the output layer
            return self.output(x)

    model     = mnist_net()
    criterion = nn.CrossEntropyLoss()
    sgd       = torch.optim.SGD(model.parameters(), lr=0.01)

    return model, criterion, sgd


# Create a function that trains the model

In [None]:
def train_the_model(num_epochs=50):
    """
    Train a freshly created MNIST network and record its per-epoch performance.

    The data is NOT passed in: this function reads the module-level
    `train_loader` and `test_loader`, so both must be assigned (e.g. via
    `make_the_dataset`) before it is called.

    Parameters
    ----------
    num_epochs : int, default 50
        Number of passes over `train_loader`.

    Returns
    -------
    train_acc : list of float
        Per epoch, the mean over batches of the training accuracy (in %).
    test_acc : list of float
        Per epoch, the accuracy (in %) on the first batch of `test_loader`.
    losses : torch.Tensor, shape (num_epochs,)
        Per epoch, the mean training loss over batches.
    net : nn.Module
        The trained model.
    """

    # Create a new model
    net, loss_func, optimizer = create_the_MNIST_net()

    # Initialize
    losses    = torch.zeros(num_epochs)
    train_acc = []
    test_acc  = []

    # Loop over epochs
    for epoch_i in range(num_epochs):

        # Per-batch results for this epoch
        batch_acc  = []
        batch_loss = []

        # Loop over training data batches
        for X, y in train_loader:
            # Forward pass and loss
            y_hat = net(X)
            loss  = loss_func(y_hat, y)

            # Backprop
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Loss from this batch
            batch_loss.append(loss.item())

            # Accuracy of this batch, stored as a plain float (not a tensor)
            matches = torch.argmax(y_hat, dim=1) == y
            batch_acc.append(100 * matches.float().mean().item())
        # End of batch loop.

        # Get the average training accuracy of the batches
        train_acc.append(np.mean(batch_acc))

        # The average loss across the batches
        losses[epoch_i] = np.mean(batch_loss)

        # Devset accuracy: evaluate without tracking gradients
        X, y = next(iter(test_loader))  # Extract X, y from dataloader
        with torch.no_grad():
            y_hat = net(X)
        test_acc.append(100 * (torch.argmax(y_hat, dim=1) == y).float().mean().item())

    # End epochs

    return train_acc, test_acc, losses, net

# Run the model once to confirm that it works

In [None]:
# Build the loaders; train_the_model() reads them from module scope
train_loader, test_loader = make_the_dataset(N=5000)

# One sanity-check training run
train_acc, test_acc, losses, net = train_the_model()


# Two panels: loss on the left, accuracy on the right
fig, ax = plt.subplots(1, 2, figsize=(16, 5))
loss_ax, acc_ax = ax

# Loss curve
loss_ax.plot(losses)
loss_ax.set(xlabel='Epochs', ylabel='Loss', ylim=[0, 3], title='Model Loss')

# Train and test accuracy curves
for curve, curve_label in ((train_acc, 'Train'), (test_acc, 'Test')):
    acc_ax.plot(curve, label=curve_label)
acc_ax.set(xlabel='Epochs', ylabel='Accuracy (%)', ylim=[10, 100],
           title=f'Final model test accuracy: {test_acc[-1]:.2f}')
acc_ax.legend()

plt.show()

# Run an experiment showing better performance with increased N



In [None]:
# List of data sample sizes
sample_sizes = np.arange(start=500, stop=4001, step=500)

# Initialize results matrices: one row per sample size, columns are
# [train accuracy, devset accuracy, loss], each averaged over the last 5 epochs
results_single = np.zeros(shape=(len(sample_sizes), 3))
results_double = np.zeros(shape=(len(sample_sizes), 3))

for sample_size_idx, sample_size_i in enumerate(sample_sizes):

    # First without doubling the data, then with doubling
    for results, double_flag in ((results_single, False), (results_double, True)):

        # Generate a dataset and train the model.
        # train_loader/test_loader must be module-level: train_the_model() reads them.
        train_loader, test_loader        = make_the_dataset(N=sample_size_i, double_the_data=double_flag)
        train_acc, test_acc, losses, net = train_the_model()

        # Grab the results (`losses` is the per-epoch loss tensor returned above)
        results[sample_size_idx, 0] = np.mean(train_acc[-5:])
        results[sample_size_idx, 1] = np.mean(test_acc[-5:])
        results[sample_size_idx, 2] = torch.mean(losses[-5:]).item()


In [None]:
# plt.subplots (plural) creates the figure AND the array of three axes
fig, ax = plt.subplots(1, 3, figsize=(15, 5))

# Axis and title labels, one entry per column of the results matrices
titles      = ['Train', 'Devset', 'Losses']
y_ax_labels = ['Accuracy', 'Accuracy', 'Losses']

# Common features
for i in range(3):

    # Plot the lines
    ax[i].plot(sample_sizes, results_single[:, i], 's-', label='Original')
    ax[i].plot(sample_sizes, results_double[:, i], 's-', label='Doubled')

    # Make it look nicer
    ax[i].set_ylabel(y_ax_labels[i])
    ax[i].set_title(titles[i])
    ax[i].legend()
    ax[i].set_xlabel('Unique sample size')
    ax[i].grid(True)

    # Shared y-range for the two accuracy panels only
    if i < 2:
        ax[i].set_ylim([20, 102])

plt.tight_layout()
plt.show()

# Additional explorations

In [None]:
# 1) Notice that we're using the "test_loader" data multiple times, which really means that it's the devset,
#    aka hold-out set, and not a true TEST set. A real test set gets evaluated only once. Modify the code
#    to create a test set, using images in data_full that are not in data_norm (i.e., rows beyond the first N).
#    Note that you don't need to re-run the entire experiment; you only need to train two models (and save
#    their 'net' outputs), so that you can run the test data through (make sure to normalize the test data!).
#    Then you can evaluate the test performance relative to train and devset from those two models.
# 
# 2) We've previously discovered that Adam can outperform SGD on the MNIST dataset. I used SGD here on purpose --
#    to make performance worse (!) so we could test for effects of oversampling. Re-run the experiment using
#    Adam to see whether you still get the same effects. 
# 