# Lab 3: Training a Network Using PyTorch's Autograd
### Author: Nigel Nelson
### Course: CS-3450
### Date: 3/25/2022
---
---
## Computing Forward

### Imports:

In [1]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import torchvision
import warnings
import os.path

### Test Case #1

In [2]:
def ReLU(x):
    """
    Rectified linear unit: keep positive entries of x and replace the rest with zero.

    :param x: value(s) to rectify; anything supporting ``> 0`` and ``*`` works
    :return: x multiplied element-wise by the mask ``x > 0``
    """
    positive = x > 0
    return x * positive

In [3]:
# Inputs for test case #1 of the forward computation.
reg_const = 0.01  # scale applied to the summed squared weights
x = torch.tensor([[0], [1]])  # network input, 2x1 column
W = torch.tensor([[1, 1], [1, 1], [0, 0]])  # first-layer weights, 3x2
b1 = torch.tensor([[0], [1], [1]])  # first-layer bias, 3x1
M = torch.tensor([[1, 0, 1], [1, 0, 1]])  # second-layer weights, 2x3
b2 = torch.tensor([[1], [0]])  # second-layer bias, 2x1
y = torch.tensor([[2], [2]])  # target output, 2x1

In [4]:
# Forward pass: hidden pre-activation -> ReLU -> output layer
u = (W @ x) + b1
h = ReLU(u)
v = (M @ h) + b2
print('V term output:', v, sep='\n')

V term output:
tensor([[3],
        [2]])


In [5]:
# Mean of the squared differences between target y and prediction v
L = ((y - v) ** 2).sum() * (1 / y.numel())
print('L2 loss output:', L, sep='\n')

L2 loss output:
tensor(0.5000)


In [6]:
# Regularization term: summed squared weights of both layers, scaled by reg_const
s1 = W.pow(2).sum()
s2 = M.pow(2).sum()
s = (s1 + s2) * reg_const
print('Standardization term output:', s, sep='\n')

Standardization term output:
tensor(0.0800)


In [7]:
# Final cost = regularization term + L2 loss
J = s + L
print('Final output of the network:', J, sep='\n')

Final output of the network:
tensor(0.5800)


### Test Case #2

In [8]:
# Inputs for test case #2 of the forward computation.
reg_const = 0.01  # scale applied to the summed squared weights
x = torch.tensor([[2], [3]])  # network input, 2x1 column
W = torch.tensor([[3, 1], [2, 4], [0, 2]])  # first-layer weights, 3x2
b1 = torch.tensor([[2], [-2], [1]])  # first-layer bias, 3x1
M = torch.tensor([[-2, 0, 3], [3, 2, 3]])  # second-layer weights, 2x3
b2 = torch.tensor([[-1], [2]])  # second-layer bias, 2x1
y = torch.tensor([[1], [2]])  # target output, 2x1

In [9]:
# Forward pass: hidden pre-activation -> ReLU -> output layer
u = (W @ x) + b1
h = ReLU(u)
v = (M @ h) + b2
print('V term output:', v, sep='\n')

V term output:
tensor([[-2],
        [84]])


In [10]:
# Mean of the squared differences between target y and prediction v
L = ((y - v) ** 2).sum() * (1 / y.numel())
print('L2 loss output:', L, sep='\n')

L2 loss output:
tensor(3366.5000)


In [11]:
# Regularization term: summed squared weights of both layers, scaled by reg_const
s1 = W.pow(2).sum()
s2 = M.pow(2).sum()
s = (s1 + s2) * reg_const
print('Standardization term output:', s, sep='\n')

Standardization term output:
tensor(0.6900)


In [12]:
# Final cost = regularization term + L2 loss
J = s + L
print('Final output of the network:', J, sep='\n')

Final output of the network:
tensor(3367.1899)


### Test Case #3

In [13]:
# Inputs for test case #3 of the forward computation.
reg_const = 0.01  # scale applied to the summed squared weights
x = torch.tensor([[-3], [4]])  # network input, 2x1 column
W = torch.tensor([[-1, 3], [3, 2], [4, -2]])  # first-layer weights, 3x2
b1 = torch.tensor([[-3], [4], [0]])  # first-layer bias, 3x1
M = torch.tensor([[3, 2, 1], [-1, -2, -3]])  # second-layer weights, 2x3
b2 = torch.tensor([[5], [4]])  # second-layer bias, 2x1
y = torch.tensor([[4], [2]])  # target output, 2x1

