# Implementation of Hinton's Forward-Forward algorithm

In [45]:
# Identify the author of this submission.
student_name, student_id = 'Amirhossein Akbari', 99105901

# Echo the details so they show up in the notebook output.
print("your student id:", student_id)
print("your name:", student_name)

your student id: 99105901
your name: Amirhossein Akbari


## Supervised fully connected network with pytorch

### Importing Dependencies

In [46]:
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from tqdm import tqdm
from torch.optim import Adam
from torchvision.datasets import MNIST
from torchvision.transforms import Compose, ToTensor, Normalize, Lambda, GaussianBlur
from torch.utils.data import DataLoader

### Functions to generate negative and positive data from dataset

In [47]:
def generate_positive(x, y):
    """Embed the label ``y`` into the first ten features of every sample.

    Works on a copy of ``x``: the first 10 columns are multiplied by zero and
    then, for each row, the column selected by ``y`` is set to 1.  ``y`` may
    be a per-row index tensor or a single integer applied to all rows.
    """
    n_rows = x.shape[0]
    labelled = x.clone()
    labelled[:, :10].mul_(0.0)        # in-place on a view of the copy
    labelled[range(n_rows), y] = 1
    return labelled


def generate_negative(x, y):
    """Embed a *wrong* label into the first ten features of every sample.

    For each row a label different from ``y`` is drawn uniformly from the
    nine other classes, then written as a one-hot code into the first 10
    columns of a copy of ``x`` (same layout as ``generate_positive``).

    Args:
        x: 2-D tensor of flattened samples; only a copy is modified.
        y: integer tensor with the true label of each row.

    Returns:
        A tensor shaped like ``x`` carrying the wrong-label overlay.
    """
    # Offset in 1..9 (inclusive): never 0, so the true label is never picked,
    # and all nine wrong labels are reachable.
    offset = torch.randint_like(y, 9) + 1
    y_neg = (y + offset) % 10

    overlayed = x.clone()
    overlayed[:, :10] *= 0.0
    overlayed[range(x.shape[0]), y_neg] = 1

    return overlayed

### Defining Layer and Network object implemented from nn.Linear layer and nn.Module network

This implementation is based on Mohammad Pezeshki's repository, adapted as needed: I updated the model and added the option to append layers and train them.

The loss satisfies the conditions required of a goodness-based objective. If we write the binary cross-entropy loss of a layer and assume that we have equal numbers of negative and positive samples, $ P(positive) = P(negative) = p = 0.5 $, we can show that it is equal to the loss defined in the question (note that the code below uses the mean, rather than the sum, of the squared activations as the goodness):
$$ Loss = -p\log{(\sigma(\sum_{j}^{} y_{pos_j}^2 - threshold))}-(1-p)\log{(1-\sigma(\sum_{j}^{} y_{neg_j}^2 - threshold))} $$
$$  = 0.5(\log{(1+e^{-(\sum_{j}^{} y_{pos_j}^2 - threshold)})}+\log{(1+e^{+(\sum_{j}^{} y_{neg_j}^2 - threshold)})}) $$
$$  = mean[\log{(1+e^{-(\sum_{j}^{} y_{pos_j}^2 - threshold)})},\log{(1+e^{+(\sum_{j}^{} y_{neg_j}^2 - threshold)})}] $$

