In [1]:
import numpy as np
import matplotlib.pyplot as plt
from utils import load_data, load_config, write_to_file, one_hot_encoding

# Load configuration
config = load_config('./config.yaml')

# Load the data and reshape from (32 x 32) to (1024 x 1)
x_train, y_train, x_test, y_test = load_data()

# One-hot encoding
y_train = np.eye(len(y_train), 10)[y_train]
y_test = np.eye(len(y_test), 10)[y_test]

x_train = np.array([image.reshape((1024)) for image in x_train], dtype='float')
x_test = np.array([image.reshape((1024)) for image in x_test], dtype='float')

# Create validation set out of training data.
num = int(len(x_train) * 0.8)
[x_train, x_val]= np.split(x_train, [num])
[y_train, y_val] = np.split(y_train, [num])

In [2]:
# Calculate feature mean and standard deviation for x_train, and use them to
# Z score x_train, X_val and X_test
def z_score_train_test(train, val, test):
    train_T = train.T
    val_T = val.T
    test_T = test.T
    for i in range(len(train_T)):
        mean = np.mean(train_T[i])
        SD = np.std(train_T[i])
        train_T[i] = (train_T[i] - mean) / SD
        val_T[i] = (val_T[i] - mean) / SD
        test_T[i] = (test_T[i] - mean) / SD
    return train_T.T, val_T.T, test_T.T

# Z-scoring
x_train, x_val, x_test = z_score_train_test(x_train, x_val, x_test)

In [3]:
train_acc = []
valid_acc = []
train_loss = []
valid_loss = []
best_model = None

In [4]:
################################################################################
# CSE 151B: Programming Assignment 2
# Code snippet by Ajit Kumar, Savyasachi
# Edited by Zihao Kong, Baichuan Wu
# Fall 2020
################################################################################

import numpy as np
import math

class Activation:
    """
    The class implements different types of activation functions for
    your neural network layers.

    Example (for sigmoid):
        //>>> sigmoid_layer = Activation("sigmoid")
        //>>> z = sigmoid_layer(a)
        //>>> gradient = sigmoid_layer.backward(delta=1.0)
    """

    def __init__(self, activation_type="sigmoid"):
        """
        Initialize activation type and placeholders here.
        """
        if activation_type not in ["sigmoid", "tanh", "ReLU"]:
            raise NotImplementedError("%s is not implemented." % (activation_type))

        # Type of non-linear activation.
        self.activation_type = activation_type
        # Placeholder for input. This will be used for computing gradients.
        self.x = None

    def __call__(self, a):
        """
        This method allows your instances to be callable.
        """
        return self.forward(a)

    def forward(self, a):
        """
        Compute the forward pass.
        """
        if self.activation_type == "sigmoid":
            return self.sigmoid(a)

        elif self.activation_type == "tanh":
            return self.tanh(a)

        elif self.activation_type == "ReLU":
            return self.ReLU(a)

    def backward(self, delta):
        """
        Compute the backward pass.
        """
        if self.activation_type == "sigmoid":
            grad = self.grad_sigmoid()

        elif self.activation_type == "tanh":
            grad = self.grad_tanh()

        elif self.activation_type == "ReLU":
            grad = self.grad_ReLU()

        # Hadamard product
        return grad * delta

    def sigmoid(self, x):
        """
        Sigmoid activation function.
        """
        self.x = x
        if x >= 0:
            return 1 / (1 + exp(-x))
        else:
            return exp(x) / (1 + exp(x))

    def tanh(self, x):
        """
        Tanh activation function.
        """
        self.x = x
        return np.tanh(x)

    def ReLU(self, x):
        """
        ReLU activation function.
        """
        self.x = x
        res = np.array(self.x)
        res[res < 0] = 0
        return res

    def grad_sigmoid(self):
        """
        Calculates Gradient of sigmoid activation function.
        """
        return self.sigmoid(self.x) * (1 - self.sigmoid(self.x))

    def grad_tanh(self):
        """
        Calculates Gradient of tanh activation function.
        """
        return 1 - self.tanh(self.x) ** 2

    def grad_ReLU(self):
        """
        Calculates Gradient of ReLU activation function.
        """
        res = np.array(self.x)
        res[res <= 0] = 0
        res[res > 0] = 1
        return res