In [14]:
# Forward pass: hidden pre-activation -> ReLU -> output layer
u = (W @ x) + b1
h = ReLU(u)
v = (M @ h) + b2
print('V term output:', v, sep='\n')

V term output:
tensor([[ 47],
        [-14]])


In [15]:
# Mean of the squared differences between target y and prediction v
L = ((y - v) ** 2).sum() * (1 / y.numel())
print('L2 loss output:', L, sep='\n')

L2 loss output:
tensor(1052.5000)


In [16]:
# Regularization term: summed squared weights of both layers, scaled by reg_const
s1 = W.pow(2).sum()
s2 = M.pow(2).sum()
s = (s1 + s2) * reg_const
print('Standardization term output:', s, sep='\n')

Standardization term output:
tensor(0.7100)


In [17]:
# Final cost = regularization term + L2 loss
J = s + L
print('Final output of the network:', J, sep='\n')

Final output of the network:
tensor(1053.2100)


---
---
## Provided Code:

In [18]:
EPOCHS = 50

# Sample count for the synthetic regression datasets
TRAINING_POINTS = 1000

# Cache locations for Fashion-MNIST and the CIFAR datasets; every file lives under DATA_ROOT
DATA_ROOT = '/../../data/cs3450/data/'
FASHION_MNIST_TRAINING = DATA_ROOT + 'fashion_mnist_flattened_training.npz'
FASHION_MNIST_TESTING = DATA_ROOT + 'fashion_mnist_flattened_testing.npz'
CIFAR10_TRAINING = DATA_ROOT + 'cifar10_flattened_training.npz'
CIFAR10_TESTING = DATA_ROOT + 'cifar10_flattened_testing.npz'
CIFAR100_TRAINING = DATA_ROOT + 'cifar100_flattened_training.npz'
CIFAR100_TESTING = DATA_ROOT + 'cifar100_flattened_testing.npz'


In [19]:
def try_gpu(i=0):  #@save
    """Pick the i-th CUDA device when at least i + 1 are visible, else the CPU.

    Adapted from https://d2l.ai/chapter_deep-learning-computation/use-gpu.html

    :param i: index of the desired CUDA device
    :return: ``torch.device('cuda:i')`` or ``torch.device('cpu')``
    """
    enough_gpus = i + 1 <= torch.cuda.device_count()
    return torch.device(f'cuda:{i}') if enough_gpus else torch.device('cpu')
# Device used for the datasets below: first GPU if one is visible, otherwise the CPU.
DEVICE=try_gpu()
# Bare expression so the notebook displays the selected device.
DEVICE

device(type='cuda', index=0)

In [20]:
def create_linear_training_data(num_points=None):
    """
    Build a regression dataset whose targets are the inputs rotated by 90 degrees,
    i.e. y = [[0, -1], [1, 0]] @ x.
    Be sure to use L2 regression in the place of the final softmax layer before testing on this
    data!
    :param num_points: number of samples (columns) to generate; defaults to TRAINING_POINTS
    :return: (x,y) the dataset. x is a 2 x num_points torch tensor of standard-normal samples
             (one sample per column) and y is a 2 x num_points torch tensor holding the
             rotated samples (regression targets, not one-hot labels).
    """
    if num_points is None:
        num_points = TRAINING_POINTS
    x = torch.randn((2, num_points))
    # torch.cat copies its inputs, so y never aliases x.
    y = torch.cat((-x[1:2, :], x[0:1, :]), dim=0)
    return x, y


