
## <font color=red> You should not import any new libraries. Your code should run with python=3.x</font>

#### <font color=red>For this lab assignment, you will work with two datasets. The trained weights need to be saved and shared with us in a folder called models, in files named ./models/{dataset_name}_weights.pkl. Your predict function should load these weights, initialize the DNN and predict the labels.</font>

- Your solutions will be auto-graded. Hence we request you to follow the instructions.
- Modify the code only between 
```
## TODO
## END TODO
```
- In addition to the above changes, you can experiment with the arguments to the functions that generate plots
- We will run the auto grading scripts with private test data

In [814]:
import numpy as np
from matplotlib import pyplot as plt
import math
import pickle as pkl

Preprocessing

In [815]:
def preprocessing(X):
    """
    Normalize input features to the [0, 1] range, feature by feature.

    A "feature" is one position along all axes except the first (sample)
    axis. Features whose standard deviation over the samples is non-zero are
    standardized (zero mean, unit variance) and then min-max scaled to
    [0, 1]. Features that are constant over the samples are set to 0.

    All statistics are computed from X itself.

    Args:
        X : numpy array whose first axis indexes samples,
            e.g. shape (n_samples, 784)
    Returns:
        X_out: float64 numpy array with the same shape as X
    """
    ## TODO
    # Constant features would cause a division by zero below, so they are
    # excluded from the arithmetic and simply stay at 0.
    varying = np.std(X, axis=0) != 0
    cols = X[:, varying].astype(np.float64)

    # Step 1: standardize each varying feature.
    cols = (cols - cols.mean(axis=0)) / cols.std(axis=0)

    # Step 2: rescale each varying feature to [0, 1].
    lowest = cols.min(axis=0)
    cols = (cols - lowest) / (cols.max(axis=0) - lowest)

    X_out = np.zeros(X.shape, dtype=np.float64)
    X_out[:, varying] = cols
    ## END TODO

    assert X_out.shape == X.shape

    return X_out

### Split data into train/val

In [816]:
def split_data(X, Y, train_ratio=0.8):
    '''
    Randomly split data into train and validation sets.

    X is first passed through `preprocessing`. Then
    floor(train_ratio * n_samples) samples are drawn without replacement
    (using numpy's global random state) to form the train set; every
    remaining sample goes to the validation set, in ascending index order.

    Args:
    X - numpy array whose first axis has length n_samples
    Y - numpy array of labels whose first axis has length n_samples
    train_ratio - fraction of samples to be used as training data

    Returns:
    X_train, Y_train, X_val, Y_val
    '''
    # Try Normalization and scaling and store it in X_transformed
    X_transformed = X

    ## TODO
    X_transformed = preprocessing(X)
    ## END TODO

    assert X_transformed.shape == X.shape

    num_samples = len(X)
    indices = np.arange(num_samples)
    num_train_samples = math.floor(num_samples * train_ratio)
    train_indices = np.random.choice(indices, num_train_samples, replace=False)

    # Everything not drawn for training is validation data. A boolean mask
    # avoids building Python sets of numpy integers and gives a deterministic
    # (ascending) validation order.
    is_val = np.ones(num_samples, dtype=bool)
    is_val[train_indices] = False
    val_indices = np.flatnonzero(is_val)

    X_train, Y_train = X_transformed[train_indices], Y[train_indices]
    X_val, Y_val = X_transformed[val_indices], Y[val_indices]

    return X_train, Y_train, X_val, Y_val

#Flatten the input

In [817]:
class FlattenLayer:
    '''
    This class converts each multi-dimensional sample into a 1-d vector
    '''
    def __init__(self, input_shape):
        '''
        Args:
        input_shape : Original per-sample shape, tuple of ints (any rank)
        '''
        self.input_shape = input_shape

    def __str__(self):
        return "Flatten"

    def forward(self, input):
        '''
        Converts each multi-dimensional sample into a 1-d vector
        Args:
          input : training data, numpy array of shape (n_samples, *self.input_shape)

        Returns:
          input: training data, numpy array of shape (n_samples, -1)
        '''
        ## TODO
        n_samples = input.shape[0]
        return input.reshape(n_samples, -1)
        ## END TODO

    def backward(self, output_error, learning_rate):
        '''
        Converts the passed array back to the original dimensions
        Args:
        output_error :  numpy array of shape (n_samples, prod(self.input_shape))
        learning_rate: float, unused (this layer has no parameters)

        Returns:
        output_error: numpy array of shape (n_samples, *self.input_shape)
        '''
        ## TODO
        # Use the full stored shape rather than only its first two entries so
        # the layer also works for inputs that are not 2-d per sample.
        return output_error.reshape((-1,) + tuple(self.input_shape))
        ## END TODO

