In [1]:
from utils import CIFAR10Data, montage
import numpy as np
import scipy
import pickle
import matplotlib.pyplot as plt

## Two Layer Classifier

In [None]:
class OneLayerClassifer:
    '''
    Fully connected classifier with one ReLU hidden layer and a softmax output,
    trained by mini-batch gradient descent on cross-entropy loss + L2 regularisation.

    Despite the class name, the network has two weight layers:
        W1: (n_hidden, input_dim),  b1: (n_hidden, 1)
        W2: (n_classes, n_hidden),  b2: (n_classes, 1)

    Data layout used by every method: X is (d, n) with one image per column and
    Y is (K, n) with one one-hot label per column.
    '''

    def __init__(self, n_classes, input_dim, n_hidden, batch_size=100, eta=0.001, n_epochs=20, lamda=0):
        '''
            n_classes:  number of output classes K
            input_dim:  dimensionality d of each input image
            n_hidden:   number of hidden units m
            batch_size: number of images per mini-batch
            eta:        learning rate
            n_epochs:   number of passes over the training data
            lamda:      L2 regularisation strength
        '''
        self.batch_size = batch_size
        self.eta = eta
        self.n_epochs = n_epochs
        self.lamda = lamda
        self.n_hidden = n_hidden
        self.n_classes = n_classes

        # Gaussian initialisation scaled by 1/sqrt(fan_in); biases start at zero.
        self.W1 = np.random.normal(loc=0, scale=1/np.sqrt(input_dim), size=(n_hidden, input_dim))  # m x d
        self.W2 = np.random.normal(loc=0, scale=1/np.sqrt(n_hidden), size=(n_classes, n_hidden))  # K x m
        self.b1 = np.zeros((n_hidden, 1))
        self.b2 = np.zeros((n_classes, 1))

    def normalise(self, train_X, val_X, test_X):
        '''
            Standardise each feature (row) with the mean and std of the training set.

            Each X has shape (d, n) where d = dimensionality of each image, n = number of images.
            Returns the tuple (train_X, val_X, test_X) of normalised copies.
        '''
        mean = np.mean(train_X, axis=1, keepdims=True)
        std = np.std(train_X, axis=1, keepdims=True)
        # A constant feature has zero std; divide by 1 so it maps to 0 instead of NaN/inf.
        std = np.where(std == 0, 1.0, std)
        # The same training-set transformation is applied to every split.
        return tuple((X - mean) / std for X in (train_X, val_X, test_X))

    def forward_pass(self, X, W1, W2, b1, b2):
        '''
            Run the network on X (d, n).
            Returns H (m, n), the hidden ReLU activations, and P (K, n), the class probabilities.
        '''
        S1 = np.dot(W1, X) + b1
        H = np.maximum(S1, 0)  # ReLU activation
        S = np.dot(W2, H) + b2
        P = self.softmax(S)
        return H, P

    def softmax(self, x):
        ''' Column-wise softmax; the column maximum is subtracted first to avoid overflow in exp. '''
        exps = np.exp(x - np.max(x, axis=0, keepdims=True))
        return exps / np.sum(exps, axis=0, keepdims=True)

    def evaluate_classifier(self, X, W1, W2, b1, b2):
        ''' Returns the predicted class index for every column of X. '''
        _, P = self.forward_pass(X, W1, W2, b1, b2)
        return np.argmax(P, axis=0)

    def compute_accuracy(self, X, Y):
        '''
            X is data (d, n), Y is one-hot encoded ground truth (K, n).
            Uses the classifier's current weights.
            Returns (predictions, accuracy) where accuracy is the fraction of correct predictions.
        '''
        pred = self.evaluate_classifier(X, self.W1, self.W2, self.b1, self.b2)
        lbls = np.argmax(Y, axis=0)
        accuracy = np.mean(pred == lbls)
        return pred, accuracy

    def compute_cost(self, X, Y, W1, W2, b1, b2):
        '''
            X: dxn (dimensionality by # images)
            Y: Kxn (no. classes one-hot encoded by # images)
            J: scalar, mean cross-entropy of the network's predictions on X relative to
               the ground-truth labels, plus lamda * (|W1|^2 + |W2|^2)
        '''
        # The probabilities are needed here, not the argmax predictions.
        _, P = self.forward_pass(X, W1, W2, b1, b2)
        N = X.shape[1]
        lcross = -np.sum(Y * np.log(P)) / N
        J = lcross + self.lamda * (np.sum(W1**2) + np.sum(W2**2))
        return J

    def compute_gradients(self, X, Y, W1, W2, b1, b2):
        '''
            Analytic gradients of the cost function wrt W1, W2, b1, b2 for batch X.
            Returns (grad_W1, grad_W2, grad_b1, grad_b2), each shaped like its parameter.
        '''
        N = X.shape[1]

        # forward pass
        H, P = self.forward_pass(X, W1, W2, b1, b2)

        # backward pass: gradient of cross-entropy wrt the softmax input
        G = P - Y

        # J = L(D,W,b) + lamda|W|^2  =>  dJ/dW = dL/dW + 2 lamda W
        grad_W2 = np.dot(G, H.T) / N + 2 * self.lamda * W2
        grad_b2 = np.sum(G, axis=1, keepdims=True) / N

        # Propagate back through the second layer, then through the ReLU:
        # the gradient only flows where the hidden unit was active (H > 0).
        G = np.dot(W2.T, G) * (H > 0)

        grad_W1 = np.dot(G, X.T) / N + 2 * self.lamda * W1
        grad_b1 = np.sum(G, axis=1, keepdims=True) / N

        return grad_W1, grad_W2, grad_b1, grad_b2

    def train(self, X, Y, random_shuffle=False, val_X=None, val_Y=None, get_accuracies_costs=False, epoch_jump=1):
        '''
            Mini-batch gradient descent over X (d, n) / Y (K, n); updates the weights in place.

            random_shuffle:       reshuffle the columns of X and Y at the start of every epoch
            val_X, val_Y:         optional validation split, used only for metrics
            get_accuracies_costs: record accuracy and cost every `epoch_jump` epochs and
                                  after the final epoch
            Returns (accuracies, costs), each a dict with 'train' and 'val' lists. The lists
            stay empty when get_accuracies_costs is False; the 'val' lists also stay empty
            when no validation split is given.
        '''
        n = X.shape[1]
        # Integer division: a trailing partial batch is not used.
        number_of_batches = n // self.batch_size
        indices = np.arange(n)
        if random_shuffle:
            print('Randomly shuffling')

        accuracies = {'train': [], 'val': []}
        costs = {'train': [], 'val': []}
        has_val = val_X is not None and val_Y is not None

        for epoch in range(self.n_epochs):
            if random_shuffle:
                np.random.shuffle(indices)
                X = X[:, indices]
                Y = Y[:, indices]

            for j in range(number_of_batches):
                j_start = j * self.batch_size
                j_end = (j + 1) * self.batch_size
                Xbatch = X[:, j_start:j_end]
                Ybatch = Y[:, j_start:j_end]

                # one gradient-descent step on this mini-batch
                grad_W1, grad_W2, grad_b1, grad_b2 = self.compute_gradients(
                    Xbatch, Ybatch, self.W1, self.W2, self.b1, self.b2)
                self.W1 -= self.eta * grad_W1
                self.W2 -= self.eta * grad_W2
                self.b1 -= self.eta * grad_b1
                self.b2 -= self.eta * grad_b2

            # Parenthesised so nothing is recorded unless metrics were requested.
            is_last_epoch = epoch == self.n_epochs - 1
            if get_accuracies_costs and (epoch % epoch_jump == 0 or is_last_epoch):
                _, train_accuracy = self.compute_accuracy(X, Y)
                accuracies['train'].append(train_accuracy)
                costs['train'].append(self.compute_cost(X, Y, self.W1, self.W2, self.b1, self.b2))
                if has_val:
                    _, val_accuracy = self.compute_accuracy(val_X, val_Y)
                    accuracies['val'].append(val_accuracy)
                    costs['val'].append(self.compute_cost(val_X, val_Y, self.W1, self.W2, self.b1, self.b2))

        return accuracies, costs
 

