## Student Identity
Name: Arman Lotfalikhani <br>
Student Number: 99109166

In [1]:
import torch
from torchvision.datasets import MNIST
from torchvision import transforms
from torch.utils.data import DataLoader

import numpy as np
from matplotlib import pyplot as plt
from typing import Dict

In [2]:
train_set = MNIST(root='.', train=True, download=True, transform=transforms.ToTensor())
test_set = MNIST(root='.', train=False, download=True, transform=transforms.ToTensor())
image_shape = train_set[0][0].shape
input_dim = np.prod(image_shape).item()
num_classes = len(MNIST.classes)
print(num_classes)

10


In [3]:
train_loader = DataLoader(train_set, 50000, shuffle=True)
test_loader = DataLoader(test_set, 10000, shuffle=True)

### Help from other sources
The website https://medium.com/@Mosbeh_Barhoumi/forward-forward-algorithm-ac24d0d9ffd was used for getting the idea of using a norm-2 for the normalization at the beginning og each layer (Instead of batch normalization or norm-1, which give 10% accuracies). Also, shuffling the dataset labels for negative data was inspired from there ("tf.random.shuffle(y)").

### Preliminary description 
About the loss function: In the article, it is said that we assume a sigmoid for the probability of a sample being positive,

$$P(x=pos)=\sigma(\sum y_i^2 -\theta) $$
If we implement a NLL loss for the joint distribution (Similar to the derivation of the crossentroyloss) we have:
$$ -\frac{1}{N} \log \prod (P(X_i=pos)^{X_i==pos}(1-P(X=pos))^{X_i==neg})$$
$$=\frac{1}{N} (\sum (X_i==pos) \log (1+e^{\theta-\sum y_j^2}) + \sum (X_i==neg) \log(1+e^{-\theta+\sum y_j^2}) ) $$
Which is the stated loss function.

### Code explanation: 
First, we explain a few choices in the implementation.<br>
1: Norm-2 was used for the normalization: In fact, the important output of each layer is its goodness, which is a sum of squares. Any normalization that changes the relative sum of squares (norm-2 squared) for different datapoints (batch normalization, etc.) is bound to fail. <br>
2: The minibatch size is the whole dataset. This code is still in the train() function: <br>
for i,train in enumerate(train_dataloader,0): <br>
      data=train <br>
      
The for statement itself is run on the CPU, and takes about 4 seconds, even though the code inside of it uses the GPU properly. As we need a lot a epochs for training, I was forced to use all samples at once, to train in a reasonable time. <br>


### Code explanation: Part 2
First, an FFLayer is implemented which combines a linear and relu PyTorch layers. The forward pass uses the mentioned normalization. <br>
Then, the FFLayer is implemented with a threshold list (It must have one element fewer than the dimensions array or it would give an error). Also, as we need to train layer after layer, forward_for_layer() is used to forward the data from previously-trained layers to the current layer in the process. <br>
At last, the predict() function is implemented in this way: As the labels are one-hot and we have to give all labels, we augment the input row and concatenate if with the identity matrix. Then, we add the goodnesses of all layers after proper forwarding and return the argmax.
Parctical note: I wrongly assumed that the output of the last layer was enough for this prediction. As I got a low (10%) accuracy, I tested a few ways and came up with this method which works well.

In [4]:
import torch.nn as nn
import torch.nn.functional as F
import time
class FFLayer(nn.Module):
    def __init__(self, epsilon, in_dim, out_dim, threshold, lr, device='cpu',dtype=torch.float32):
        super().__init__()
        self.layer=nn.Sequential(
            nn.Linear(in_features=in_dim, out_features=out_dim, bias=True, device=device, dtype=dtype),
            nn.ReLU()
        )

        self.threshold=threshold
        self.device=device
        self.dtype=dtype
        self.optimizer=torch.optim.Adam(self.parameters(),lr)

    def forward(self, x):
        x_normalized = x / (x.norm(2, 1, keepdim=True) + 1e-8)
        return self.layer(x_normalized) #self.layer(x)
    
    def optimizer_step(self, x_pos,x_neg):
        pos_f=self.forward(x_pos)
        neg_f=self.forward(x_neg)
        goodness_pos=torch.square(pos_f).mean(1)
        goodness_neg=torch.square(neg_f).mean(1)

        loss=torch.log(1+ torch.cat([torch.exp(-goodness_pos+self.threshold),
                                    torch.exp(goodness_neg-self.threshold)] ) ).mean()        
        loss.backward()
        self.optimizer.step()
        self.optimizer.zero_grad()
        return loss
        