#Fully Connected Layer

In [818]:
class FCLayer:
    '''
    Fully connected layer computing input . weights + bias
    '''
    def __init__(self, input_size, output_size):
        '''
        Args:
         input_size : Number of input features, int
         output_size: Number of output features, int
        '''
        self.input_size = input_size
        self.output_size = output_size
        ## TODO
        # NOTE: this reseeds numpy's *global* random state on every
        # construction, so every layer draws from the start of the same
        # stream and later np.random calls in the program are affected too.
        np.random.seed(0)
        uniform_w = np.random.random((input_size, output_size))
        uniform_b = np.random.random((1, output_size))
        # Map [0, 1) samples to [-1, 1) and shrink by the fan-in.
        self.weights = (uniform_w * 2 - 1) / input_size
        self.bias = (uniform_b * 2 - 1) / input_size
        ## END TODO

    def __str__(self):
        return "Fully connected"

    def forward(self, input):
        '''
        Forward pass of the fully connected layer
        Args:
          input : training data, numpy array of shape (n_samples , self.input_size)

        Returns:
           numpy array of shape (n_samples , self.output_size)
        '''
        ## TODO
        # Cache the input; backward() needs it for the weight gradient.
        self.input = input
        projected = np.dot(input, self.weights)
        return projected + self.bias
        ## END TODO

    def backward(self, output_error, learning_rate):
        '''
        Backward pass of the fully connected layer; also applies one
        gradient-descent step to the weights and bias
        Args:
          output_error :  numpy array, gradient of the loss w.r.t. this layer's output
          learning_rate: float

        Returns:
          Numpy array, gradient of the loss w.r.t. this layer's input
        '''
        ## TODO
        # The gradient passed upstream must use the weights from the forward
        # pass, so it is computed before the parameters are updated.
        grad_input = np.dot(output_error, self.weights.T)
        grad_weights = np.dot(self.input.T, output_error)
        grad_bias = output_error.sum(axis=0)

        self.weights -= learning_rate * grad_weights
        self.bias -= learning_rate * grad_bias
        return grad_input
        ## END TODO

In [819]:
class ActivationLayer:
    '''
    Implements an Activation layer which applies an activation function element-wise on the inputs.
    '''
    def __init__(self, activation, activation_prime):
        '''
          Args:
          activation : The activation function (sigmoid, tanh or relu)
          activation_prime: The corresponding derivative, as a function of the
                            pre-activation input, used during backpropagation
                            (sigmoid_prime, tanh_prime or relu_prime)
        '''
        self.activation = activation
        self.activation_prime = activation_prime

    def __str__(self):
        return "Activation"

    def forward(self, input):
        '''
        Applies the activation function
        Args:
          input : numpy array on which activation function is to be applied

        Returns:
           numpy array output from the activation function
        '''
        ## TODO
        # Cache the pre-activation input: the derivative functions are
        # defined in terms of it, not in terms of the activated output.
        self.input = input
        self.output = self.activation(input)
        return self.output
        ## END TODO

    def backward(self, output_error, learning_rate):
        '''
        Performs a backward pass of the activation layer (it has no parameters to update)
        Args:
          output_error :  numpy array, gradient of the loss w.r.t. this layer's output
          learning_rate: float, unused

        Returns:
          Numpy array, gradient of the loss w.r.t. this layer's input
        '''
        ## TODO
        # Chain rule: dL/dx = dL/dy * f'(x), with f' evaluated at the input x.
        # Evaluating it at the output f(x) only happens to be correct for relu.
        return output_error * self.activation_prime(self.input)
        ## END TODO