In [3]:
# Load the three CIFAR-10 splits used below: batch 1 for training, batch 2 for
# validation and the test batch for testing. Each load yields an (X, Y) pair.
CIFARDATA = CIFAR10Data(dataset_dir='../datasets/cifar-10-batches-py/')
(train_X, train_Y), (val_X, val_Y), (test_X, test_Y) = [
    CIFARDATA.load_batch(batch_file)
    for batch_file in ('data_batch_1', 'data_batch_2', 'test_batch')
]
# datasets = [train_X, train_Y, val_X, val_Y, test_X , test_Y]

In [None]:
# Build the classifier with keyword arguments so every hyper-parameter lands on the
# intended constructor parameter. The constructor requires `n_hidden` and accepts no
# `loss` / `delta` arguments.
# NOTE(review): `n_classes`, `input_dim` and `params` (including an 'n_hidden' entry)
# are not defined in any visible cell — confirm they are set before this cell runs.
clf = OneLayerClassifer(
    n_classes,
    input_dim,
    n_hidden=params['n_hidden'],
    batch_size=params['batch_size'],
    eta=params['eta'],
    n_epochs=params['n_epochs'],
    lamda=params['lamda'],
)
# Standardise every split with the statistics of the training split.
train_X, val_X, test_X = clf.normalise(train_X, val_X, test_X)

