### Part A
Implementation of a supervised Forward-Forward network.

In [None]:
import copy

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.datasets as datasets
import torchvision.transforms as T
from torch.utils.data import Dataset, DataLoader, sampler

In [None]:
# Prefer the GPU whenever PyTorch can see one; fall back to the CPU otherwise.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print('device = ', device)
# Precision used for model parameters and for inputs moved to the model.
dtype = torch.float64

device =  cuda


Functions to build positive and negative data by embedding labels into flattened data.


In [None]:
def flatten(x):
  """Collapse every dimension after the first into one vector per sample.

  Args:
    x: tensor whose first dimension indexes samples (e.g. N, C, H, W).

  Returns:
    Tensor of shape (N, prod(remaining dims)).
  """
  N = x.shape[0]
  # reshape returns a view when possible and copies otherwise, so non-contiguous
  # inputs (e.g. transposed or sliced tensors) no longer raise as they do with view.
  return x.reshape(N, -1)

def put_y_in_x(x, y, num_classes = 10):
  """Append a one-hot encoding of ``y`` to the flattened samples in ``x``.

  Args:
    x: batch of samples; flattened to (N, D) with ``flatten``.
    y: column index (or per-row indices) set to 1 in the appended block.
    num_classes: width of the appended one-hot block.

  Returns:
    Tensor of shape (N, D + num_classes) with the dtype and device of ``x``.
  """
  flat = flatten(x)
  batch = flat.shape[0]
  # new_zeros inherits dtype and device from the flattened input.
  one_hot = flat.new_zeros((batch, num_classes))
  one_hot[range(batch), y] = 1
  return torch.cat([flat, one_hot], dim=1)

def build_x_negetive(x, num_classes = 10, y = None):
  """Build negative samples by embedding a random label into ``x``.

  Args:
    x: batch of samples (first dimension indexes samples).
    num_classes: number of classes / width of the one-hot block.
    y: optional true labels (one per sample). When given, the random label of
      each sample is guaranteed to differ from its true label, so a "negative"
      sample is never identical to the positive one. When omitted the label is
      drawn uniformly from all classes (the original behaviour).

  Returns:
    Tensor produced by ``put_y_in_x`` for the sampled labels.
  """
  N = x.shape[0]
  if y is None:
    # Draw the labels directly on the device of x to avoid a host->device copy.
    labels = torch.randint(num_classes, (N,), device=x.device)
  else:
    true_labels = torch.as_tensor(y, device=x.device)
    # A shift in [1, num_classes - 1] modulo num_classes can never map a label to itself.
    shift = torch.randint(1, num_classes, (N,), device=x.device)
    labels = (true_labels + shift) % num_classes
  return put_y_in_x(x, labels, num_classes)

A class for building the model.
All functions for predicting with and training the model are defined here.

This code is inspired by https://github.com/mpezeshki/pytorch_forward_forward/blob/main/main.py.