def create_folded_training_data(num_points=None):
    """
    Like create_linear_training_data, but with a single non-linear fold: the second input
    coordinate is replaced by its absolute value before the rotation, so
    y = (-|x2|, x1).
    Be sure to REMOVE the final softmax layer and use L2 regression in its place before
    testing on this data!
    :param num_points: number of samples (columns) to generate; defaults to TRAINING_POINTS
    :return: (x,y) the dataset. x is a 2 x num_points torch tensor of standard-normal samples
             (one sample per column, left unfolded) and y is a 2 x num_points torch tensor of
             regression targets (not one-hot labels).
    """
    if num_points is None:
        num_points = TRAINING_POINTS
    x = torch.randn((2, num_points))
    x1 = x[0:1, :]
    # abs() allocates a new tensor. The previous in-place `x2 *= ...` wrote through the view
    # into x, folding the returned inputs too and turning the x -> y mapping back into a
    # plain rotation.
    x2_folded = x[1:2, :].abs()
    y = torch.cat((-x2_folded, x1), dim=0)
    return x, y


def create_square():
    """
    Sample random 2-D points and flag the ones that fall strictly inside the square with
    corners (2,1), (2,2), (3,2), (3,1) -- the square example from class.
    :return: (points, insideness) the dataset. points is a 2x2000 tensor of points drawn
             uniformly from [0, 4) in each coordinate and insideness is a boolean tensor with
             one entry per point, True when the point is inside the square.
    """
    corners = torch.tensor([[2, 2, 3, 3],
                            [1, 2, 2, 1]], dtype=torch.float32)
    # Each corner paired with the next one around the polygon (wrapping at the end)
    next_corners = torch.roll(corners, shifts=-1, dims=1)
    edges = next_corners - corners
    # Rotate every edge by a quarter turn to get a normal for that side
    quarter_turn = torch.tensor([[0, 1], [-1, 0]], dtype=torch.float32)
    normals = quarter_turn @ edges

    samples = 4 * torch.rand((2, 2000), dtype=torch.float32)

    # offsets[:, j, n] is the vector from corner j to sample n
    offsets = samples.unsqueeze(1) - corners.unsqueeze(2)
    # Dot product of each side's normal with each offset -> (sides, samples)
    side_scores = (normals.unsqueeze(2) * offsets).sum(dim=0)
    # A sample is inside only when it is on the positive side of every edge
    inside = (side_scores.T > 0).all(dim=1)
    return samples, inside

In [21]:
def load_dataset_flattened(train=True,dataset='Fashion-MNIST',download=False):
    """
    Load an image dataset as flattened columns with one-hot labels.

    If the cached ``.npz`` file for the requested split exists it is loaded directly;
    otherwise the dataset is read through torchvision, flattened, and saved to that cache
    path for later calls.

    :param train: True for training, False for testing
    :param dataset: 'Fashion-MNIST', 'CIFAR-10', or 'CIFAR-100'
    :param download: True to download. Keep to False afterwards to avoid unneeded downloads.
    :return: (x,y) the dataset. x is a float32 torch tensor where columns are flattened samples
             and y is a float32 torch tensor where columns are one-hot labels for the samples.
    :raises ValueError: if dataset is not one of the supported names
    """
    if dataset == 'Fashion-MNIST':
        path = FASHION_MNIST_TRAINING if train else FASHION_MNIST_TESTING
        num_labels = 10
    elif dataset == 'CIFAR-10':
        path = CIFAR10_TRAINING if train else CIFAR10_TESTING
        num_labels = 10
    elif dataset == 'CIFAR-100':
        path = CIFAR100_TRAINING if train else CIFAR100_TESTING
        num_labels = 100
    else:
        raise ValueError('Unknown dataset: '+str(dataset))

    if os.path.isfile(path):
        print('Loading cached flattened data for',dataset,'training' if train else 'testing')
        # np.load on an .npz keeps the archive open; the context manager closes it.
        # torch.tensor copies the arrays, so x and y stay valid afterwards.
        with np.load(path) as data:
            x = torch.tensor(data['x'],dtype=torch.float32)
            y = torch.tensor(data['y'],dtype=torch.float32)
        return x, y

    # The name was validated above, so this lookup cannot miss.
    dataset_classes = {
        'Fashion-MNIST': torchvision.datasets.FashionMNIST,
        'CIFAR-10': torchvision.datasets.CIFAR10,
        'CIFAR-100': torchvision.datasets.CIFAR100,
    }
    data = dataset_classes[dataset](
        root=DATA_ROOT, train=train,
        transform=torchvision.transforms.functional.to_tensor, download=download)

    num_samples = len(data)
    first_image, _ = data[0]
    x = torch.zeros((first_image.numel(), num_samples), dtype=torch.float32)
    labels = torch.zeros(num_samples, dtype=torch.long)
    # Single pass: each data[index] access decodes and transforms the image, so fetch the
    # image and its label together instead of re-reading every sample.
    for index in range(num_samples):
        image, label = data[index]
        x[:, index] = image.flatten()
        labels[index] = label

    # One-hot encode: row = class id, column = sample index
    y = torch.zeros((num_labels, num_samples), dtype=torch.float32)
    y[labels, torch.arange(num_samples)] = 1
    np.savez(path, x=x.detach().numpy(), y=y.detach().numpy())
    return x, y

