In [None]:
import numpy as np
from urllib import request
import gzip
import pickle
import os
import pandas as pd
import random
import math
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
def load_synth(num_train=60_000, num_val=10_000):
    """
    Load some very basic synthetic data that should be easy to classify. Two features, so that we can plot the
    decision boundary (which is an ellipse in the feature space).
    :param num_train: Number of training instances
    :param num_val: Number of test/validation instances
    :param num_features: Number of features per instance
    :return: Two tuples (xtrain, ytrain), (xval, yval) the training data is a floating point numpy array:
    """

    THRESHOLD = 0.6
    quad = np.asarray([[1, 0.5], [1, .2]])

    ntotal = num_train + num_val

    x = np.random.randn(ntotal, 2)

    # compute the quadratic form
    q = np.einsum('bf, fk, bk -> b', x, quad, x)
    y = (q > THRESHOLD).astype(np.int)

    return (x[:num_train, :], y[:num_train]), (x[num_train:, :], y[num_train:]), 2


def load_mnist(final=False, flatten=True):
    """
    Load the MNIST data
    :param final: If true, return the canonical test/train split. If false, split some validation data from the training
       data and keep the test data hidden.
    :param flatten:
    :return:
    """

    if not os.path.isfile('mnist.pkl'):
        init()

    xtrain, ytrain, xtest, ytest = load()
    xtl, xsl = xtrain.shape[0], xtest.shape[0]

    if flatten:
        xtrain = xtrain.reshape(xtl, -1)
        xtest  = xtest.reshape(xsl, -1)

    if not final: # return the flattened images
        return (xtrain[:-5000], ytrain[:-5000]), (xtrain[-5000:], ytrain[-5000:]), 10

    return (xtrain, ytrain), (xtest, ytest), 10

# Numpy-only MNIST loader. Courtesy of Hyeonseok Jung
# https://github.com/hsjeong5/MNIST-for-Numpy

filename = [
["training_images","train-images-idx3-ubyte.gz"],
["test_images","t10k-images-idx3-ubyte.gz"],
["training_labels","train-labels-idx1-ubyte.gz"],
["test_labels","t10k-labels-idx1-ubyte.gz"]
]

def download_mnist():
    base_url = "http://yann.lecun.com/exdb/mnist/"
    for name in filename:
        print("Downloading "+name[1]+"...")
        request.urlretrieve(base_url+name[1], name[1])
    print("Download complete.")

def save_mnist():
    mnist = {}
    for name in filename[:2]:
        with gzip.open(name[1], 'rb') as f:
            mnist[name[0]] = np.frombuffer(f.read(), np.uint8, offset=16).reshape(-1,28*28)
    for name in filename[-2:]:
        with gzip.open(name[1], 'rb') as f:
            mnist[name[0]] = np.frombuffer(f.read(), np.uint8, offset=8)
    with open("mnist.pkl", 'wb') as f:
        pickle.dump(mnist,f)
    print("Save complete.")

def init():
    download_mnist()
    save_mnist()

def load():
    with open("mnist.pkl",'rb') as f:
        mnist = pickle.load(f)
    return mnist["training_images"], mnist["training_labels"], mnist["test_images"], mnist["test_labels"]

# Part 2