In [None]:
class FF(nn.Module):
  def __init__(self, input_size: int, hidden_size: list | tuple, num_classes : int, threshold, device = device):
    super().__init__()
    # assign layer objects to class attributes
    self.num_layers = len(hidden_size)
    self.num_classes = num_classes
    self.device = device
    self.threshold = threshold
    layer_size = [input_size] + list(hidden_size)
    self.layers = []
    for i in range(self.num_layers):
      self.layers.append(nn.Sequential(nn.Linear(layer_size[i], layer_size[i+1], device= device, dtype= dtype),
                                       nn.ReLU()))

  def layer_forward(self, x, layer: nn.Sequential):
    # forward always defines connectivity
    out = x / x.norm(2, dim= 1, keepdim= True)
    # print(x.device, list(layer[0].parameters())[0].device)
    return layer(out)

  def predict(self, x: torch.Tensor):
    x = x.to(device= self.device, dtype= dtype)
    label_goodness = []
    for label in range(self.num_classes):
      out = put_y_in_x(x, label, self.num_classes)
      goodness = []
      for layer in self.layers:
        out = self.layer_forward(out, layer)
        goodness.append(out.pow(2).mean(dim = 1, keepdim = True))
      goodness = torch.cat(goodness, dim= 1)
      label_goodness.append(goodness.sum(dim= 1, keepdim= True))
    label_goodness = torch.cat(label_goodness, dim= 1)
    return label_goodness.argmax(dim= 1)

  def train_layer(self, layer, x_positive, x_negetive, opt_iter, optimizer):
    optimizer.zero_grad()
    for itter in range(opt_iter):
      out_positive = self.layer_forward(x_positive, layer)
      out_negetive = self.layer_forward(x_negetive, layer)
      goodness = out_positive.pow(2).mean(dim= 1)
      badness = out_negetive.pow(2).mean(dim= 1)
      loss = torch.log(1+ torch.exp(torch.cat((self.threshold - goodness, badness - self.threshold)))).mean()
      # print(loss)
      loss.backward()
      optimizer.step()

  def train(self, loader_train, loader_val, epochs, opt_iter, lr = 1e-3, print_every= 100):
    self.optim = [optim.Adam(layer.parameters(), lr = lr) for layer in self.layers]
    val_acc = []
    models = []
    for e in range(epochs):
      for t, (x, y) in enumerate(loader_train):
        x = x.to(device= self.device, dtype= dtype)
        # print(x.requires_grad)
        y = y.to(device= self.device)
        out_positive = put_y_in_x(x, y, self.num_classes)
        out_negetive = build_x_negetive(x, self.num_classes)
        for i in range(self.num_layers):
          self.train_layer(self.layers[i], out_positive, out_negetive, opt_iter, self.optim[i])
          out_positive = self.layer_forward(out_positive, self.layers[i]).detach()
          out_negetive = self.layer_forward(out_negetive, self.layers[i]).detach()
        if t % print_every == 0:
          preds = self.predict(x)
          num_correct = (preds == y).sum()
          num_samples = preds.size(0)
          acc = float(num_correct) / num_samples
          print('Iteration ', t, ' : ', 'Got %d / %d correct (%.2f)' % (num_correct, num_samples, 100 * acc))
      print()
      print('epoch ', e, ' :')
      val_acc.append(check_accuracy(loader_val, self))
      models.append(self.cpu())
      print()
    return val_acc, models


In [None]:
def check_accuracy(loader, model):
    """Print and return the fraction of samples in ``loader`` that ``model`` classifies correctly.

    Args:
        loader: iterable of (x, y) batches whose ``dataset`` has a boolean
            ``train`` attribute; it only selects the printed heading.
        model: object with a ``predict(x)`` method returning one label per sample.

    Returns:
        Accuracy in [0, 1] as a float (0.0 when the loader yields no samples).
    """
    if loader.dataset.train:
        print('Checking accuracy on validation set')
    else:
        print('Checking accuracy on test set')
    num_correct = 0
    num_samples = 0
    with torch.no_grad():
        for x, y in loader:
            x = x.to(device=device, dtype=dtype)
            preds = model.predict(x)
            # Compare on the device of the predictions, which may differ from
            # the notebook-level device (e.g. for a model kept on the CPU).
            y = y.to(device=preds.device, dtype=preds.dtype)
            # int(...) keeps the counter a plain Python number instead of a tensor.
            num_correct += int((preds == y).sum())
            num_samples += preds.size(0)
    acc = float(num_correct) / num_samples if num_samples else 0.0
    print('Got %d / %d correct (%.2f)' % (num_correct, num_samples, 100 * acc))
    return acc

In [None]:
# MNIST train/test splits, downloaded to ./datasets if missing and converted
# to tensors by T.ToTensor().
mnist_train = datasets.MNIST('./datasets', train=True, download=True,
                             transform= T.ToTensor())