In [820]:

class SoftmaxLayer:
    '''
      Implements a Softmax layer which applies the softmax function to each row of the inputs.
    '''
    def __init__(self, input_size):
        self.input_size = input_size

    def __str__(self):
        return "Softmax"

    def forward(self, input):
        '''
        Applies the softmax function row-wise
        Args:
          input : numpy array of shape (n_samples, n_classes)

        Returns:
           numpy array of the same shape whose rows sum to 1
        '''
        ## TODO
        # Softmax is invariant to subtracting a per-row constant. Subtracting
        # the row maximum keeps every exponent <= 0, so np.exp cannot
        # overflow to inf (which would produce inf/inf = NaN).
        shifted = input - np.max(input, axis=1, keepdims=True)
        exps = np.exp(shifted)
        self.output = exps / np.sum(exps, axis=1, keepdims=True)
        return self.output
        ## END TODO

    def backward(self, output_error, learning_rate):
        '''
        Performs a backward pass of a Softmax layer
        Args:
          output_error :  numpy array, gradient of the loss w.r.t. the softmax output
          learning_rate: float, unused (this layer has no parameters)

        Returns:
          Numpy array, gradient of the loss w.r.t. the softmax input
        '''
        ## TODO
        # Jacobian-vector product of softmax, per row:
        #   dL/dz_i = s_i * (e_i - sum_j e_j * s_j)
        weighted_error = np.sum(output_error * self.output, axis=1, keepdims=True)
        return self.output * (output_error - weighted_error)
        ## END TODO

In [821]:
def sigmoid(x):
    '''
    Sigmoid function, evaluated in a numerically stable way
    Args:
        x :  numpy array
    Returns:
        Numpy array after applying sigmoid function
    '''
    ## TODO
    # exp(-|x|) lies in (0, 1] and therefore never overflows, unlike exp(-x)
    # for large negative x.
    decay = np.exp(-np.abs(x))
    # x >= 0: 1 / (1 + e^-x);  x < 0: e^x / (1 + e^x). Both equal sigmoid(x).
    return np.where(x >= 0, 1 / (1 + decay), decay / (1 + decay))
    ## END TODO

def sigmoid_prime(x):
    '''
    Derivative of the Sigmoid function with respect to its input x,
    for the backward pass
    Args:
        x :  numpy array (the pre-activation values)
    Returns:
        Numpy array after applying derivative of Sigmoid function
    '''
    ## TODO
    # d/dx sigmoid(x) = sigmoid(x) * (1 - sigmoid(x))
    activated = sigmoid(x)
    complement = 1 - activated
    return activated * complement
    ## END TODO

def tanh(x):
    '''
    Tanh function
    Args:
        x :  numpy array
    Returns:
        Numpy array after applying tanh function
    '''
    ## TODO
    # np.tanh saturates cleanly at +/-1. The hand-written
    # (e^2x - 1) / (e^2x + 1) overflows to inf / inf = NaN for large x.
    return np.tanh(x)
    ## END TODO

def tanh_prime(x):
    '''
    Derivative of the Tanh function with respect to its input x,
    for the backward pass
    Args:
        x :  numpy array (the pre-activation values)
    Returns:
        Numpy array after applying derivative of Tanh function
    '''
    ## TODO
    # d/dx tanh(x) = 1 - tanh(x)^2. np.tanh is used directly so that large
    # |x| saturates to a derivative of 0 instead of producing NaN.
    return 1 - np.tanh(x) ** 2
    ## END TODO

def relu(x):
    '''
    ReLU function: negative entries become 0, the rest are kept
    Args:
        x :  numpy array
    Returns:
        New numpy array after applying ReLU function (x is not modified)
    '''
    ## TODO
    rectified = x.copy()
    np.putmask(rectified, rectified < 0, 0)
    return rectified
    ## END TODO

def relu_prime(x):
    '''
    Derivative of the ReLU function, for the backward pass:
    1 where x > 0 and 0 everywhere else (including at x == 0)
    Args:
        x :  numpy array
    Returns:
        float64 numpy array after applying derivative of ReLU function
    '''
    ## TODO
    return np.where(x > 0, 1.0, 0.0)
    ## END TODO

