# Intro to Deep Learning Systems : HW 2
## Problem 1: Perceptron

In [1]:
# Imports
import numpy as np
from numpy.random import default_rng
import pandas as pd

In [2]:
# Generate training data
def generate_data(n):
    x = default_rng().uniform(low=0.0, high=1.0, size=(n, 2))
    func = lambda x: 1 if x else -1
    y = np.vectorize(func)(x[:, 0] > x[:, 1])
    
    return x, y
X_train, y_train = generate_data(10)
print("Training Data")
np.concatenate((X_train, y_train.reshape(-1, 1)), axis=1)

Training Data


array([[ 0.07913953,  0.27972466, -1.        ],
       [ 0.64158628,  0.73117014, -1.        ],
       [ 0.69398778,  0.47006412,  1.        ],
       [ 0.0713409 ,  0.0560696 ,  1.        ],
       [ 0.14439238,  0.15546489, -1.        ],
       [ 0.90697584,  0.88839703,  1.        ],
       [ 0.24572304,  0.49236102, -1.        ],
       [ 0.11568971,  0.88261834, -1.        ],
       [ 0.36278988,  0.63100913, -1.        ],
       [ 0.78673499,  0.81975189, -1.        ]])

In [3]:
def train_perceptron(X_train, y_train, epoch_lim=1000, hinge_loss=False):
    """Train weights for given data via perceptron algorithm.

    Args:
        X_train: Feature matrix
        y_train: Class labels
        epoch_lim: Upper epoch limit to cut-off training
        hinge_loss: Specify whether to use hinge loss

    Returns:
        Trained weights
    """
    w = np.zeros(X_train.shape[1])
    epoch, convergence = 0, False
    threshold = 1 if hinge_loss else 0

    while not convergence and epoch < epoch_lim:
        convergence = True
        for i in range(len(X_train)):
            x = X_train[i]
            y = y_train[i]

            margin = y * np.dot(w, x)
            if margin <= threshold:
                convergence = False
                w += y * x

        epoch += 1
    
    if not convergence:
        print("Convergence not achieved. Too many epochs.")
    return w

def predict_perceptron(x, weights):
    """Predict values using perceptron weights

    Args:
        x: Input features
        weights: Perceptron weights

    Return:
        Output vector
    """

    return np.sign(np.dot(x, weights))

In [4]:
# Training
w_perceptron = train_perceptron(X_train, y_train)
w_hinge = train_perceptron(X_train, y_train, hinge_loss=True)

# Test
perceptron_accuracy, hinge_accuracy = [], []
for i in range(10):
    X_test, y_test = generate_data(5000)
    pred_perceptron = predict_perceptron(X_test, w_perceptron)
    pred_hinge = predict_perceptron(X_test, w_hinge)
    
    perceptron_accuracy.append(sum(pred_perceptron == y_test) / pred_perceptron.shape[0])
    hinge_accuracy.append(sum(pred_hinge == y_test) / pred_hinge.shape[0])

print(f"Mean accuracy with perceptron loss: {np.mean(perceptron_accuracy)}")
print(f"Mean accuracy with hinge loss: {np.mean(hinge_accuracy)}")

Convergence not achieved. Too many epochs.
Mean accuracy with perceptron loss: 0.9911999999999999
Mean accuracy with hinge loss: 0.99624


## Problem 2: Weight Initialization, Dead Neurons, Leaky ReLU

In [5]:
# from deepreplay.callbacks import ReplayData
# from deepreplay.replay import Replay
# from deepreplay.plot import compose_plots
# from keras.initializers import normal
# from matplotlib import pyplot as plt
# from q2.model_builder import *

In [6]:
# from deepreplay.datasets.ball import load_data

# X, y = load_data(n_dims=10)

In [7]:
# filename = 'part2_weight_initializers.h5'
# group_name = 'sigmoid_stdev_0.01'

# # Uses normal initializer
# initializer = normal(mean=0, stddev=0.01, seed=13)

# # Builds BLOCK model
# model = build_model(n_layers=5, input_dim=10, units=100, 
#                     activation='sigmoid', initializer=initializer)