mnist_test = datasets.MNIST('./datasets', train=False, download=True,
                             transform= T.ToTensor())
loader_test = DataLoader(mnist_test, batch_size=512)
# Training uses indices 0..39999 and validation 40000..49999 of the training
# split, each visited in random order.
# NOTE(review): any training images with index >= 50000 are used by neither
# loader -- confirm that this is intended.
loader_train = DataLoader(mnist_train, batch_size=512, sampler=sampler.SubsetRandomSampler(range(40000)))
loader_val = DataLoader(mnist_train, batch_size=512, sampler=sampler.SubsetRandomSampler(range(40000, 50000)))

In [None]:
# Number of mini-batches per training epoch.
len(loader_train)

79

In [None]:
# Input = 28*28 pixels + 10 one-hot label slots; two hidden layers of 500 units.
model = FF(28*28+10, [500, 500], 10, threshold= 2, device= device)
# 20 epochs, 20 optimiser steps per layer and batch; batch accuracy is printed every 30 batches.
val_acc, models = model.train(loader_train, loader_val, epochs= 20, opt_iter= 20, print_every= 30)

Iteration  0  :  Got 60 / 512 correct (11.72)
Iteration  30  :  Got 48 / 512 correct (9.38)
Iteration  60  :  Got 56 / 512 correct (10.94)

epoch  0  :
Checking accuracy on validation set
Got 1115 / 10000 correct (11.15)

Iteration  0  :  Got 53 / 512 correct (10.35)
Iteration  30  :  Got 59 / 512 correct (11.52)
Iteration  60  :  Got 64 / 512 correct (12.50)

epoch  1  :
Checking accuracy on validation set
Got 1115 / 10000 correct (11.15)

Iteration  0  :  Got 63 / 512 correct (12.30)
Iteration  30  :  Got 54 / 512 correct (10.55)
Iteration  60  :  Got 142 / 512 correct (27.73)

epoch  2  :
Checking accuracy on validation set
Got 3944 / 10000 correct (39.44)

Iteration  0  :  Got 197 / 512 correct (38.48)
Iteration  30  :  Got 255 / 512 correct (49.80)
Iteration  60  :  Got 291 / 512 correct (56.84)

epoch  3  :
Checking accuracy on validation set
Got 6265 / 10000 correct (62.65)

Iteration  0  :  Got 308 / 512 correct (60.16)
Iteration  30  :  Got 358 / 512 correct (69.92)
Iteration 

In [None]:
# Pick the entry of `models` that belongs to the epoch with the highest validation accuracy.
index = np.argmax(val_acc)
best_model = models[index]

In [None]:
# Accuracy of the selected model on the MNIST test split.
check_accuracy(loader_test, best_model.to(device))

Checking accuracy on test set
Got 9249 / 10000 correct (92.49)


0.9249

### Part B
Implementation of an unsupervised Forward-Forward network.

This code is inspired by https://github.com/IsmailKonak/FF-Algorithm-Pytorch-Implementation/blob/main/FF%20-%20Unsupervised/MLP/FF-Unsupervised.ipynb.

However, for reasons that I have not yet understood, it does not work properly.

In [None]:
from tqdm.notebook import tqdm_notebook
from sklearn.preprocessing import OneHotEncoder
from scipy.signal import convolve2d

In [None]:
def label_to_oh(y, num_classes = 10):
    """Convert integer class labels to a float32 one-hot matrix.

    Args:
        y: tensor (or array-like) of integer labels in [0, num_classes).
        num_classes: number of columns of the result (10 by default).

    Returns:
        float32 tensor of shape (len(y), num_classes) on the device of ``y``.
    """
    # F.one_hot replaces fitting a scikit-learn encoder on every call and also
    # works for labels that live on the GPU (the old y.numpy() required CPU).
    labels = torch.as_tensor(y).reshape(-1).long()
    return F.one_hot(labels, num_classes=num_classes).to(torch.float32)