In [22]:
# Name passed to load_dataset_flattened when one of the image datasets is selected below.
dataset = 'Fashion-MNIST'
# dataset = 'CIFAR-10'
# dataset = 'CIFAR-100'

# Choose exactly one training set; the rotation (linear) regression data is active.
x_train, y_train = create_linear_training_data()
#x_train, y_train = create_folded_training_data()
#points_train, insideness_train = create_square()
#x_train, y_train = load_dataset_flattened(train=True, dataset=dataset, download=False)

# Move selected datasets to GPU (or leave on the CPU when DEVICE is the CPU)
x_train = x_train.to(DEVICE)
y_train = y_train.to(DEVICE)

In [23]:
# Preview the training inputs (one sample per column).
x_train

tensor([[-1.3149, -2.0681,  0.9812,  ...,  0.2736,  0.3274,  0.2501],
        [-0.6397,  0.7172,  0.5234,  ...,  0.2444,  1.2426, -0.5625]],
       device='cuda:0')

In [24]:
# Preview the training targets (one sample per column).
y_train

tensor([[ 0.6397, -0.7172, -0.5234,  ..., -0.2444, -1.2426,  0.5625],
        [-1.3149, -2.0681,  0.9812,  ...,  0.2736,  0.3274,  0.2501]],
       device='cuda:0')

In [25]:
# Draw a fresh, independent sample from the same generator to serve as the test set.
x_test, y_test = create_linear_training_data()
# x_test, y_test = load_dataset_flattened(train=False, dataset=dataset, download=False)

# Move the selected datasets to the GPU (or leave on the CPU when DEVICE is the CPU)
x_test = x_test.to(DEVICE)
y_test = y_test.to(DEVICE)

In [26]:
# Preview the testing inputs (one sample per column).
x_test

tensor([[-0.6147,  0.6495,  0.1535,  ..., -0.8542, -1.3896,  1.4529],
        [-0.9008, -0.5556, -0.7908,  ..., -1.2958, -1.3622, -1.9388]],
       device='cuda:0')

In [27]:
# Preview the testing targets (one sample per column).
y_test

tensor([[ 0.9008,  0.5556,  0.7908,  ...,  1.2958,  1.3622,  1.9388],
        [-0.6147,  0.6495,  0.1535,  ..., -0.8542, -1.3896,  1.4529]],
       device='cuda:0')

---
---
### Backpropagation with Autograd:

In [28]:
class Layer:
    """
    Base class for every node of the network; it only records the shape of the node's output.
    """
    def __init__(self, output_shape):
        """
        :param output_shape (tuple): the shape of the output array. A tuple is stored as given;
        any other value (e.g. a single number of output neurons) is wrapped in a 1-tuple.
        """
        is_tuple = isinstance(output_shape, tuple)
        self.output_shape = output_shape if is_tuple else (output_shape,)
        
        
class Input(Layer):
    """
    Leaf node holding a single matrix (data, weights, or bias) that other layers read from
    its ``output`` attribute.
    """
    def __init__(self, output_shape):
        """
        :param output_shape (tuple): the shape of the matrix this node will hold.
        Passed to the parent's initializer.
        """
        super().__init__(output_shape)

    def set(self, value):
        """
        Store the matrix exposed by this node.
        :param value: Value of the matrix. If its shape differs from the shape this Input was
        created with, an assertion error is raised
        """
        shape_matches = self.output_shape == value.shape
        assert shape_matches
        self.output = value

    def forward(self):
        """Nothing to compute: the stored matrix is already this node's output."""
        return None