In [48]:
class FFLayer(nn.Linear):
    """Linear + ReLU layer trained locally with a goodness-based loss.

    Each layer owns its own Adam optimizer (lr=0.03) and is optimised in
    isolation: the goodness of a sample is the mean of its squared
    activations, pushed above ``threshold`` for positive samples and below
    it for negative ones.

    NOTE: ``train`` below replaces ``nn.Module.train(mode)`` with a different
    signature, so ``layer.eval()`` / ``layer.train(False)`` cannot be used on
    this class.
    """

    def __init__(self, in_features, out_features,
                 bias=True, device=None, dtype=None, th=2):
        """Build the layer; ``th`` is the goodness threshold."""
        super().__init__(in_features, out_features, bias, device, dtype)

        self.relu = torch.nn.ReLU()
        self.opt = Adam(self.parameters(), lr=0.03)
        self.threshold = th

    def forward(self, x):
        """Return ``relu(W @ x_unit + b)`` where ``x_unit`` is ``x`` L2-normalised per row."""
        # The epsilon guards against division by zero for all-zero rows.
        x_unit = x / (x.norm(2, 1, keepdim=True) + 1e-6)
        # F.linear also copes with ``bias=None`` (layer built with bias=False).
        return self.relu(nn.functional.linear(x_unit, self.weight, self.bias))

    def goodness(self, x):
        """Return the per-sample goodness: mean squared activation."""
        return self.forward(x).pow(2).mean(1)

    def loss(self, x_pos, x_neg):
        """Return the scalar layer loss for a positive and a negative batch."""
        logits = torch.cat([-self.goodness(x_pos) + self.threshold,
                            self.goodness(x_neg) - self.threshold])
        # softplus(z) == log(1 + exp(z)) but does not overflow for large z.
        return nn.functional.softplus(logits).mean()

    def train(self, x_pos, x_neg, num_epochs):
        """Run ``num_epochs`` full-batch optimisation steps on this layer.

        Returns:
            Tuple ``(h_pos, h_neg)``: the layer's outputs for both batches
            after training, detached from the autograd graph.
        """
        for _ in tqdm(range(num_epochs)):
            loss = self.loss(x_pos, x_neg)

            self.opt.zero_grad()
            loss.backward()
            self.opt.step()

        # The outputs only feed the next layer, so no graph is needed.
        with torch.no_grad():
            return self.forward(x_pos), self.forward(x_neg)


class FFNet(torch.nn.Module):
    """Stack of ``FFLayer`` objects trained greedily, one layer at a time.

    Labels are embedded in the input (see ``generate_positive``); prediction
    tries every label and keeps the one with the largest summed goodness.

    NOTE: ``train`` below replaces ``nn.Module.train(mode)`` with a different
    signature, so ``model.eval()`` cannot be used on this class.
    """

    def __init__(self, dims, use_cuda=False):
        """Create ``len(dims) - 1`` layers mapping ``dims[i] -> dims[i + 1]``."""
        super().__init__()

        # Copy: add_layer() extends this list and must not touch the caller's.
        self.dims = list(dims)
        self.using_cuda = use_cuda
        # ModuleList registers the layers as submodules (device moves, state_dict).
        self.layers = nn.ModuleList(
            [FFLayer(d_in, d_out) for d_in, d_out in zip(self.dims, self.dims[1:])]
        )
        if use_cuda:
            self.layers.cuda()

    def add_layer(self, dim: int):
        """Append an untrained layer of width ``dim`` on top of the stack."""
        new_layer = FFLayer(self.dims[-1], dim)
        if self.using_cuda:
            new_layer = new_layer.cuda()
        self.layers.append(new_layer)
        self.dims.append(dim)

    def predict(self, x):
        """Return, for each row of ``x``, the label with the highest total goodness.

        ``x`` must already be on the same device as the layers.
        """
        per_label = []
        with torch.no_grad():  # inference only
            for label in range(10):
                # Assume the label is positive to see the neural activity
                h = generate_positive(x, label)
                total = 0
                for layer in self.layers:
                    h = layer.forward(h)
                    total = total + h.pow(2).mean(1)
                per_label.append(total.unsqueeze(1))
        return torch.cat(per_label, 1).argmax(1)

    def train(self, x_pos, x_neg, epochs_per_layer):
        """Train every layer in order; ``epochs_per_layer[i]`` steps for layer ``i``.

        Returns:
            The last layer's outputs for the positive and negative batches,
            moved to the CPU.
        """
        if self.using_cuda:
            h_pos, h_neg = x_pos.cuda(), x_neg.cuda()
        else:
            h_pos, h_neg = x_pos, x_neg

        for i, l in enumerate(self.layers):
            print(f'training layer {i}')
            h_pos, h_neg = l.train(h_pos, h_neg, epochs_per_layer[i])
            l.trained = True

        return h_pos.cpu(), h_neg.cpu()

    def train_last(self, h_pos, h_neg, num_epochs):
        """Train only the last layer (intended for layers added with ``add_layer``).

        ``h_pos`` / ``h_neg`` are the outputs of the layer below it; the new
        outputs are returned on the CPU.
        """
        if self.using_cuda:
            if not h_pos.is_cuda:
                h_pos = h_pos.cuda()
            if not h_neg.is_cuda:
                h_neg = h_neg.cuda()

        h_pos, h_neg = self.layers[-1].train(h_pos, h_neg, num_epochs)
        return h_pos.cpu(), h_neg.cpu()

    def accuracy(self, x, y):
        """Return the fraction of rows of ``x`` whose prediction equals ``y``."""
        return self.predict(x).eq(y).float().mean().item()