def show_image(x):
    """Display a single image in grayscale.

    Args:
        x: image as an array or tensor; singleton dimensions are squeezed away
            before plotting.
    """
    # Imported here because the notebook imports pyplot only in a later cell,
    # so the module-level name `plt` may not exist yet when this is called.
    import matplotlib.pyplot as plt
    if isinstance(x, torch.Tensor):
        # matplotlib cannot read GPU tensors or tensors that track gradients.
        x = x.detach().cpu()
    x = x.squeeze()
    plt.imshow(x, cmap="gray")
    plt.show()

def mask_gen():
    """Create a random binary 28x28 mask made of smooth blobs.

    A random 0/1 image is blurred between 5 and 9 times with a 3x3 kernel
    (symmetric boundary handling) and then thresholded at 0.5.

    Returns:
        float32 NumPy array of shape (28, 28) containing only 0.0 and 1.0.
    """
    n_passes = np.random.randint(5,10)
    noise = np.random.randint(2, size=(1,28,28)).squeeze().astype(np.float32)
    kernel = np.array([[1, 2, 1], [2, 4, 2], [1, 2, 1]]) / 16
    smoothed = noise
    for _ in range(n_passes):
        smoothed = convolve2d(smoothed, kernel, mode='same', boundary='symm')
    return (smoothed > 0.5).astype(np.float32)

# The method for creating masks for negative data that I tried for testing purposes.
def mask_gen1():
    """Create a shuffled soft mask with 5/8 low and 3/8 high values.

    The low values are drawn from N(0, 0.01) and shifted up by |min|; the high
    values are drawn from N(1, 0.01) and shifted up by |1 - max|. Both groups
    are concatenated and shuffled.

    Returns:
        float32 NumPy array of shape (1, 28, 28).
    """
    n_pixels = 28*28
    n_low = int(5*n_pixels/8)
    n_high = int(3*n_pixels/8)
    low = np.random.normal(loc=0, scale=0.01, size=n_low)
    low = low + abs(0 - low.min())
    high = np.random.normal(loc=1, scale=0.01, size=n_high)
    high = high + abs(1 - high.max())
    values = np.concatenate([low, high])
    np.random.shuffle(values)
    return values.reshape((1,28,28)).astype(np.float32)

def negative_data_gen(batch, mask = None):
    """Build hybrid negative images by mixing every image with another one from the batch.

    Args:
        batch: sequence whose first element is the image tensor (first
            dimension indexes images); any further elements are ignored.
        mask: optional array/tensor broadcastable to one image, with values in
            [0, 1]. Where the mask is 1 the original image is kept, where it is
            0 the pixel comes from a randomly chosen image of the same batch.
            Defaults to a fresh ``mask_gen()`` mask shared by the whole batch.

    Returns:
        Tensor with the shape, dtype and device of the image tensor.
    """
    images = batch[0]
    partners = images[torch.randperm(images.shape[0], device=images.device)]
    if mask is None:
        mask = mask_gen()
    # Convert explicitly instead of relying on implicit NumPy/tensor mixing,
    # so the arithmetic also works for GPU tensors and keeps the image dtype.
    mask = torch.as_tensor(mask, dtype=images.dtype, device=images.device)
    return images * mask + partners * (1 - mask)