In [None]:
class Neural_net:
    '''
    Neural network with 2 inputs, 
    1 hidden layer with 3 nodes and sigmoid activation, 
    an output layer with 2 nodes and softmax activation.
    '''
    
    def __init__(self):
        self.w1 = [[0, 0, 0],
              [0, 0, 0]]
        self.b1 = [0, 0, 0]
        self.w2 = [[0, 0],
                  [0, 0],
                  [0, 0]]
        self.b2 = [0, 0]
        self.lr = 0.05
        self.epochs = 5
        
        
    def initialize_weights(self):
        for i in range(len(self.w1[0])):  # 3
            for j in range(len(self.w1)):  # 2
                self.w1[j][i] = random.gauss(0, 0.5)
        for i in range(len(self.w2[0])):  # 3
            for j in range(len(self.w2)):  # 2
                self.w2[j][i] = random.gauss(0, 0.5)   
    
    def predict(self, x, training=False):
        z1 = [0, 0, 0]
        for i in range(len(self.w1[0])):  # 3
            for j in range(len(x)):  # 2
                z1[i] += x[j]*self.w1[j][i]
            z1[i] += self.b1[i]

        a1 = [0, 0, 0]
        for i, z in enumerate(z1):  # 3
            a1[i] = (1/(1+math.exp(-z)))

        z2 = [0, 0]
        for i in range(len(self.w2[0])):  # 2
            for j in range(len(a1)):  # 3
                z2[i] += a1[j]*self.w2[j][i]
            z2[i] +=  self.b2[i]

        exp_a2 = [0, 0]
        for i, z in enumerate(z2):  # 2
            exp_a2[i] = math.exp(z)

        a2 = [0, 0]
        sum_ = sum(exp_a2)
        for i, exp_a in enumerate(exp_a2):  # 2
            a2[i] = exp_a/sum_

        if training:
            return a2, a1
        else:
            return a2
    
    def compute_loss(self, a2, y, training=False):
        y_ = [0, 0]
        y_[y] = 1

        loss = 0

        for i in range(len(a2)):  # 2
            if (a2[i] != 0) and (a2[i] != 1):
                loss -= (y_[i]*math.log(a2[i]))# + (1 - y_[i])*math.log(1-a2[i]))

        dz2 = [0, 0]
        for i in range(len(dz2)):  # 2
            dz2[i] = a2[i] - y_[i]

        if training:
            return loss, dz2
        else:
            return loss

    def backward_pass(self, dz2, a1, x):
        da1 = [0, 0, 0]
        dw2 = [[0, 0],
               [0, 0],
               [0, 0]]
        db2 = [0, 0]
        for i, dz in enumerate(dz2):  # 2
            db2[i] = dz
            for j in range(len(a1)): # 3
                dw2[j][i] = dz*a1[j]
                da1[j] += dz*self.w2[j][i]

        dz1 = [0, 0, 0]
        for i in range(len(dz1)):  # 3
            dz1[i] = da1[i]*a1[i]*(1 - a1[i])

        dw1 = [[0, 0, 0],
               [0, 0, 0]]
        db1 = [0, 0, 0]
        for i in range(len(dw1[0])):  # 3
            db1[i] = dz1[i]
            for j in range(len(dw1)):  # 2
                dw1[j][i] = dz1[i]*x[j]

        return dw2, db2, dw1, db1

    def update_weights(self, dw1, db1, dw2, db2):
        for i in range(len(self.w1[0])):
            for j in range(len(self.w1)):
                self.w1[j][i] -= self.lr*dw1[j][i]

        for i in range(len(self.w2[0])):
            for j in range(len(self.w2)):
                self.w2[j][i] -= self.lr*dw2[j][i]

        for i in range(len(self.b1)):
            self.b1[i] -= self.lr*db1[i]

        for i in range(len(self.b2)):
            self.b2[i] -= self.lr*db2[i]
            
    def train(self, X_train, Y_train, X_val = None, Y_val = None, learning_rate = 0.05, epochs = 5):
        self.lr = learning_rate
        self.epochs = epochs
        
        self.initialize_weights()

        loss_hist = {'train': [], 'val': []}
        
        for j in range(self.epochs): #epochs

            indices = np.arange(X_train.shape[0])
            np.random.shuffle(indices)
            X_train, Y_train = X_train[indices], Y_train[indices]  # shuffle training set

            for i, (x, y) in enumerate(zip(X_train[indices], Y_train[indices])): # go through all data
                a2, a1 = self.predict(x, training=True)
                loss, dz2 = self.compute_loss(a2, y, training=True)
                dw2, db2, dw1, db1 = self.backward_pass(dz2, a1, x)
                self.update_weights(dw1, db1, dw2, db2)

            loss_train = 0            
            for i, (x, y) in enumerate(zip(X_train[indices], Y_train[indices])): # go through all data
                a2 = self.predict(x)
                loss = self.compute_loss(a2, y)
                loss_train += (1/X_train.shape[0])*loss
            loss_hist['train'].append(loss_train)
            print(j, 'train:', round(loss_hist['train'][-1], 4))

            if X_val is not None:
                loss_val = 0
                for i, (x, y) in enumerate(zip(X_val, Y_val)):
                    a2 = self.predict(x)
                    loss = self.compute_loss(a2, y)
                    loss_val += (1/X_val.shape[0])*loss
                loss_hist['val'].append(loss_val)
                print('\tval:', round(loss_hist['val'][-1], 4))

        if X_val is not None:
            return loss_hist
        else:
            return loss_hist['train']

## Q2

In [None]:
nn = Neural_net()
nn.w1 = [[1, 1, 1],
     [-1, -1, -1]]
nn.b1 = [0, 0, 0]
nn.w2 = [[1, 1],
     [-1, -1],
     [-1, -1]]
nn.b2 = [0, 0]


x = [1, -1]

a2, a1 = nn.predict(x, training=True)
loss, dz2 = nn.compute_loss(a2, 0, training=True)
dw2, db2, dw1, db1 = nn.backward_pass(dz2, a1, x)


print(loss)
print(dw1, db1)
print(dw2, db2)

## Q3

In [None]:
hists = {'train': [], 'val': []}

for i in range(5):
    (x_train, y_train), (x_val, y_val), input_shape = load_synth()
    
    nn = Neural_net()
    loss_hist = nn.train(x_train, y_train, x_val, y_val, epochs=15, learning_rate=1e-3)
    hists['train'].append(loss_hist['train'])
    hists['val'].append(loss_hist['val'])
    print('###########')