### Loading Dataset and training the model

In [51]:
torch.cuda.empty_cache()

# Transform image into compatible vector
# (Normalize with mean 0 / std 1 leaves the pixel values unchanged.)
transform = Compose([
        ToTensor(),
        Normalize((0,), (1,)),
        Lambda(lambda x: torch.flatten(x))])

# Load the whole dataset into tensor X and Y

train_loader = DataLoader(
    MNIST('./data/', train=True,
        download=True,
        transform=transform),
    batch_size=50000, shuffle=True)

test_loader = DataLoader(
    MNIST('./data/', train=False,
        download=True,
        transform=transform),
    batch_size=10000, shuffle=False)


use_cuda = torch.cuda.is_available()
print('Using cuda' if use_cuda else 'Using cpu')

x, y = next(iter(train_loader))
X_pos = generate_positive(x, y)
X_neg = generate_negative(x, y)

image_shape = X_pos.shape[1]

model = FFNet([image_shape, 500], use_cuda)

h_pos, h_neg = model.train(X_pos, X_neg, [1300])

# Move the evaluation batch once, and only when a GPU is actually in use,
# so the cell also runs on CPU-only machines.
if use_cuda:
    x, y = x.cuda(), y.cuda()

acc_last = 0
acc = model.accuracy(x, y)
print('accuracy:', '%.2f' % (acc*100)+"%")

# Keep stacking layers while each one still gains more than one point of
# training accuracy.
while acc - acc_last > 0.01:
    print('adding layer...')
    model.add_layer(500)
    h_pos, h_neg = model.train_last(h_pos, h_neg, 1200)
    acc_last = acc
    acc = model.accuracy(x, y)
    print('accuracy:', '%.2f' % (acc*100)+"%")

print('train accuracy:', '%.2f' % (acc*100)+"%")

x, y = next(iter(test_loader))
if use_cuda:
    x, y = x.cuda(), y.cuda()

print('test accuracy:', '%.2f' % (model.accuracy(x, y)*100)+"%")

Using cuda
training layer 0


100%|██████████| 1300/1300 [01:14<00:00, 17.55it/s]


accuracy: 91.89%
adding layer...


100%|██████████| 1200/1200 [00:46<00:00, 25.81it/s]


accuracy: 92.71%
train accuracy: 92.71%
test accuracy: 92.67%


### Algorithm explanation

The model above is just a neural network whose loss has been replaced by one that depends only on the goodness of each layer. The only difference from a linear model trained with error backpropagation (EBP) is the train function, where the loss is calculated and the parameters are updated. In usual networks, the loss seen by each layer depends on all the layers that follow it on the way to the output, whereas here each layer is trained with its own local loss.

## Unsupervised fully connected network

### Updating data generator functions to use data in an unsupervised way

In the ```generate_negative``` function, I used transforms to apply a Gaussian blur (```GaussianBlur```) to a random tensor shaped like X and then applied a threshold, as described in the article. I tested this method in the mask_test.ipynb file.