class LinearReLU(Layer):
    """
    Fully connected layer followed by a ReLU activation: output = ReLU(W @ x + b).
    """
    def __init__(self, x, W, b):
        """
        :param x: Layer providing the input matrix
        :param W: Layer providing the weight matrix
        :param b: Layer providing the bias matrix; its output_shape is recorded as this
        layer's output_shape
        """
        super().__init__(b.output_shape)
        self.x, self.W, self.b = x, W, b

    def ReLU(self, x):
        """
        :param x: The values to perform the ReLU activation function on
        :return: x with every non-positive entry replaced by zero
        """
        positive = x > 0
        return x * positive

    def forward(self):
        """
        Recompute this layer's output from the current outputs of x, W and b, applying the
        ReLU activation to the affine result.
        """
        pre_activation = (self.W.output @ self.x.output) + self.b.output
        self.output = self.ReLU(pre_activation)
   

class Linear(Layer):
    """
    Fully connected layer with no activation function: output = W @ x + b.
    """
    def __init__(self, x, W, b):
        """
        :param x: Layer providing the input matrix
        :param W: Layer providing the weight matrix
        :param b: Layer providing the bias matrix; its output_shape is recorded as this
        layer's output_shape
        """
        super().__init__(b.output_shape)
        self.x, self.W, self.b = x, W, b

    def forward(self):
        """
        Recompute this layer's output from the current outputs of x, W and b.
        """
        weighted_input = self.W.output @ self.x.output
        self.output = weighted_input + self.b.output
        