# # Since we only need initial weights, we don't even need to train the model! 
# # We still use the ReplayData callback, but we can pass the model as argument instead
# replaydata = ReplayData(X, y, filename=filename, group_name=group_name, model=model)

# # Now we feed the data to the actual Replay object
# # so we can build the visualizations
# replay = Replay(replay_filename=filename, group_name=group_name)

# # Using subplot2grid to assemble a complex figure...
# fig = plt.figure(figsize=(12, 6))
# ax_zvalues = plt.subplot2grid((2, 2), (0, 0))
# ax_weights = plt.subplot2grid((2, 2), (0, 1))
# ax_activations = plt.subplot2grid((2, 2), (1, 0))
# ax_gradients = plt.subplot2grid((2, 2), (1, 1))

# wv = replay.build_weights(ax_weights)
# gv = replay.build_gradients(ax_gradients)
# # Z-values
# zv = replay.build_outputs(ax_zvalues, before_activation=True, 
#                           exclude_outputs=True, include_inputs=False)
# # Activations
# av = replay.build_outputs(ax_activations, exclude_outputs=True, include_inputs=False)

# # Finally, we use compose_plots to update all
# # visualizations at once
# fig = compose_plots([zv, wv, av, gv], 
#                     epoch=0, 
#                     title=r'Activation: sigmoid - Initializer: Normal $\sigma = 0.01$')

## Problem 3: Batch Norm, Dropout, MNIST

### 1.
#### Co-Adaptation
In Neural network, co-adaptation means that some neurons are highly dependent on others. Conseuquently, these co-dependent units seem to output correctly only under a very narrow and specific set of circumstances (inputs and features), causing them to frequently fire erroneously under general conditions. Co-adapted neurons have a high tendency for inaccurate results since even a single incorrect input to a unit can trigger a chain of incorrect outputs in all of its co-adapted peers. This behaviour is akin to overfitting, where the model performs great only on the training data (fixed specific circumstance), but fails to generalize and therefore performs poorly on validation/test data.

#### Internal Covariate Shift
We define Internal Covariate Shift as the change in the distribution of network activations due to the change in network parameters during training. It's well known that a neural network trains faster the more consistent the distribution of the features are in any given layer. A shift in the distribution causes overhead due to the excess iterations required for the network to adjust to the new distributions. We can achieve convergence faster if we employ methods to counter internal covariate shift as it helps avoid this overhead.



### LeNet 5

In [15]:
# Load in relevant libraries, and alias where appropriate
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms

In [16]:
# Define relevant variables for the ML task
batch_size = 64
num_classes = 10
learning_rate = 0.001
num_epochs = 10

# Device will determine whether to run the training on GPU or CPU.
device = torch.device('cpu')
print(device)

cpu


In [17]:
#Loading the dataset and preprocessing
train_dataset = torchvision.datasets.MNIST(root = './data',
                                           train = True,
                                           transform = transforms.Compose([
                                                  transforms.Resize((32,32)),
                                                  transforms.ToTensor(),
                                                  transforms.Normalize(mean = (0.1307,), std = (0.3081,))]),
                                           download = True)


test_dataset = torchvision.datasets.MNIST(root = './data',
                                          train = False,
                                          transform = transforms.Compose([
                                                  transforms.Resize((32,32)),
                                                  transforms.ToTensor()]),
                                          download=True)


train_loader = torch.utils.data.DataLoader(dataset = train_dataset,
                                           batch_size = batch_size,
                                           shuffle = True)


test_loader = torch.utils.data.DataLoader(dataset = test_dataset,
                                           batch_size = batch_size,
                                           shuffle = True)

In [18]:
# Defining the convolutional neural network
def get_accuracy(pred_model, loader):
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in loader:
#             images = images.to(device)
#             labels = labels.to(device)
            pred_model = pred_model.to("cpu")
            outputs = pred_model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        
        pred_model.to(device)
        return 100 * correct / total
    

def get_mean_loss(pred_model, loader):
    tot_loss = 0
    for images, labels in loader:  
        images = images.to(device)
        labels = labels.to(device)
        
        #Forward pass
        outputs = pred_model(images)
        loss = cost(outputs, labels)
        tot_loss += loss
    
    return tot_loss / len(loader.dataset)
        
