# Testing ReLU Networks

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time

In [2]:
def get_data(x_path, y_path):
    x = np.load(x_path)
    y = np.load(y_path)

    y = y.astype('float')
    x = x.astype('float')

    #normalize x:
    x = 2*(0.5 - x/255)
    return x, y

In [3]:
X_train, y_train = get_data('../part_b/x_train.npy', '../part_b/y_train.npy')
X_test, y_test = get_data('../part_b/data_test/x_test.npy', '../part_b/data_test/y_test.npy')

In [4]:
from sklearn.metrics import classification_report

def get_metric(y_true, y_pred):
    results = classification_report(y_pred, y_true)
    print(results)

from sklearn.preprocessing import OneHotEncoder

# enc = OneHotEncoder()
label_encoder = OneHotEncoder(sparse_output=False)
label_encoder.fit(np.expand_dims(y_train, axis = -1))
y_train_oh = label_encoder.transform(np.expand_dims(y_train, axis = -1))
y_test_oh = label_encoder.transform(np.expand_dims(y_test, axis = -1))

In [5]:
def get_f1score(y_true, y_pred):
    f1_scores_dict = [v for v in classification_report(y_true, y_pred, output_dict=True).values()]
    f1_score = sum([label['f1-score'] for label in f1_scores_dict[:5]])/5

    return f1_score

In [6]:
def softmax(x):
    exp_x = np.exp(x)
    return exp_x / np.sum(exp_x, axis=1, keepdims=True)

def softmax_grad(softmax):
    s = softmax.reshape(-1,1)
    return np.diagflat(s) - s.T@s

def sigmoid(x):
    return 1/(1+np.exp(-x))

def sigmoid_grad(x):
    return sigmoid(x)*(1-sigmoid(x))

def cross_entropy_loss(p, q, eps=1e-12):
    m = p.shape[0]
    loss = -np.sum(p * np.log(q + eps)) / m
    return loss

def cross_entropy_grad_netk(p, q):
    # because p[k] = 1 iff k is true label
    return q - p

In [15]:
def relu(x, clip=100, downscale=1):
    return np.where(x>0, np.where(x>clip, clip, x)*downscale, 0)

def relu_grad(x, clip=100, downscale=1):
    # subderivative at x=0 is set to 1(right limit)
    # subderivative at x=clip is set to 0(right limit)
    return np.where(x>=0, np.where(x>=clip, 0, downscale), 0)

def leaky_relu(x, clip=100, r=0.01):
    return np.where(x>0, np.where(x>clip, clip+(x-clip)*r, x), x*r)

def leaky_relu_grad(x, clip=100, r=0.01):
    return np.where(x>0, np.where(x>clip, r, 1), r)