class Network:
    """
    Simple neural network with a single hidden layer (LinearReLU) followed by a Linear output
    layer, trained with mini-batch gradient descent using PyTorch's autograd.
    The output has the same number of rows as the input.
    """
    def __init__(self, input_rows, num_hidden_nodes, dtype=torch.float32, device=torch.device('cuda:0')):
        """
        :param input_rows: The number of rows expected in the input of the network (also the
        number of rows the network outputs)
        :param num_hidden_nodes: The number of nodes in the hidden layer desired
        :param dtype: The data type to be used with the PyTorch tensors
        :param device: The device desired to be used with the PyTorch tensors; it must match
        the device of the data later passed to train/test
        """
        # Weights start uniform in [0, 0.1) and biases at zero. Scaling before
        # requires_grad_() keeps every parameter a leaf tensor without touching `.data`.
        # Input -> hidden layer
        W = (0.1 * torch.rand((num_hidden_nodes, input_rows), dtype=dtype, device=device)).requires_grad_()
        b1 = torch.zeros((num_hidden_nodes, 1), dtype=dtype, device=device, requires_grad=True)

        # Hidden layer -> output
        M = (0.1 * torch.rand((input_rows, num_hidden_nodes), dtype=dtype, device=device)).requires_grad_()
        b2 = torch.zeros((input_rows, 1), dtype=dtype, device=device, requires_grad=True)

        # Create Input instances for all matrices
        W_layer = Input((num_hidden_nodes, input_rows))
        W_layer.set(W)
        b1_layer = Input((num_hidden_nodes, 1))
        b1_layer.set(b1)
        M_layer = Input((input_rows, num_hidden_nodes))
        M_layer.set(M)
        b2_layer = Input((input_rows, 1))
        b2_layer.set(b2)

        # Placeholder data inputs; train() and test() replace them with correctly shaped ones.
        # Sized from the constructor arguments rather than from a notebook-level variable.
        x1_layer = Input(input_rows)
        x2_layer = Input(num_hidden_nodes)

        # 1st layer uses a ReLU activation, 2nd layer has no activation
        self.layer1 = LinearReLU(x1_layer, W_layer, b1_layer)
        self.layer2 = Linear(x2_layer, M_layer, b2_layer)

    def L2(self, actual, predicted):
        """
        Returns the L2 loss of the supplied args
        :param actual: The true values
        :param predicted: The predicted values
        :return: mean of the element-wise squared differences
        """
        return ((actual - predicted)**2).mean()

    def train(self, x_train, y_train, num_epochs, learning_rate, reg_const, batch_size):
        """
        Method responsible for training the Neural Network with mini-batch gradient descent.
        Only full batches are used: when the number of samples is not a multiple of
        batch_size, the trailing samples are skipped. After each epoch the L2 loss of that
        epoch's last batch is printed.
        :param x_train: The X training data, one sample per column
        :param y_train: The y training targets, one sample per column
        :param num_epochs: Number of epochs to train for
        :param learning_rate: The rate at which parameters are adjusted
        :param reg_const: The regularization constant that scales the regularization term
        :param batch_size: The batch size used for training
        :raises ValueError: if batch_size is smaller than 1 or larger than the number of samples
        """
        num_samples = x_train.shape[1]
        if batch_size < 1 or batch_size > num_samples:
            # Without at least one full batch there is nothing to train on or to report.
            raise ValueError(
                f'batch_size must be between 1 and the number of samples ({num_samples}), '
                f'got {batch_size}')
        num_batches = num_samples // batch_size

        # Adjust the x matrices according to the batch size
        self.layer1.x = Input((x_train.shape[0], batch_size))
        self.layer2.x = Input((self.layer1.b.output.shape[0], batch_size))

        parameters = [self.layer1.W.output, self.layer1.b.output,
                      self.layer2.W.output, self.layer2.b.output]

        for epoch in range(num_epochs):
            for i in range(num_batches):
                # Columns belonging to this batch
                start_idx = i*batch_size
                end_idx = start_idx + batch_size

                # Forward pass on the training samples in this batch
                self.layer1.x.set(x_train[:, start_idx : end_idx])
                self.layer1.forward()
                self.layer2.x.set(self.layer1.output)
                self.layer2.forward()

                # L2 loss between the output of layer 2 and the associated targets
                l2 = self.L2(y_train[:, start_idx : end_idx], self.layer2.output)

                # Regularization term: scaled sum of squared weights of both layers
                s1 = (self.layer1.W.output**2).sum()
                s2 = (self.layer2.W.output**2).sum()
                reg = reg_const*(s1 + s2)

                # Final cost term
                cost = l2 + reg

                # Compute backpropagation with Autograd
                cost.backward()

                # Update parameters in place without recording the update in the graph
                with torch.no_grad():
                    for param in parameters:
                        param -= learning_rate * param.grad
                        # backward() accumulates, so clear the gradient for the next batch
                        param.grad.zero_()

            print(f'Epoch #{epoch + 1} Loss: {l2.item()}')

    def test(self, x_test, y_test):
        """
        Method responsible for testing the Neural Network after it has been trained.
        Runs a single forward pass over all test samples and prints the L2 loss.
        :param x_test: The X testing data, one sample per column
        :param y_test: The y testing targets, one sample per column
        """
        # Evaluation needs no gradients, so skip building the autograd graph
        with torch.no_grad():
            # Recalibrate the 1st x layer according to the shape of the testing data
            self.layer1.x = Input(x_test.shape)
            self.layer1.x.set(x_test)
            self.layer1.forward()

            # Recalibrate the 2nd x layer according to the shape of the output of the 1st layer
            self.layer2.x = Input(self.layer1.output.shape)
            self.layer2.x.set(self.layer1.output)
            self.layer2.forward()

            # Compute L2 loss on the expected targets vs. the predicted values
            l2 = self.L2(y_test, self.layer2.output)
        print(f'Testing L2 loss: {l2}')
        

### Training Network with best parameters found using batches > 1:

In [29]:
# Hyperparameters for the mini-batch run
input_rows = 2
num_hidden_nodes = 2
num_epochs = 48
learning_rate = .1
reg_const = 0
batch_size = 4

# Build the network on the same device the datasets were moved to, so the notebook also
# runs on machines without a GPU.
network = Network(input_rows, num_hidden_nodes, device=DEVICE)
network.train(x_train, y_train, num_epochs, learning_rate, reg_const, batch_size)