class LeNet5(nn.Module):
    def __init__(self, num_classes, input_tfs = [], hidden_tfs = []):
        super(LeNet5, self).__init__()
        
        # Input Layer
        self.conv1 =nn.Conv2d(1, 6, kernel_size=5, stride=1, padding=0)
        input_tfs = [nn.Identity()] + input_tfs
        self.input_tfs = nn.Sequential(*input_tfs)
        self.layer1 = nn.Sequential(nn.ReLU(), nn.MaxPool2d(kernel_size = 2, stride = 2))
        
        # Hidden Layer
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5, stride=1, padding=0)
        hidden_tfs = [nn.Identity()] + hidden_tfs
        self.hidden_tfs = nn.Sequential(*hidden_tfs)
        self.layer2 = nn.Sequential(nn.ReLU(), nn.MaxPool2d(kernel_size = 2, stride = 2))
        
        self.fc = nn.Linear(400, 120)
        self.relu = nn.ReLU()
        self.fc1 = nn.Linear(120, 84)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(84, num_classes)
        
    def forward(self, x):
        out = self.conv1(x)
        out = self.input_tfs(out)
        out = self.layer1(out)
        out = self.conv2(out)
        out = self.hidden_tfs(out)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        out = self.relu(out)
        out = self.fc1(out)
        out = self.relu1(out)
        out = self.fc2(out)
        return out

### 2. Standard Norm on Input Layer, Batch Norm on Hidden Layer

In [19]:
model = LeNet5(num_classes, hidden_tfs = [nn.BatchNorm2d(16)]).to(device)

#Setting the loss function
cost = nn.CrossEntropyLoss()

#Setting the optimizer with the model parameters and learning rate
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

#this is defined to print how many steps are remaining when training
total_step = len(train_loader)

In [20]:
total_step = len(train_loader)
std_tr_losses, std_ts_losses, std_tr_accuracy, std_ts_accuracy = [], [], [], []
for epoch in range(num_epochs):
    epoch_loss = 0
    for i, (images, labels) in enumerate(train_loader):  
        images = images.to(device)
        labels = labels.to(device)
        
        #Forward pass
        outputs = model(images)
        loss = cost(outputs, labels)
        epoch_loss += loss * images.size(0)
        	
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        		
        if (i+1) % 400 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
        		           .format(epoch+1, num_epochs, i+1, total_step, loss.item()))
    
    std_tr_accuracy.append(get_accuracy(model, train_loader))
    std_ts_accuracy.append(get_accuracy(model, test_loader))
    std_tr_losses.append(epoch_loss / len(train_loader.dataset))
    std_ts_losses.append(get_mean_loss(model, test_loader))

Epoch [1/10], Step [400/938], Loss: 0.0211
Epoch [1/10], Step [800/938], Loss: 0.0345
Epoch [2/10], Step [400/938], Loss: 0.0153
Epoch [2/10], Step [800/938], Loss: 0.0164
Epoch [3/10], Step [400/938], Loss: 0.0945
Epoch [3/10], Step [800/938], Loss: 0.0070
Epoch [4/10], Step [400/938], Loss: 0.1194
Epoch [4/10], Step [800/938], Loss: 0.0186
Epoch [5/10], Step [400/938], Loss: 0.0033
Epoch [5/10], Step [800/938], Loss: 0.0149
Epoch [6/10], Step [400/938], Loss: 0.0151
Epoch [6/10], Step [800/938], Loss: 0.0203
Epoch [7/10], Step [400/938], Loss: 0.0168
Epoch [7/10], Step [800/938], Loss: 0.1242
Epoch [8/10], Step [400/938], Loss: 0.0377
Epoch [8/10], Step [800/938], Loss: 0.0013
Epoch [9/10], Step [400/938], Loss: 0.0120
Epoch [9/10], Step [800/938], Loss: 0.0406
Epoch [10/10], Step [400/938], Loss: 0.0027
Epoch [10/10], Step [800/938], Loss: 0.0186