In [None]:
class FF_Unsupervised(nn.Module):
  def __init__(self, input_size: int, hidden_size: list | tuple, num_classes : int, threshold, device = device):
    super().__init__()
    # assign layer objects to class attributes
    self.num_layers = len(hidden_size)
    self.num_classes = num_classes
    self.device = device
    self.threshold = threshold
    layer_size = [input_size] + list(hidden_size)
    self.layers = []
    for i in range(self.num_layers):
      self.layers.append(nn.Sequential(nn.Linear(layer_size[i], layer_size[i+1], device= device, dtype= dtype),
                                       nn.ReLU()))

  def layer_forward(self, x, layer: nn.Sequential):
    # forward always defines connectivity
    x_norm = x.norm(2, dim= 1, keepdim= True)
    x_norm[x_norm == 0] = 1
    out = x / x.norm(2, dim= 1, keepdim= True)
    # print(x.device, list(layer[0].parameters())[0].device)
    return layer(out)

  def predict(self, x: torch.Tensor):
    x = x.to(device= self.device, dtype= dtype)
    h = flatten(x)
    layers_output = torch.Tensor([]).cuda()
    for i, layer in enumerate(self.layers):
        h = self.layer_forward(h, layer)
        # print(h)
        layers_output = torch.cat([layers_output,h],1)
    return layers_output

  def train_layer(self, layer, x_positive, x_negetive, opt_iter, optimizer):
    optimizer.zero_grad()
    for itter in range(opt_iter):
      out_positive = self.layer_forward(x_positive, layer)
      out_negetive = self.layer_forward(x_negetive, layer)
      goodness = out_positive.pow(2).mean(dim= 1)
      badness = out_negetive.pow(2).mean(dim= 1)
      loss = torch.log(1+ torch.exp(torch.cat((self.threshold - goodness, badness - self.threshold)))).mean()
      # print(loss)
      loss.backward()
      optimizer.step()

  def train(self, loader_train, epochs, opt_iter, lr = 1e-3, print_every= 100):
    self.optim = [optim.Adam(layer.parameters(), lr = lr) for layer in self.layers]
    for e in range(epochs):
      for t, (x, y) in enumerate(loader_train):
        # x = x.to(device= self.device, dtype= dtype)
        out_positive = flatten(x).to(device= self.device, dtype= dtype)
        out_negetive = negative_data_gen((x,y)).to(device= self.device, dtype= dtype)
        out_negetive = flatten(out_negetive)
        for i in range(self.num_layers):
          # print(out_positive.shape, out_negetive.shape)
          self.train_layer(self.layers[i], out_positive, out_negetive, opt_iter, self.optim[i])
          out_positive = self.layer_forward(out_positive, self.layers[i]).detach()
          out_negetive = self.layer_forward(out_negetive, self.layers[i]).detach()
      #   if t % print_every == 0:
      #     print("e = ", e," t = ", t)
      # print()