In [None]:
mtr = np.asarray(hists['train']).mean(axis=0)
sdtr = np.asarray(hists['train']).std(axis=0)
mte = np.asarray(hists['val']).mean(axis=0)
sdte = np.asarray(hists['val']).std(axis=0)

t = np.arange(1, mtr.shape[0]+1)

plt.plot(t, mtr, lw=2, label='Train', color='blue')
plt.plot(t, mte, lw=2, label='Val', color='red')
plt.fill_between(t, mtr+sdtr, mtr-sdtr, facecolor='blue', alpha=0.3)
plt.fill_between(t, mte+sdte, mte-sdte, facecolor='red', alpha=0.3)

plt.legend(loc='upper right')
plt.xlabel('Epochs')
plt.ylabel('Cross Entropy Loss')
plt.grid()

plt.savefig('Question 3')
plt.show()

# Part 3

## Q4

In [None]:
from sklearn.metrics import log_loss

class Neural_net:
    '''
    Neural network with 784 inputs, 
    1 hidden layer with 300 nodes and sigmoid activation, 
    an output layer with 10 nodes and softmax activation.
    '''
    
    def __init__(self):
        self.lr = 0.05
        self.epochs = 5
        
    def initialize_weights(self):
        self.w1 = np.random.randn(300, 784)*0.5
        self.b1 = np.zeros((300, 1))
        self.w2 = np.random.randn(10, 300)*0.5
        self.b2 = np.zeros((10, 1))

    def predict(self, x, training=False):
        z1 = self.w1.dot(x) + self.b1
        a1 = 1 / (1 + np.exp(-z1))
        z2 = self.w2.dot(a1) + self.b2
        a2 = np.exp(z2)/np.exp(z2).sum(axis=0)
        
        if training:
            return a2, a1
        else:
            return a2

    def compute_loss(self, a2, y_, training=False):
        if type(y_) == np.uint8:
            m = 1
        else:
            m = y_.shape[0]

        y = np.zeros((m, 10))
        y[np.arange(y_.size), y_] = 1
        y = y.T         
        loss = -(1/m)*np.sum(np.multiply(np.log(a2), y) + np.multiply(np.log(1-a2), (1-y)))
        dz2 = a2 - y 
        
        if training:
            return loss, dz2
        else:
            return loss

    def backward_pass(self, dz2, a1, x):
        m = x.shape[1]
        dw2 = (1/m)*np.dot(dz2, a1.T)
        db2 = (1/m)*np.sum(dz2)
        dz1 = (1/m)*np.multiply((a1 * (1 - a1)), np.dot(self.w2.T, dz2))
        dw1 = (1/m)*np.dot(dz1, x.T)
        db1 = (1/m)*np.sum(dz1)
        return dw2, db2, dw1, db1

    def update_weights(self, dw1, db1, dw2, db2):
        self.w1 -= self.lr*dw1
        self.b1 -= self.lr*db1
        self.w2 -= self.lr*dw2
        self.b2 -= self.lr*db2
        
    def train(self, X_train, Y_train, X_val = None, Y_val = None, learning_rate = 0.05, epochs = 5):
        self.lr = learning_rate
        self.epochs = epochs
        
        self.initialize_weights()

        loss_hist = {'train': [], 'val': []}
        
        for j in range(self.epochs): #epochs

            indices = np.arange(X_train.shape[0])
            np.random.shuffle(indices)
            X_train, Y_train = X_train[indices], Y_train[indices]  # shuffle training set

            for i, (x, y) in enumerate(zip(X_train[indices], Y_train[indices])): # go through all data
                x = x[:, None]
                a2, a1 = self.predict(x, training=True)
                loss, dz2 = self.compute_loss(a2, y, training=True)
                dw2, db2, dw1, db1 = self.backward_pass(dz2, a1, x)
                self.update_weights(dw1, db1, dw2, db2)

            preds = self.predict(X_train.T)
            loss = self.compute_loss(preds, Y_train)
            loss_hist['train'].append(loss)
            print(j, loss)

            if X_val is not None:
                preds = self.predict(X_val.T)
                loss = self.compute_loss(preds, Y_val)
                loss_hist['val'].append(loss)
                print('\t', loss)
                
        if X_val is not None:
            return loss_hist
        else:
            return loss_hist['train']        

In [None]:
def load_mnist_v2():
    (x_train, y_train), (x_val, y_val), input_shape = load_mnist(final=False)

    x_train = x_train / 255
    x_val = x_val / 255
    
    return (x_train, y_train), (x_val, y_val)