In [22]:
# Test the model
# In test phase, we don't need to compute gradients (for memory efficiency)
print('Accuracy of the network on train images: {} %'.format(get_accuracy(model, train_loader)))

print('Accuracy of the network on test images: {} %'.format(get_accuracy(model, test_loader)))
	 

Accuracy of the network on train images: 99.78333333333333 %
Accuracy of the network on test images: 98.57 %


#### Input Layer Standard Normalized Parameters

In [None]:
print(f"Weights: {model.state_dict()['conv1.weight']}")
print(f"Bias: {model.state_dict()['conv1.bias']}")

#### Hidden Later Batch Normalized Parameters

In [None]:
print(f"Weights: {model.state_dict()['hidden_tfs.1.weight']}")
print(f"Bias: {model.state_dict()['hidden_tfs.1.bias']}")

### 3. Batch Normalization on Input and Hidden Layer

In [None]:
model = LeNet5(num_classes, input_tfs = [nn.BatchNorm2d(6)], hidden_tfs = [nn.BatchNorm2d(16)]).to(device)

#Setting the loss function
cost = nn.CrossEntropyLoss()

#Setting the optimizer with the model parameters and learning rate
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

#this is defined to print how many steps are remaining when training
total_step = len(train_loader)

In [None]:
total_step = len(train_loader)
bn_tr_losses, bn_ts_losses, bn_tr_accuracy, bn_ts_accuracy = [], [], [], []
for epoch in range(num_epochs):
    epoch_loss = 0
    for i, (images, labels) in enumerate(train_loader):  
        images = images.to(device)
        labels = labels.to(device)
        
        #Forward pass
        outputs = model(images)
        loss = cost(outputs, labels)
        epoch_loss += loss * images.size(0)
        	
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        		
        if (i+1) % 400 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
        		           .format(epoch+1, num_epochs, i+1, total_step, loss.item()))
    
    bn_tr_accuracy.append(get_accuracy(model, train_loader))
    bn_ts_accuracy.append(get_accuracy(model, test_loader))
    bn_tr_losses.append(epoch_loss / len(train_loader.dataset))
    bn_ts_losses.append(get_mean_loss(model, test_loader))

In [None]:
# Test the model
# In test phase, we don't need to compute gradients (for memory efficiency)
print('Accuracy of the network on train images: {} %'.format(accuracy(model, train_loader)))

print('Accuracy of the network on test images: {} %'.format(accuracy(model, test_loader)))

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

w_input, w_hidden = model.state_dict()["input_tfs.1.weight"], model.state_dict()["hidden_tfs.1.weight"]
data = {"Input Layer": w_input.tolist(), "Hidden Layer": w_hidden.tolist()}

In [None]:
print("Input Layer Weights")
sns.violinplot(x="Input Layer", data=data)
plt.show()

In [None]:
print("Hidden Layer Weights")
sns.violinplot(x="Hidden Layer", data=data)
plt.show()

In [None]:
fig, axes = plt.subplots(1, 2)

axes[0].plot([i + 1 for i in range(num_epochs)], std_tr_losses, label="Std Norm Training")
axes[0].plot([i + 1 for i in range(num_epochs)], bn_tr_losses, label="Batch Norm Training")
axes[0].plot([i + 1 for i in range(num_epochs)], std_ts_losses, label="Std Norm Test")
axes[0].plot([i + 1 for i in range(num_epochs)], bn_ts_losses, label="Batch Norm Test")
axes[0].legend(loc="best")
axes[0].title("Loss vs Epochs")

axes[1].plot([i + 1 for i in range(num_epochs)], std_tr_accuracy, label="Std Norm Training")
axes[1].plot([i + 1 for i in range(num_epochs)], bn_tr_accuracy, label="Batch Norm Training")
axes[1].plot([i + 1 for i in range(num_epochs)], std_ts_accuracy, label="Std Norm Test")
axes[1].plot([i + 1 for i in range(num_epochs)], bn_ts_accuracy, label="Batch Norm Test")
axes[1].legend(loc="best")
axes[1].title("Accuracy vs Epochs")
fig.show()