In [None]:
class LinearClassification(nn.Module):
    """Linear 10-way classifier trained on features of a frozen feature extractor.

    The extractor is any object with a ``predict(x)`` method returning one
    feature row per sample. When no extractor is passed, the notebook-level
    variable ``model`` is looked up at call time (the original behaviour).

    NOTE: ``train`` below replaces ``nn.Module.train(mode)``; calling
    ``.eval()`` on this class is therefore not supported.
    """

    def __init__(self, input_dimension, feature_model = None, device = None, dtype = None):
        """
        Args:
            input_dimension: number of features produced by the extractor.
            feature_model: optional feature extractor; defaults to the global ``model``.
            device: device of the linear layer; defaults to 'cuda' when
                available, else 'cpu'.
            dtype: dtype of the linear layer; defaults to torch.float64.
        """
        super().__init__()
        if device is None:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        if dtype is None:
            dtype = torch.float64
        self.epoch_losses = []
        # Stored without nn.Module registration so the extractor's parameters
        # are NOT handed to this classifier's optimizer.
        object.__setattr__(self, '_feature_model', feature_model)
        self.linear = torch.nn.Linear(input_dimension, 10, device=device, dtype=dtype)
        self.optimizer = optim.Adam(self.parameters(), lr=0.01)
        self.criterion = nn.CrossEntropyLoss()
        self.softmax = nn.Softmax(dim=1)

    def _features(self, x):
        """Flatten ``x`` per sample and return the extractor's features without gradients."""
        extractor = self._feature_model if self._feature_model is not None else model
        weight = self.linear.weight
        x = x.reshape(x.shape[0], -1).to(weight.device)
        with torch.no_grad():
            features = extractor.predict(x)
        return features.to(device=weight.device, dtype=weight.dtype)

    def _batch_loss(self, x, y):
        """Cross-entropy between the logits for ``x`` and the integer labels ``y``."""
        logits = self.forward(self._features(x))
        # Integer class targets give the same loss as one-hot targets.
        targets = torch.as_tensor(y).reshape(-1).to(logits.device).long()
        return self.criterion(logits, targets)

    def forward(self,x):
        """Return the class logits for a batch of feature rows."""
        return self.linear(x)

    def predict(self,x):
        """Return the predicted class index for every sample in ``x`` (shape (N,))."""
        with torch.no_grad():
            logits = self.forward(self._features(x))
        # argmax along the class dimension; without dim it would return a
        # single index into the flattened (N, 10) matrix.
        return self.softmax(logits).argmax(dim=1)

    def train(self, data_loader,epoch_num):
        """Fit the linear layer for ``epoch_num`` epochs.

        Args:
            data_loader: iterable of (x, y) batches with integer labels ``y``.
            epoch_num: number of passes over ``data_loader``.

        Returns:
            ``self.epoch_losses``, the list of mean batch losses per epoch
            (including epochs from earlier calls).
        """
        linear_loop = tqdm_notebook(range(epoch_num),total=epoch_num)
        for i in linear_loop:
            batch_losses = []
            for x, y in data_loader:
                loss = self._batch_loss(x, y)
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                # Keep a float, not the tensor, so the autograd graph is freed.
                batch_losses.append(loss.item())
            mean_loss = sum(batch_losses) / len(batch_losses) if batch_losses else float('nan')
            self.epoch_losses.append(float(mean_loss))
            linear_loop.set_description(f"Epoch [{i+1}/{epoch_num}]: ")
            # [-1] rather than [i]: the list keeps growing across train() calls.
            linear_loop.set_postfix(loss=self.epoch_losses[-1])
        return self.epoch_losses

    def test(self, data_loader):
        """Return the mean batch cross-entropy over ``data_loader`` (NaN if it is empty)."""
        batch_losses = []
        with torch.no_grad():
            for x, y in data_loader:
                batch_losses.append(self._batch_loss(x, y).item())
        if not batch_losses:
            return float('nan')
        return float(sum(batch_losses) / len(batch_losses))

In [None]:
# Part B reloads MNIST and rebinds the Part A names mnist_train, mnist_test,
# loader_test and loader_train: here the loaders cover each full split in
# stored order (no sampler) with batches of 1024.
mnist_train = datasets.MNIST('./datasets', train=True, download=True,
                             transform= T.ToTensor())
mnist_test = datasets.MNIST('./datasets', train=False, download=True,
                             transform= T.ToTensor())
loader_test = DataLoader(mnist_test, batch_size=1024)
loader_train = DataLoader(mnist_train, batch_size=1024)

In [None]:
# Rebinds `model` (the Part A classifier) to the unsupervised network:
# 28*28 pixel inputs and four hidden layers of 500 units.
model = FF_Unsupervised(28*28, [500, 500, 500, 500], 10, threshold= 2, device= device)

In [None]:
# 20 epochs with 40 optimiser steps per layer and batch.
model.train(loader_train, 20, 40, print_every= 20)

In [None]:
# The classifier input is the concatenation of the four 500-unit layer outputs.
linear_model = LinearClassification(4*500)
# 100 epochs; the per-epoch mean losses are recorded in linear_model.epoch_losses.
losses = linear_model.train(loader_train, 100)

  0%|          | 0/100 [00:00<?, ?it/s]

In [None]:
import matplotlib.pyplot as plt
# Mean training loss of the linear classifier, one point per epoch.
plt.plot(linear_model.epoch_losses)
plt.title("Loss over training")
plt.show()

In [None]:
# Raw per-epoch loss values behind the plot above.
print(linear_model.epoch_losses)

[11.941828724442027]