In [None]:
def q5_learning_rates(n_its = 3):
    print('LR: ', 0.05)
    hists_05 = {'train': [], 'val': []}
    for i in range(n_its):
        (x_train, y_train), (x_val, y_val) = load_mnist_v2()
        nn = Neural_net()
        loss_hist = nn.train(x_train, y_train, x_val, y_val, epochs=5, learning_rate=0.05)
        hists_05['train'].append(loss_hist['train'])
        hists_05['val'].append(loss_hist['val'])
        print()
     
    print('LR: ', 0.01)
    hists_01 = {'train': [], 'val': []}
    for i in range(n_its):
        (x_train, y_train), (x_val, y_val) = load_mnist_v2()
        nn = Neural_net()
        loss_hist = nn,traub(x_train, y_train, x_val, y_val, epochs=5, learning_rate=0.01)
        hists_01['train'].append(loss_hist['train'])
        hists_01['val'].append(loss_hist['val'])
        print()
    
    print('LR: ', 0.001)
    hists_001 = {'train': [], 'val': []}
    for i in range(n_its):
        (x_train, y_train), (x_val, y_val) = load_mnist_v2()
        nn = Neural_net()
        loss_hist = nn.train(x_train, y_train, x_val, y_val, epochs=5, learning_rate=0.001)
        hists_001['train'].append(loss_hist['train'])
        hists_001['val'].append(loss_hist['val'])
        print()
        
    return hists_05, hists_01, hists_001

In [None]:
hists_05, hists_01, hists_001 = q5_learning_rates()

In [None]:
mtr05 = np.asarray(hists_05['train']).mean(axis=0)
sdtr05 = np.asarray(hists_05['train']).std(axis=0)
mte05 = np.asarray(hists_05['val']).mean(axis=0)
sdte05 = np.asarray(hists_05['val']).std(axis=0)

mtr01 = np.asarray(hists_01['train']).mean(axis=0)
sdtr01 = np.asarray(hists_01['train']).std(axis=0)
mte01 = np.asarray(hists_01['val']).mean(axis=0)
sdte01 = np.asarray(hists_01['val']).std(axis=0)

mtr001 = np.asarray(hists_001['train']).mean(axis=0)
sdtr001 = np.asarray(hists_001['train']).std(axis=0)
mte001 = np.asarray(hists_001['val']).mean(axis=0)
sdte001 = np.asarray(hists_001['val']).std(axis=0)

In [None]:
t = np.arange(1, 6)
fig, (ax0, ax1, ax2) = plt.subplots(nrows=1, ncols=3, sharex=True, sharey=True, figsize=(12, 6))

ax0.plot(t, mtr05, lw=2, label='Train', color='blue')
ax0.plot(t, mte05, lw=2, label='Val', color='red')
ax0.fill_between(t, mtr05+sdtr05, mtr05-sdtr05, facecolor='blue', alpha=0.5)
ax0.fill_between(t, mte05+sdte05, mte05-sdte05, facecolor='red', alpha=0.5)
ax0.set_title('LR: 0.05')

ax1.plot(t, mtr01, lw=2, label='Train', color='blue')
ax1.plot(t, mte01, lw=2, label='Val', color='red')
ax1.fill_between(t, mtr01+sdtr01, mtr01-sdtr01, facecolor='blue', alpha=0.5)
ax1.fill_between(t, mte01+sdte01, mte01-sdte01, facecolor='red', alpha=0.5)
ax1.set_title(r'LR: 0.01')

ax2.plot(t, mtr001, lw=2, label='Train', color='blue')
ax2.plot(t, mte001, lw=2, label='Val', color='red')
ax2.fill_between(t, mtr001+sdtr001, mtr001-sdtr001, facecolor='blue', alpha=0.5)
ax2.fill_between(t, mte001+sdte001, mte001-sdte001, facecolor='red', alpha=0.5)
ax2.set_title(r'LR: 0.001')

ax2.legend(loc='upper right')
ax1.set_xlabel('Epochs')
ax0.set_ylabel('Cross Entropy Loss')
ax0.grid()
ax1.grid()
ax2.grid()

fig.suptitle('Effect of Learning Rate on Training and Validation Loss')
plt.savefig('Question 5')
plt.show()

# Q5-final

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score

(x_train, y_train), (x_test, y_test), input_shape = load_mnist(final=True)
x_train = x_train / 255
x_test = x_test / 255
    
nn = Neural_net()
training_loss = nn.train(x_train, y_train, x_test, y_test, epochs=5, learning_rate=0.01)

In [None]:
preds = nn.predict(x_test.T)

In [None]:
preds = np.asarray(pd.DataFrame(preds.T).idxmax(axis=1))   
print('Accuracy:',  accuracy_score(y_test, preds))
print('F1 Score:', f1_score(y_test, preds, average='macro'))
print('Confusion Matrix:', confusion_matrix(y_test, preds))