In [822]:
def mse(y_true, y_pred):
    '''
    Squared-error loss on the predicted score of the true class.

    For each sample only the score predicted for its true class is used:
    the loss is the mean over samples of (1 - y_pred[i, y_true[i]]) ** 2.
    Scores of the other classes do not contribute.

    Args:
        y_true :  Ground truth labels, integer numpy array of shape (n_samples,)
        y_pred :  Predicted scores, numpy array of shape (n_samples, n_classes)
    Returns:
       loss : float
    '''
    ## TODO
    rows = np.arange(y_true.shape[0])
    shortfall = 1 - y_pred[rows, y_true]
    return np.mean(shortfall ** 2)
    ## END TODO

def mse_prime(y_true, y_pred):
    '''
    Per-sample derivative of the `mse` loss w.r.t. y_pred, for the backward pass.

    Only the true-class entry of each row is non-zero:
    grad[i, y_true[i]] = 2 * (y_pred[i, y_true[i]] - 1).
    The result is not divided by the number of samples.

    Args:
        y_true :  Ground truth labels, integer numpy array of shape (n_samples,)
        y_pred :  Predicted scores, numpy array of shape (n_samples, n_classes)
    Returns:
        Numpy array with the same shape as y_pred
    '''
    ## TODO
    rows = np.arange(y_true.shape[0])
    grad = np.zeros(y_pred.shape)
    grad[rows, y_true] = 2 * (y_pred[rows, y_true] - 1)
    return grad
    ## END TODO

def cross_entropy(y_true, y_pred):
    '''
    Cross entropy loss: mean over samples of -log(probability of the true class)
    Args:
        y_true :  Ground truth labels, integer numpy array of shape (n_samples,)
        y_pred :  Predicted probabilities, numpy array of shape (n_samples, n_classes)
    Returns:
       loss : float
    '''
    ## TODO
    num_samples = y_true.shape[0]
    true_class_prob = y_pred[np.arange(num_samples), y_true]
    # A probability that underflowed to exactly 0 would give log(0) = -inf.
    # Flooring it keeps the loss finite; larger probabilities are untouched.
    true_class_prob = np.maximum(true_class_prob, 1e-15)
    return -np.mean(np.log(true_class_prob))
    ## END TODO

def cross_entropy_prime(y_true, y_pred):
    '''
    Per-sample derivative of the cross entropy loss w.r.t. y_pred, for the backward pass.

    Only the true-class entry of each row is non-zero:
    grad[i, y_true[i]] = -1 / y_pred[i, y_true[i]].
    The result is not divided by the number of samples.

    Args:
        y_true :  Ground truth labels, integer numpy array of shape (n_samples,)
        y_pred :  Predicted probabilities, numpy array of shape (n_samples, n_classes)
    Returns:
        Numpy array with the same shape as y_pred
    '''
    ## TODO
    grad = np.zeros(y_pred.shape)
    rows = np.arange(y_true.shape[0])
    # Floor the probability so that a value that underflowed to 0 gives a
    # large finite gradient instead of -inf (which turns into NaN downstream).
    true_class_prob = np.maximum(y_pred[rows, y_true], 1e-15)
    grad[rows, y_true] = -1 / true_class_prob
    return grad
    ## END TODO

Fit function

In [823]:
def _forward_pass(network, X):
    '''Feed X through every layer of network in order and return the final output.'''
    output = X
    for layer in network:
        output = layer.forward(output)
    return output


def _print_accuracy(network, train_X, train_Y, val_X, val_Y, dataset_name):
    '''Print the train and validation accuracy of network on the given splits.'''
    train_pred = _forward_pass(network, train_X).argmax(axis = 1)
    val_pred = _forward_pass(network, val_X).argmax(axis = 1)
    accuracy_train = np.sum(train_pred == train_Y)/ len(train_pred)
    accuracy_val = np.sum(val_pred == val_Y)/len(val_pred)
    print(f"Train accuracy = {accuracy_train}, Val_accuracy = {accuracy_val} for dataset = {dataset_name}")