In [26]:
class ANN:
    def __init__(self, n: int, hidden_size: list, r: int, actv_f=sigmoid, actv_f_grad=sigmoid_grad):
        self.input_size = n
        self.hidden_size = hidden_size
        self.output_size = r

        self.activator = actv_f
        self.activator_grad = actv_f_grad

        # list of inter-layer weights : 
        std_init = np.sqrt(2/(n))
        self.weights = [np.random.normal(0, std_init, size=(n, hidden_size[0]))]
        # self.weights = [np.random.randn(n, hidden_size[0])]   # init=randn

        # list of inter-layer biases : init=0
        self.biases = [np.zeros((1,hidden_size[0]))]

        for i in range(1,len(hidden_size)):
            std_init = np.sqrt(2/(hidden_size[i-1]))
            self.weights.append(np.random.normal(0, std_init, size=(hidden_size[i-1], hidden_size[i])))
            self.biases.append(np.zeros((1, hidden_size[i])))

        std_init = np.sqrt(2/(hidden_size[-1]))
        self.weights.append(np.random.normal(0, std_init, size=(hidden_size[-1], r)))

        self.biases.append(np.zeros((1, r)))

        self.num_layers = len(self.biases)


    def fwd_prop(self, X):
        # list of layer outputs
        self.z = []
        self.actv = []

        A = X
        
        self.actv.append(A)
        self.z.append(np.eye(A.shape[0], A.shape[1]))        # to maintain same length

        for i in range(self.num_layers):
            Z = A@self.weights[i] + self.biases[i]
            self.z.append(Z)

            if i == self.num_layers-1:
                z_modifive = Z - np.max(Z, axis=1, keepdims=True)
                A = softmax(z_modifive)
                # A = softmax(Z)
            else:
                A = self.activator(Z)

            self.actv.append(A)

        # len(self.actv) == self.num_layers + 1 == len(self.weights) + 1
        return A


    def bwd_prop(self, y_true, lr):
        m = y_true.shape[0]
        y_pred = self.actv[-1]

        self.w_grad = []
        self.b_grad = []

        dZ = cross_entropy_grad_netk(y_true, y_pred)
        # dActv = softmax_grad(self.z[-1]).mean(axis=0)

        # print(dActv.shape)
        # print(dZ.shape)

        dB = np.mean(dZ, axis=0, keepdims=True)
        self.biases[-1] -= lr*dB
        self.b_grad.append(dB)

        dW = self.actv[-2].T@dZ/m
            # T: gradient clip
        dW = np.where(dW<1, np.where(dW<-1, -1, dW), dW)

        # print(dZ.shape)
        # print(dW.shape)

        self.weights[-1] -= lr*dW
        self.w_grad.append(dW)

        for l in range(self.num_layers-2,-1,-1):
            dJd = dZ@self.weights[l+1].T
            dActv = self.activator_grad(self.z[l+1]).mean(axis=0)
    
            dZ = dJd*dActv
            dW = self.actv[l].T@dZ/m

            # T: gradient clip
            dW = np.where(dW<1, np.where(dW<-1, -1, dW), dW)
            self.w_grad.append(dW)
            
            dB = np.mean(dZ, axis=0, keepdims=True)
            self.b_grad.append(dB)

            self.weights[l] -= lr*dW
            self.biases[l] -= lr*dB

        self.w_grad.reverse()
        self.b_grad.reverse()

        return self.w_grad, self.b_grad

    def train(self, X, y_true, epochs=100, tol=1e-4, batch_size=32, lr=0.01, adaptive=False):
        m = X.shape[0]
        num_batches = m//batch_size

        loss = cross_entropy_loss(y_true, self.fwd_prop(X))
        print(f"Epoch 0/{epochs} - Loss: {loss:.4f}")
        
        for epoch in range(epochs):
            loss_old = loss

            for i in range(num_batches):
                start = i*batch_size
                end = start+batch_size

                X_batch = X[start:end]
                y_batch = y_true[start:end]

                y_pred = self.fwd_prop(X_batch)
                if adaptive:
                    lr_adapt = lr/np.sqrt(epoch+1)
                    self.bwd_prop(y_batch, lr_adapt)
                else:
                    self.bwd_prop(y_batch, lr)

            y_pred = self.fwd_prop(X)
            loss = cross_entropy_loss(y_true, y_pred)            

            if (epoch+1)%10 == 0:
                print(f"Epoch {epoch+1}/{epochs} - Loss: {loss:.4f}")

            if abs(loss_old-loss) < tol:
                print(f'Within tolerance = {tol}')
                return


    def predict(self, X, out_matrix=False):
        y_pred = self.fwd_prop(X)
        if out_matrix:
            return y_pred

        return np.argmax(y_pred, axis=1)+1

In [27]:
hidden_size_list = [[512],[512,256],[512,256,128],[512,256,128,64]]

train_f1_score_list_relu = []
test_f1_score_list_relu = []
for hidden_size in hidden_size_list:
    t = time.time()
    nn_relu = ANN(1024, hidden_size, 5, actv_f=relu, actv_f_grad=relu_grad)

    # try more epochs for larger net?
    nn_relu.train(X_train, y_train_oh, epochs=500, tol=0, lr=0.1, adaptive=True)

    print(f"For hidden_size: {hidden_size}:")

    print("Train scores:-")
    y_pred_relu = nn_relu.predict(X_train)
    get_metric(y_train, y_pred_relu)

    f1_score = get_f1score(y_train, y_pred_relu)
    train_f1_score_list_relu.append(f1_score)

    print("Test scores:-")
    y_pred_relu = nn_relu.predict(X_test)
    get_metric(y_test, y_pred_relu)

    f1_score = get_f1score(y_test, y_pred_relu)
    test_f1_score_list_relu.append(f1_score)

    print(f"Time taken for {hidden_size}= {time.time()-t}")

Epoch 0/500 - Loss: 2.0256
Epoch 10/500 - Loss: 22.1656
Epoch 20/500 - Loss: 22.2374
Epoch 30/500 - Loss: 22.2374


KeyboardInterrupt: 