class Layer:
    """
    This class implements Fully Connected layers for your neural network.

    Example:
        //>>> fully_connected_layer = Layer(1024, 100)
        //>>> output = fully_connected_layer(input)
        //>>> gradient = fully_connected_layer.backward(delta=1.0)
    """

    def __init__(self, in_units, out_units):
        """
        Define the architecture and create placeholder.
        """
        np.random.seed(42)
        self.w = math.sqrt(2 / in_units) * np.random.randn(in_units, out_units) # Kaiming initialization
        self.b = np.zeros((1, out_units))  # Create a placeholder for Bias
        self.x = None  # Save the input to forward in this
        self.a = None  # Save the output of forward pass in this (without activation)

        self.d_x = None  # Save the gradient w.r.t x in this w_{jk}
        self.d_w = None  # Save the gradient w.r.t w in this x_j
        self.d_b = None  # Save the gradient w.r.t b in this

    def __call__(self, x):
        """
        Make layer callable.
        """
        return self.forward(x)

    def forward(self, x):
        """
        Compute the forward pass through the layer here.
        Do not apply activation here.
        Return self.a
        """
        self.x = x
        self.a = self.x @ self.w + self.b
        return self.a

    def backward(self, delta):
        """
        Write the code for backward pass. This takes in gradient from its next layer as input,
        computes gradient for its weights and the delta to pass to its previous layers.
        Return self.dx
        """
        # \frac{\partial a_j}{\partial w_{ij}} = x, 
        # where the gradient is - \frac{\partial E}{\partial a_j} \frac{a_j}{\partial w_{ij}}
        self.d_w = -1 * (self.x.T @ delta)

        # derivative of bias is 1
        self.d_b = -1 * np.ones((1, len(self.x))) @ delta

        # derivative of input is the weighted sum of input of delta j and w_j
        # delta is row major, change to column major first
        self.d_x = (self.w @ delta.T).T

        # propogate partial X to calculate last layer's delta
        return self.d_x


class NeuralNetwork:
    """
    Create a Neural Network specified by the input configuration.

    Example:
        //>>> net = NeuralNetwork(config)
        //>>> output = net(input)
        //>>> net.backward()
    """

    def __init__(self, config):
        """
        Create the Neural Network using config.
        """
        self.layers = []  # Store all layers in this list.
        self.x = None  # Save the input to forward in this
        self.y = None  # Save the output vector of model in this
        self.targets = None  # Save the targets in forward in this variable
        self.alpha = config['learning_rate'] # Save the learning rate from config
        self.batch_size = config['batch_size'] #

        # Add layers specified by layer_specs.
        for i in range(len(config['layer_specs']) - 1):
            self.layers.append(Layer(config['layer_specs'][i], config['layer_specs'][i + 1]))
            if i < len(config['layer_specs']) - 2:
                self.layers.append(Activation(config['activation']))

    def __call__(self, x, targets=None):
        """
        Make NeuralNetwork callable.
        """
        return self.forward(x, targets)

    def forward(self, x, targets=None):
        """
        Compute forward pass through all the layers in the network and return it.
        If targets are provided, return loss as well.
        """
        self.x = x
        self.targets = targets
        
        temp = self.x
        
        for i in range(len(self.layers)):
            # Calculate weighted sum of inputs / pass weighted sum through activation
            temp = self.layers[i].forward(temp)

        # activate ak to yk
        self.y = self.softmax(temp)

        # calculate loss if target is passed into the function
        if targets is not None:
            batch_loss = self.loss(self.y, self.targets)
            return self.y, batch_loss
        else:
            return self.y


    def softmax(self, x):
        """
        Numerically stable softmax function
        """
        row_max = np.amax(x, axis=1)
        x = x - row_max.reshape(len(x), 1)

        return np.exp(x) / np.sum(np.exp(x), axis=1).reshape(-1, 1)

    def backward(self):
        """
        Implement backpropagation here.
        Call backward methods of individual layer's.
        """
        delta = [None] * len(self.layers)
        delta[len(delta) - 1] = self.targets - self.y

        # Backprop deltas
        for i in reversed(range(len(self.layers) - 1)):
            # Evaluate the delta term
            delta[i] = self.layers[i + 1].backward(delta[i + 1])
        self.layers[0].backward(delta[0])
        
        
        # Update weights
        for i in range(0, len(self.layers), 2):
            
            # w = [fan_in, fan_out] => x = [n, fan_in], delta = [n, fan_out], alpha = [1] => x.T @ delta * alpha
        
            self.layers[i].w = self.layers[i].w - 0.05 * self.layers[i].d_w / self.batch_size
            # a = w_0 * b + w_1 * x1 ...
            # b = [n, fan_out] => delta = [n, fan_out]
            
            self.layers[i].b = self.layers[i].b - 0.05 * self.layers[i].d_b

    def loss(self, logits, targets):
        """
        compute the categorical cross-entropy loss and return it.
        """
        y_ylog = targets * np.log(logits + 1e-8)
        return -1 * np.sum(y_ylog) / len(targets)



In [5]:
import copy
def accuracy(y, t):
    y = np.argmax(y, axis=1)
    t = np.argmax(t, axis=1)
    res = [y_hat == t_hat for y_hat, t_hat in zip(y, t)]
    return np.sum(res) / len(res)

best_model = None
best_loss = float('inf')
curr_loss = float('inf')
prev_loss = float("inf")
early_stop_mark = 0