class FFNet():
    def __init__(self,epsilon, dims, threshold_list, lr, device='cpu',dtype=torch.float32):
        '''Note: threshold_list must have size L and dims must have the size L+1. L: number of layers'''
        super().__init__()
        self.layers=[]
        self.device=device
        for i in range(len(dims)-1):
            self.layers.append(FFLayer(epsilon, in_dim=dims[i], out_dim=dims[i+1], device=device, 
                                       dtype=dtype, threshold=threshold_list[i], lr=lr))
        print(self.layers)
    def train(self, num_epochs, train_dataloader):
        data=None
        for i,train in enumerate(train_dataloader,0):
            data=train
        L=len(self.layers)
        for l in range(L):
            print('Training layer: ',l+1)
            for epoch in range(num_epochs):
                running_loss= 0.0
                
                images=data[0].to(self.device)
                labels=data[1].to(self.device)
                
                images=torch.flatten(images,start_dim=1)/torch.max(images) #Scale between zero and one
                ## If we use a randint with labels.shape, there is a higher chance (10%) each random label is actually correct.
                indices=torch.randperm(labels.size(0))
                false_labels=labels[indices]
                pos_onehot=F.one_hot(labels, num_classes)
                neg_onehot=F.one_hot(false_labels, num_classes)

                pos_data=self.forward_for_layer( l,torch.hstack( (pos_onehot, images)) )
                neg_data=self.forward_for_layer( l,torch.hstack((neg_onehot, images)) )

                running_loss+= self.layers[l].optimizer_step(pos_data,neg_data)

                running_loss_mean=running_loss
                if (epoch+1)%5==0:
                    print("Epoch: %i Running loss: %f"%(epoch, running_loss_mean))
                    
    def forward_for_layer(self,i,x): #Forwards the data for layer i in the training process
        y=x.clone()
        for j in range(i):
            y=self.layers[j].forward(y)
            
        return y.detach()
    def predict(self,x_row):
        xhat=x_row.clone().repeat(num_classes,1)
        one_hot_labels=torch.eye(num_classes, device=self.device)
        data=torch.hstack( (one_hot_labels, xhat))
        predicted_goodness=0
        for l in range(len(self.layers)):
            data=self.forward_for_layer(l,data)
            predicted_goodness+=torch.square(data).mean(1)
        
        prediction=predicted_goodness.argmax()
        return prediction


In [5]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
net=FFNet(epsilon=1e-2,dims=[794,400,300],device=device,dtype=torch.float32, threshold_list=[2, 3], lr=0.01)

[FFLayer(
  (layer): Sequential(
    (0): Linear(in_features=794, out_features=400, bias=True)
    (1): ReLU()
  )
), FFLayer(
  (layer): Sequential(
    (0): Linear(in_features=400, out_features=300, bias=True)
    (1): ReLU()
  )
)]


In [6]:
net.train(1200, train_loader)

Training layer:  1
Epoch: 4 Running loss: 1.065203
Epoch: 9 Running loss: 0.833339
Epoch: 14 Running loss: 0.751228
Epoch: 19 Running loss: 0.756918
Epoch: 24 Running loss: 0.717786
Epoch: 29 Running loss: 0.730019
Epoch: 34 Running loss: 0.711048
Epoch: 39 Running loss: 0.715878
Epoch: 44 Running loss: 0.710003
Epoch: 49 Running loss: 0.710394
Epoch: 54 Running loss: 0.708414
Epoch: 59 Running loss: 0.707977
Epoch: 64 Running loss: 0.707069
Epoch: 69 Running loss: 0.706748
Epoch: 74 Running loss: 0.706177
Epoch: 79 Running loss: 0.705837
Epoch: 84 Running loss: 0.705371
Epoch: 89 Running loss: 0.705081
Epoch: 94 Running loss: 0.704676
Epoch: 99 Running loss: 0.704380
Epoch: 104 Running loss: 0.704085
Epoch: 109 Running loss: 0.703734
Epoch: 114 Running loss: 0.703441
Epoch: 119 Running loss: 0.703175
Epoch: 124 Running loss: 0.702909
Epoch: 129 Running loss: 0.702595
Epoch: 134 Running loss: 0.702268
Epoch: 139 Running loss: 0.701989
Epoch: 144 Running loss: 0.701697
Epoch: 149 Runnin

Epoch: 9 Running loss: 1.145760
Epoch: 14 Running loss: 0.719579
Epoch: 19 Running loss: 0.804383
Epoch: 24 Running loss: 0.708605
Epoch: 29 Running loss: 0.689947
Epoch: 34 Running loss: 0.703388
Epoch: 39 Running loss: 0.674148
Epoch: 44 Running loss: 0.680155
Epoch: 49 Running loss: 0.671199
Epoch: 54 Running loss: 0.672982
Epoch: 59 Running loss: 0.671649
Epoch: 64 Running loss: 0.669064
Epoch: 69 Running loss: 0.667609
Epoch: 74 Running loss: 0.666587
Epoch: 79 Running loss: 0.667072
Epoch: 84 Running loss: 0.666671
Epoch: 89 Running loss: 0.667287
Epoch: 94 Running loss: 0.664391
Epoch: 99 Running loss: 0.665163
Epoch: 104 Running loss: 0.665489
Epoch: 109 Running loss: 0.665985
Epoch: 114 Running loss: 0.664908
Epoch: 119 Running loss: 0.662802
Epoch: 124 Running loss: 0.663521
Epoch: 129 Running loss: 0.662440
Epoch: 134 Running loss: 0.664433
Epoch: 139 Running loss: 0.660816
Epoch: 144 Running loss: 0.661320
Epoch: 149 Running loss: 0.661218
Epoch: 154 Running loss: 0.660362


### Model results
In the following sections, we see the evaluation of the trained model. As the predict() function takes one row at a time to replicate the row with different (one_hot) labels to pass it to the model, we need for loops for the evaluations.

In [12]:
x_train, y_train = next(iter(train_loader))
x_train, y_train = x_train.cuda(), y_train.cuda()
image=torch.flatten(x_train,start_dim=1)
l=0
N_train=x_train.size(0)
for i in range(N_train):
    if net.predict(image[i])!=y_train[i]:
        l+=1
print('Train set accuracy is: ',1-l/N_train)

Train set accuracy is:  0.8697


In [10]:
x_test, y_test = next(iter(test_loader))
x_test, y_test = x_test.cuda(), y_test.cuda()
image=torch.flatten(x_test,start_dim=1)
l=0
N_test=x_test.size(0)
for i in range(N_test):
    if net.predict(image[i])!=y_test[i]:
        l+=1
print('Test set accuracy is: ',1-l/N_test)

Test set accuracy is:  0.8706