In [52]:
def generate_positive(x, _):
    """Return an independent copy of ``x``; the second argument is ignored."""
    positive = torch.clone(x)
    return positive

def generate_negative(x, _):
    """Build negative samples by blending each image with another one.

    A random binary mask is made by blurring uniform noise three times and
    thresholding it at 0.5.  Each output row takes the pixels of its own image
    where the mask is true and those of a randomly chosen other row of ``x``
    where it is false.  The second argument is ignored.

    Args:
        x: 2-D tensor of flattened square images, one per row.

    Returns:
        A tensor shaped like ``x`` holding the hybrid images.

    Raises:
        ValueError: if the number of features per row is not a perfect square.
    """
    n_samples, n_features = x.shape[0], x.shape[1]
    side = round(n_features ** 0.5)
    if side * side != n_features:
        raise ValueError(
            f"expected flattened square images, got {n_features} features per row")

    # Create the helper tensors on x's device so CUDA inputs work as well.
    partner = torch.randperm(n_samples, device=x.device)
    noise = torch.rand(n_samples, side, side, device=x.device)

    blur_and_threshold = Compose([
        GaussianBlur(5, 7),
        GaussianBlur(7, 14),
        GaussianBlur(5, 7),
        Lambda(lambda m: m > 0.5)])
    mask = blur_and_threshold(noise).view(n_samples, -1)

    return x * mask + x[partner] * ~mask

### Updating FFNet to add a linear classifier to its output

In [53]:
class FFNet2(torch.nn.Module):
    """``FFLayer`` stack followed by a linear classifier over its activations.

    The layers are trained greedily on positive/negative batches; the
    classifier is then fitted with cross-entropy on the concatenated outputs
    of every layer except the first.

    NOTE: ``train`` below replaces ``nn.Module.train(mode)`` with a different
    signature, so ``model.eval()`` cannot be used on this class.
    """

    def __init__(self, dims=(784, 2000, 2000, 2000, 2000), use_cuda=False):
        """Create ``len(dims) - 1`` layers and a 10-way linear classifier."""
        super().__init__()
        # Copy: add_layer() extends this list and must never mutate the
        # caller's sequence or a shared default argument.
        self.dims = list(dims)
        self.using_cuda = use_cuda
        # ModuleList registers the layers as submodules (device moves, state_dict).
        self.layers = nn.ModuleList(
            [FFLayer(d_in, d_out) for d_in, d_out in zip(self.dims, self.dims[1:])]
        )
        if use_cuda:
            self.layers.cuda()
        self.linear = self._build_classifier()

    def _build_classifier(self):
        """Return a fresh classifier sized for all layer outputs but the first."""
        head = nn.Linear(sum(self.dims[2:]), 10)
        return head.cuda() if self.using_cuda else head

    def _features(self, x):
        """Return the concatenated outputs of every layer except the first."""
        h = x
        collected = []
        for i, layer in enumerate(self.layers):
            h = layer.forward(h)
            if i > 0:
                collected.append(h)
        return torch.cat(collected, 1)

    def add_layer(self, dim: int):
        """Append an untrained layer of width ``dim``.

        The classifier is rebuilt because its input width grows by ``dim``;
        call ``train`` again before predicting.
        """
        new_layer = FFLayer(self.dims[-1], dim)
        if self.using_cuda:
            new_layer = new_layer.cuda()
        self.layers.append(new_layer)
        self.dims.append(dim)
        self.linear = self._build_classifier()

    def predict(self, x):
        """Return the predicted class index for each row of ``x``.

        ``x`` must already be on the same device as the model.
        """
        with torch.no_grad():  # inference only
            return self.linear(self._features(x)).argmax(1)

    def train(self, x_pos, x_neg, y, num_epochs=(1000, 10000)):
        """Train the layers, then the linear classifier.

        Args:
            x_pos: positive batch.
            x_neg: negative batch.
            y: class labels for the rows of ``x_pos``.
            num_epochs: ``(steps per FF layer, steps for the classifier)``.
        """
        if self.using_cuda:
            h_pos, h_neg = x_pos.cuda(), x_neg.cuda()
            y = y.cuda()
        else:
            h_pos, h_neg = x_pos, x_neg

        layer_outputs = []

        for i, l in enumerate(self.layers):

            print(f'training layer {i}')
            h_pos, h_neg = l.train(h_pos, h_neg, num_epochs[0])
            l.trained = True

            if i > 0:
                layer_outputs.append(h_pos)

        # Detached layer outputs: only the classifier is optimised below.
        layer_outputs = torch.cat(layer_outputs, 1)

        print('training linear classifier')
        criterion = nn.CrossEntropyLoss()
        # Use this instance's classifier, not a global variable named `model`.
        optimizer = Adam(self.linear.parameters(), lr=0.03)
        targets = y.view(-1)

        for _ in range(num_epochs[1]):
            optimizer.zero_grad()
            loss = criterion(self.linear(layer_outputs), targets)
            loss.backward()
            optimizer.step()

    def accuracy(self, x, y):
        """Return the fraction of rows of ``x`` whose prediction equals ``y``."""
        return self.predict(x).eq(y).float().mean().item()