batch_index = 0

model = NeuralNetwork(config=config)

for i in range(config['epochs']): 
    print("start epoch", i)
    batch_index = 0
    # Randomize the order of the indices into the training set
    shuffled_indices = np.random.permutation(len(x_train))
    x_train = x_train[shuffled_indices]
    y_train = y_train[shuffled_indices]
    for j in range(0, len(x_train), config['batch_size']):
        batch_index += 1

        if (j + config['batch_size'] < len(x_train)):
            batch_x = x_train[j:j + config['batch_size'],:]
            batch_y = y_train[j:j + config['batch_size'],:]
        else:
            batch_x = x_train[[j, len(x_train) - 1]]
            batch_y = y_train[[j, len(x_train) - 1]]
        
        
        model.forward(x=batch_x, targets=batch_y)
        model.backward()
                
    y, curr_loss = model.forward(x=x_val, targets=y_val)
    if curr_loss <= best_loss:
        print("best loss detected:", best_loss)
        print("accuracy: ",accuracy(y,y_val)*100,"%")
        
        best_loss = curr_loss
        best_model = copy.deepcopy(model)
         
    
    
        
    #if early stop is true
    if curr_loss >= prev_loss:
        print("increment counter")
        early_stop_mark += 1

    if early_stop_mark == config['early_stop_epoch']:
        print("Stop epoch \n\n\n")
        early_stop_mark = 0
        break
    prev_loss = curr_loss
    

    
    #y, loss = model.forward(x_val, y_val)
    #acc = accuracy(y, y_val)
    #print('Epoch', i, 'Loss', loss, 'Accuracy', acc)

start epoch 0
best loss detected: inf
accuracy:  51.999726999727 %
start epoch 1
best loss detected: 1.505270340943542
accuracy:  64.32568932568932 %
start epoch 2
best loss detected: 1.1767364672311835
accuracy:  68.61861861861863 %
start epoch 3
best loss detected: 1.0349868272537028
accuracy:  71.21894621894623 %
start epoch 4
best loss detected: 0.9510741566772374
accuracy:  71.86049686049685 %
start epoch 5
best loss detected: 0.9125375181304797
accuracy:  72.8023478023478 %
start epoch 6
best loss detected: 0.8824518216073237
accuracy:  74.69287469287468 %
start epoch 7
best loss detected: 0.8291013311087106
accuracy:  75.81217581217581 %
start epoch 8
best loss detected: 0.7911263474908584
accuracy:  76.48102648102648 %
start epoch 9
best loss detected: 0.7689896661076838
accuracy:  77.34780234780236 %
start epoch 10
best loss detected: 0.7476245722371461
accuracy:  77.87332787332787 %
start epoch 11
increment counter
start epoch 12
best loss detected: 0.7305109901473837
accurac

In [24]:
y, loss = model.forward(x=x_test, targets=y_test)
print(accuracy(y,y_test))


pri

0.7723955132145052


In [22]:
# numerical approximation

x = x_train[10:11,:]
y = y_train[10:11,:]


coorx = 0
coory = 8
layernum = 0
model = NeuralNetwork(config=config)
epsilon = 1e-2

model.layers[layernum].w[coorx][coory] = model.layers[layernum].w[coorx][coory] + epsilon
te, loss1 = model.forward(x,y)
model.layers[layernum].w[coorx][coory] = model.layers[layernum].w[coorx][coory] - 2 * epsilon
te, loss2 = model.forward(x,y)

print("gradient expected", "{:.2e}".format((loss1-loss2)/(2 * epsilon)))

model.layers[layernum].w[coorx][coory] = model.layers[2].w[coorx][coory] + epsilon
model.backward()

print("gradient received", "{:.2e}".format(model.layers[layernum].d_w[coorx][coory])) #.d_w[coorx][coory]
print("gradient diff", "{:.2e}".format(model.layers[layernum].d_w[coorx][coory]-(loss1-loss2)/(2 * epsilon)))


gradient expected -3.73e-02
gradient received -3.71e-02
gradient diff 2.30e-04


In [8]:
# SGD


In [9]:
# model = NeuralNetwork(config=config)

# # One output bias weight
# epsilon = 1e-2
# model.layers[2].b[0][8] += epsilon
# y, loss = model.forward(x_val, targets=y_val)
# e_w_plus = loss
# model.layers[2].b[0][8] -= epsilon
# y, loss = model.forward(x_val, targets=y_val)
# e_w_minus = loss
# e_w_diff = e_w_plus - e_w_minus
# expected_d_w = (e_w_diff / 2 * epsilon)
# model.backward()
# actual_d_w = model.layers[2].d_b[0][8]
# print('Expected', expected_d_w)
# print('Actual', actual_d_w)
# print('Agress?', np.absolute(expected_d_w - actual_d_w) < epsilon ** 2)