def _drop_forward_caches(network):
    '''Remove the activations cached by forward() so they are not pickled with the weights.'''
    for layer in network:
        for cached in ("input", "output"):
            vars(layer).pop(cached, None)


def fit(X_train, Y_train, dataset_name):

    '''
    Creates and trains a feedforward network with mini-batch gradient descent
    on the cross entropy loss, printing the loss and the train/validation
    accuracy after every epoch.

    The trained network (the list of layer objects) is pickled to
    ./models/{dataset_name}_weights.pkl, which is what `predict` loads.
    The ./models directory is not created here.

    Args:
        X_train -- np array of shape (num_train, 2048) for flowers and (num_train, 28, 28) for mnist.
        Y_train -- np array of integer class labels, shape (num_train,)
        dataset_name -- name of the dataset (flowers or mnist); any value other
                        than "mnist" uses the flowers configuration

    '''

    #Note that this just a template to help you create your own feed forward network
    ## TODO

    #define your network
    if(dataset_name == "mnist"):
        epochs = 30
        network = [
            FlattenLayer(input_shape=(28, 28)),
            FCLayer(28 * 28, 30),
            ActivationLayer(relu, relu_prime),
            FCLayer(30, 15),
            ActivationLayer(relu, relu_prime),
            FCLayer(15, 10),
            SoftmaxLayer(10)
        ]
        train_X, train_Y, val_X, val_Y = split_data(X_train, Y_train, 0.75)
        n_samples = train_X.shape[0]
        # 50 batches per epoch; never let the batch size drop to 0 on tiny
        # inputs (that would make the range() step and the division invalid).
        batch_size = max(1, n_samples//50)
        learning_rate = 0.99/batch_size

    else:
        epochs = 20
        train_X, train_Y, val_X, val_Y = split_data(X_train, Y_train, 0.8)
        network = [
            FCLayer(2048, 5),
            ActivationLayer(relu, relu_prime),
            FCLayer(5, 5),
            SoftmaxLayer(5)
        ]

        n_samples = train_X.shape[0]
        batch_size = 50
        learning_rate = 0.1/batch_size

    for epoch in range(epochs):
        error = 0
        for start_pt in range(0, n_samples, batch_size):
            end_pt = min(start_pt + batch_size , n_samples)
            x = train_X[start_pt:end_pt, :]
            y_true = train_Y[start_pt:end_pt]

            # forward
            output = _forward_pass(network, x)

            # error (display purpose only); weighted by the batch length so
            # that the epoch average below is a per-sample mean
            err =  cross_entropy(y_true, output)
            error += err * (end_pt - start_pt)

            # backward; each layer also updates its own parameters
            output_error = cross_entropy_prime(y_true, output)
            for layer in reversed(network):
                output_error = layer.backward(output_error, learning_rate)

        error /= len(train_X)
        print('%d/%d, error=%f' % (epoch + 1, epochs, error))
        _print_accuracy(network, train_X, train_Y, val_X, val_Y, dataset_name)

    #Save you model weights
    # The layers still hold the activations of the last forward pass (whole
    # data splits); drop them so only the parameters end up in the file.
    _drop_forward_caches(network)
    with open(f"./models/{dataset_name}_weights.pkl", "wb") as file:
        pkl.dump(network , file)

    _print_accuracy(network, train_X, train_Y, val_X, val_Y, dataset_name)
    ## END TODO


Loading datasets

In [824]:
# Train and save the MNIST model.
dataset = "mnist"
with open(f"./data/{dataset}_train.pkl", "rb") as file:
    train_mnist = pkl.load(file)
print(f"train_x -- {train_mnist[0].shape}; train_y -- {train_mnist[1].shape}")

fit(train_mnist[0], train_mnist[1], dataset)

# Train and save the flowers model.
dataset = "flowers" # "mnist"/"flowers"
with open(f"./data/{dataset}_train.pkl", "rb") as file:
    train_flowers = pkl.load(file)
print(f"train_x -- {train_flowers[0].shape}; train_y -- {train_flowers[1].shape}")

fit(train_flowers[0], train_flowers[1], dataset)

train_x -- (60000, 28, 28); train_y -- (60000,)
1/30, error=2.278943
Train accuracy = 0.3237111111111111, Val_accuracy = 0.3244 for dataset = mnist
2/30, error=1.438012
Train accuracy = 0.6466, Val_accuracy = 0.6456 for dataset = mnist
3/30, error=0.686754
Train accuracy = 0.8760222222222223, Val_accuracy = 0.8757333333333334 for dataset = mnist
4/30, error=0.444628
Train accuracy = 0.898, Val_accuracy = 0.8922666666666667 for dataset = mnist
5/30, error=0.322593
Train accuracy = 0.9259111111111111, Val_accuracy = 0.9221333333333334 for dataset = mnist
6/30, error=0.265115
Train accuracy = 0.9295333333333333, Val_accuracy = 0.9251333333333334 for dataset = mnist
7/30, error=0.231877
Train accuracy = 0.9327111111111112, Val_accuracy = 0.9264666666666667 for dataset = mnist
8/30, error=0.408585
Train accuracy = 0.9318222222222222, Val_accuracy = 0.9278 for dataset = mnist
9/30, error=0.219991
Train accuracy = 0.9374222222222223, Val_accuracy = 0.9326666666666666 for dataset = mnist
10/30

In [825]:
def predict(X_test, dataset_name):
    """

    X_test -- np array of shape (num_test, 2048) for flowers and (num_test, 28, 28) for mnist.



    This is the only function that we will call from the auto grader.

    This function should only perform inference, please do not train your models here.

    Steps to be done here:
    1. Load your trained weights from ./models/{dataset_name}_weights.pkl
    2. Ensure that you read weights using only the libraries we have given above.
    3. Initialize your model with your trained weights
    4. Compute the predicted labels and return it

    Please provide us the complete code you used for training including any techniques
    like data augmentation etc. that you have tried out.

    Return:
    Y_test - nparray of shape (num_test,)
    """
    Y_test = np.zeros(X_test.shape,)

    ## TODO
    # NOTE(review): the normalization statistics are computed from X_test
    # itself, not from the training data, so the predicted labels depend on
    # which samples are passed together in one call.
    X_test = preprocessing(X_test)

    # The file holds the pickled list of layer objects written by fit().
    with open(f"./models/{dataset_name}_weights.pkl", "rb") as file:
        network = pkl.load(file)

    activations = X_test
    for layer in network:
        activations = layer.forward(activations)

    # The predicted label is the class with the highest output score.
    Y_test = activations.argmax(axis = 1)
    ## END TODO
    assert Y_test.shape == (X_test.shape[0],) and type(Y_test) == type(X_test), "Check what you return"
    return Y_test


In [826]:
#check the predict function
# for dataset in ["flowers","mnist"]:
#     accuracy = 0
#     min_accuracy = 1
#     max_accuracy = 0
#     num_exps = 100
#     with open(f"./data/{dataset}_train.pkl", "rb") as file:
#         train_mnist = pkl.load(file)
#         X = train_mnist[0]
#         Y = train_mnist[1]
#     num_samples = len(X)
#     sample_size = 800 if dataset == "flowers" else 4000
#     for _ in range(num_exps):
#         indices = np.random.randint(sample_size, size = sample_size)
#         Y_pred = predict(X[indices,:], dataset)
#         correct_count = np.sum(Y_pred == Y[indices])
#         min_accuracy = min(min_accuracy, correct_count/sample_size)
#         max_accuracy = max(max_accuracy, correct_count/sample_size)
#         accuracy+=correct_count
#     accuracy/= num_exps*sample_size
#     print(f"Accuracy = {accuracy}, Max = {max_accuracy}, Min = {min_accuracy}, dataset = {dataset}")
        

Accuracy = 0.9235875, Max = 0.9475, Min = 0.89, dataset = flowers
Accuracy = 0.9726725, Max = 0.9785, Min = 0.9665, dataset = mnist


In [827]:
# with open("./data/my_test_x", "rb") as file:
#     test_x = pkl.load(file)
# with open("./data/my_test_y", "rb") as file:
#     test_y = pkl.load(file)

# y_pred = predict(test_x, "mnist")
# print(np.sum(y_pred == test_y)/len(test_y))

0.9538