### Loading dataset and training

In [54]:
torch.cuda.empty_cache()

# Transform image into compatible vector.
# ToTensor() already rescales pixels to [0, 1], so no further division by 255
# is applied here.
transform = Compose([
        ToTensor(),
        Lambda(lambda x: torch.flatten(x))])

# Load the whole dataset into tensor X and Y

train_loader = DataLoader(
    MNIST('./data/', train=True,
        download=True,
        transform=transform),
    batch_size=50000, shuffle=True)

test_loader = DataLoader(
    MNIST('./data/', train=False,
        download=True,
        transform=transform),
    batch_size=10000, shuffle=False)


use_cuda = torch.cuda.is_available()
print('Using cuda' if use_cuda else 'Using cpu')

# One shuffled batch of 50000 training images serves as the whole training set.
x, y = next(iter(train_loader))
X_pos = generate_positive(x, y)
X_neg = generate_negative(x, y)

image_shape = X_pos.shape[1]

model = FFNet2(dims=[784, 500, 500, 500], use_cuda=use_cuda)

# 1100 steps per FF layer, then 2000 steps for the linear classifier.
model.train(X_pos, X_neg, y, num_epochs=[1100, 2000])


Using cuda
training layer 0


100%|██████████| 1000/1000 [00:56<00:00, 17.73it/s]


training layer 1


100%|██████████| 1000/1000 [00:39<00:00, 25.33it/s]


training layer 2


100%|██████████| 1000/1000 [00:40<00:00, 24.97it/s]


training linear classifier


### Accuracy measurements

In [55]:

# Evaluate on the training batch still held in x, y, then on the test set.
# Inputs are moved to the GPU only when the model itself lives there, so the
# cell also runs on CPU-only machines.
if model.using_cuda:
    x, y = x.cuda(), y.cuda()
acc = model.accuracy(x, y)

print('train accuracy:', '%.2f' % (acc*100))

x, y = next(iter(test_loader))
if model.using_cuda:
    x, y = x.cuda(), y.cuda()

print('test accuracy:', '%.2f' % (model.accuracy(x, y)*100))

train accuracy: 95.67
test accuracy: 92.19


### Model outputs

The output of the first part of the model is a large vector called ```layer_outputs```, which contains a large number of features extracted from the input data. The model's neurons are trained to respond to real images that represent one of the digits, but not to meaningless images that merely share similarities and short-range correlations with digits. So the model has learned to extract features from images that contain a real digit, and in this case some neurons will definitely respond more strongly to some specific digits.

### Algorithm

We just added a linear classifier layer to the model and passed it the outputs of all neurons (except those of the first layer, as mentioned in the article). As mentioned before, with sufficient epochs of training it can learn to distinguish these features from each other.