Epoch #1 Loss: 8.001793321454898e-05
Epoch #2 Loss: 5.728245378122665e-05
Epoch #3 Loss: 9.603369107935578e-06
Epoch #4 Loss: 1.3274682260089321e-06
Epoch #5 Loss: 8.005353038242902e-07
Epoch #6 Loss: 4.996358029529802e-07
Epoch #7 Loss: 3.1549438972433563e-07
Epoch #8 Loss: 1.919332532906992e-07
Epoch #9 Loss: 1.0649807791196508e-07
Epoch #10 Loss: 5.0658194084007846e-08
Epoch #11 Loss: 1.6401516589326093e-08
Epoch #12 Loss: 1.3046591584853218e-09
Epoch #13 Loss: 1.463243415322779e-11
Epoch #14 Loss: 8.358341796466107e-12
Epoch #15 Loss: 6.529582430303549e-12
Epoch #16 Loss: 4.785200014012503e-12
Epoch #17 Loss: 3.5550729027278294e-12
Epoch #18 Loss: 2.7832458560084206e-12
Epoch #19 Loss: 1.943029070972102e-12
Epoch #20 Loss: 6.072087277431137e-13
Epoch #21 Loss: 8.319178679272454e-13
Epoch #22 Loss: 2.9012903191016903e-13
Epoch #23 Loss: 3.6740055442407993e-13
Epoch #24 Loss: 1.7821855102795325e-13
Epoch #25 Loss: 6.097899962753672e-14
Epoch #26 Loss: 4.232725281383409e-14
Epoch #27 

### Verifying that the product of the Network layers' weight matrices is approximately [[0,-1],[1,0]]

In [30]:
# Product of the second-layer and first-layer weight matrices.
network.layer2.W.output @ network.layer1.W.output

tensor([[ 8.1956e-08, -1.0000e+00],
        [ 1.0000e+00,  8.9407e-08]], device='cuda:0', grad_fn=<MmBackward>)

### Testing Network on the testing data:

In [31]:
# Print the L2 loss of the trained network on the held-out test samples.
network.test(x_test, y_test)

Testing L2 loss: 0.0012183780781924725


### Training Network with best parameters found using a batch size of 1:

In [32]:
# Hyperparameters for the single-sample (batch size 1) run
input_rows = 2
num_hidden_nodes = 3
num_epochs = 20
learning_rate = .1
reg_const = 0
batch_size = 1

# Build the network on the same device the datasets were moved to, so the notebook also
# runs on machines without a GPU.
network = Network(input_rows, num_hidden_nodes, device=DEVICE)
network.train(x_train, y_train, num_epochs, learning_rate, reg_const, batch_size)

Epoch #1 Loss: 0.02058352343738079
Epoch #2 Loss: 0.0004995372146368027
Epoch #3 Loss: 0.00022733266814611852
Epoch #4 Loss: 0.00026231163064949214
Epoch #5 Loss: 2.4408750505244825e-06
Epoch #6 Loss: 1.4741344500635023e-07
Epoch #7 Loss: 4.4853010194856324e-14
Epoch #8 Loss: 1.0169642905566434e-13
Epoch #9 Loss: 1.496580637194711e-13
Epoch #10 Loss: 4.3565151486291143e-13
Epoch #11 Loss: 7.038813976123492e-13
Epoch #12 Loss: 2.935429677108914e-13
Epoch #13 Loss: 1.567634910770721e-13
Epoch #14 Loss: 1.283417816466681e-13
Epoch #15 Loss: 1.0169642905566434e-13
Epoch #16 Loss: 2.1760371282653068e-14
Epoch #17 Loss: 2.509104035652854e-13
Epoch #18 Loss: 7.038813976123492e-13
Epoch #19 Loss: 8.180567334648003e-12
Epoch #20 Loss: 5.1958437552457326e-14


### Verifying that the product of the Network layers' weight matrices is approximately [[0,-1],[1,0]]

In [33]:
# Product of the second-layer and first-layer weight matrices.
network.layer2.W.output @ network.layer1.W.output

tensor([[-1.6391e-07, -1.0000e+00],
        [ 1.0000e+00,  5.5879e-08]], device='cuda:0', grad_fn=<MmBackward>)

### Testing Network on the testing data:

In [34]:
# Print the L2 loss of the trained network on the held-out test samples.
network.test(x_test, y_test)

Testing L2 loss: 0.0006278472719714046
