In [49]:
import torch
import numpy as np
from tqdm import tqdm 


In [99]:
def overlay_y_on_x(x, y):
    x_ = x.clone()
    x_[:, :10] *= 0.0
    x_[range(x.shape[0]), y] = x.max()
    return x_


class Net(torch.nn.Module):

    def __init__(self, dims):
        super().__init__()
        self.layers = []
        for d in range(len(dims) - 1):
            self.layers += [Layer(dims[d], dims[d + 1]).cuda()]

    def predict(self, x):
        goodness_per_label = []
        for label in range(10):
            h = overlay_y_on_x(x, label)
            goodness = []
            for layer in self.layers:
                h = layer(h)
                goodness += [h.pow(2).mean(1)]
            goodness_per_label += [sum(goodness).unsqueeze(1)]
        goodness_per_label = torch.cat(goodness_per_label, 1)
        return goodness_per_label.argmax(1)

    def train(self, x_pos, x_neg):
        h_pos, h_neg = x_pos, x_neg
        for i, layer in enumerate(self.layers):
            print('training layer', i, '...')
            h_pos, h_neg = layer.train(h_pos, h_neg)


class Layer(nn.Linear):
    def __init__(self, in_features, out_features,
                 bias=True, device=None, dtype=None):
        super().__init__(in_features, out_features, bias, device, dtype)
        self.relu = torch.nn.ReLU()
        self.opt = Adam(self.parameters(), lr=0.03)
        self.threshold = 2.0
        self.num_epochs = 1000

    def forward(self, x):
        x_direction = x / (x.norm(2, 1, keepdim=True) + 1e-4)
        return self.relu(
            torch.mm(x_direction, self.weight.T) +
            self.bias.unsqueeze(0))

    def train(self, x_pos, x_neg):
        for i in tqdm(range(self.num_epochs)):
            g_pos = self.forward(x_pos).pow(2).mean(1)
            g_neg = self.forward(x_neg).pow(2).mean(1)
            # The following loss pushes pos (neg) samples to
            # values larger (smaller) than the self.threshold.
            loss = torch.log(1 + torch.exp(torch.cat([
                -g_pos + self.threshold,
                g_neg - self.threshold]))).mean()
            self.opt.zero_grad()
            # this backward just compute the derivative and hence
            # is not considered backpropagation.
            loss.backward()
            self.opt.step()
        return self.forward(x_pos).detach(), self.forward(x_neg).detach()

In [100]:
class FBNet(torch.nn.Module):
    """
    a fully connected neural network trained with forward-backward with a n_layers hidden layer"""
    def __init__(self, n_layers=2, n_input=10, n_output=4, lr=0.001):
        super().__init__()
        self.n_layers = n_layers
        self.layers = torch.nn.ModuleList([torch.nn.Linear(n_input, n_input).to("cuda") for i in range(n_layers)])
        self.out = torch.nn.Linear(n_input, n_output)

        self.criterion = torch.nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.parameters(), lr=lr)

    def forward(self, x):
        x = x.to("cuda")
        for layer in self.layers:
            x = layer(x)
            x = torch.nn.functional.relu(x)
        x = self.out(x)
        return x
    
    def train(self, x, y, epochs):
        for _ in range(epochs):
            self.optimizer.zero_grad()
            output = self.forward(x)
            y = y.to("cuda")
            loss = self.criterion(output, y)
            loss.backward()
            self.optimizer.step()
        # calculate accuracy
        _, predictions = torch.max(output, 1)
        accuracy = (predictions == y).float().mean()

        return accuracy.item()
    
    def eval(self, x, y):
      x = x.to("cuda")
      y = y.to("cuda")
      with torch.no_grad():
          output = self.forward(x)
          predictions = torch.argmax(output, dim=1)
          accuracy = (predictions == y).float().mean()
      return accuracy.item()


In [103]:
from torchvision.datasets import MNIST
from torchvision.transforms import Compose, ToTensor, Normalize, Lambda
from torch.utils.data import DataLoader

from tqdm import tqdm

import pandas as pd

def MNIST_loaders(train_batch_size=50000, test_batch_size=10000):
    transform = Compose([
        ToTensor(),
        Normalize((0.1307,), (0.3081,)),
        Lambda(lambda x: torch.flatten(x))])

    train_loader = DataLoader(
        MNIST('./data/', train=True,
              download=True,
              transform=transform),
        batch_size=train_batch_size, shuffle=True)

    test_loader = DataLoader(
        MNIST('./data/', train=False,
              download=True,
              transform=transform),
        batch_size=test_batch_size, shuffle=False)

    return train_loader, test_loader

def get_model(ff_net_bool, n_layers, n_input, n_output):
    if ff_net_bool:
        model = FFNet(n_layers=n_layers, n_input=n_input, n_output=n_output)
    else:
        model = FBNet(n_layers=n_layers, n_input=n_input, n_output=n_output)
    return model

def train_and_get_eval_fb(model, epochs=1):
    train_loader, test_loader = MNIST_loaders()
    model = model.to("cuda")
    
    with torch.autocast("cuda", dtype=torch.float32):
        for _ in range(epochs):
            for x, y in train_loader:
                x.to("cuda")
                y.to("cuda")
                acc_train = model.train(x, y, 80)
    
    # eval on test
    with torch.autocast("cuda", dtype=torch.float32):
        acc_test = []
        for x, y in test_loader:
            x.to("cuda")
            y.to("cuda")
            test_acc = model.eval(x, y)
            acc_test.append(test_acc)
    
    train_acc = torch.tensor(acc_train).mean().item()
    test_acc = torch.tensor(acc_test).mean().item()

    return train_acc, test_acc

def train_and_get_eval_ff(epochs=1):
    torch.manual_seed(1234)
    train_loader, test_loader = MNIST_loaders()

    net = Net([784, 500, 500])
    x, y = next(iter(train_loader))
    x, y = x.cuda(), y.cuda()
    
    x_pos = overlay_y_on_x(x, y)
    rnd = torch.randperm(x.size(0))
    x_neg = overlay_y_on_x(x, y[rnd])
    net.train(x_pos, x_neg)

    train_acc=net.predict(x).eq(y).float().mean().item()

    print('train acc:', train_acc)

    x_te, y_te = next(iter(test_loader))
    x_te, y_te = x_te.cuda(), y_te.cuda()

    test_acc=net.predict(x_te).eq(y_te).float().mean().item()

    print('test acc:', test_acc)

    return train_acc, test_acc



if __name__ == "__main__":
    ff_train_accs, ff_test_accs = [], []
    fb_train_accs, fb_test_accs = [], []

    for _ in tqdm(range(20)):
        n_layers = 2
        n_input = 784
        n_output = 10

        print("FBNet")
        model = get_model(False, n_layers, n_input, n_output)
        fb_train_acc, fb_test_acc = train_and_get_eval_fb(model)
        fb_train_accs.append(fb_train_acc)
        fb_test_accs.append(fb_test_acc)

        print("FFNet")
        model = get_model(True, n_layers, n_input, n_output)
        ff_train_acc, ff_test_acc = train_and_get_eval_ff()
        ff_train_accs.append(ff_train_acc)
        ff_test_accs.append(ff_test_acc)
        
    # Print the final accuracy results
    print(" ")
    print("FFNet train accuracy: {:.4f} +- {:.4f}".format(np.mean(ff_train_accs), np.std(ff_train_accs)))
    print("FFNet test accuracy: {:.4f} +- {:.4f}".format(np.mean(ff_test_accs), np.std(ff_test_accs)))
    print("FBNet train accuracy: {:.4f} +- {:.4f}".format(np.mean(fb_train_accs), np.std(fb_train_accs)))
    print("FBNet test accuracy: {:.4f} +- {:.4f}".format(np.mean(fb_test_accs), np.std(fb_test_accs)))
    
    # to pandas csv
    df = pd.DataFrame({"ff_train_acc": ff_train_accs, "ff_test_acc": ff_test_accs, "fb_train_acc": fb_train_accs, "fb_test_acc": fb_test_accs})
    df.to_csv("accs.csv")




  0%|          | 0/1 [00:00<?, ?it/s]

FBNet
FFNet
training layer 0 ...



  0%|          | 0/1000 [00:00<?, ?it/s][A
  2%|▏         | 16/1000 [00:00<00:08, 115.99it/s][A
  3%|▎         | 28/1000 [00:00<00:37, 26.25it/s] [A
  3%|▎         | 34/1000 [00:01<00:45, 21.20it/s][A
  4%|▍         | 38/1000 [00:01<00:48, 19.68it/s][A
  4%|▍         | 41/1000 [00:01<00:51, 18.68it/s][A
  4%|▍         | 44/1000 [00:02<00:53, 17.80it/s][A
  5%|▍         | 47/1000 [00:02<00:56, 16.80it/s][A
  5%|▍         | 49/1000 [00:02<00:58, 16.29it/s][A
  5%|▌         | 51/1000 [00:02<00:59, 15.95it/s][A
  5%|▌         | 53/1000 [00:02<00:59, 15.86it/s][A
  6%|▌         | 55/1000 [00:02<01:00, 15.59it/s][A
  6%|▌         | 57/1000 [00:02<01:00, 15.60it/s][A
  6%|▌         | 59/1000 [00:03<01:01, 15.23it/s][A
  6%|▌         | 61/1000 [00:03<01:03, 14.89it/s][A
  6%|▋         | 63/1000 [00:03<01:03, 14.68it/s][A
  6%|▋         | 65/1000 [00:03<01:04, 14.52it/s][A
  7%|▋         | 67/1000 [00:03<01:03, 14.58it/s][A
  7%|▋         | 69/1000 [00:03<01:03, 14.77it/s][A

training layer 1 ...



  0%|          | 0/1000 [00:00<?, ?it/s][A
  0%|          | 2/1000 [00:00<00:56, 17.66it/s][A
  0%|          | 4/1000 [00:00<00:58, 16.98it/s][A
  1%|          | 6/1000 [00:00<00:59, 16.78it/s][A
  1%|          | 8/1000 [00:00<00:59, 16.61it/s][A
  1%|          | 10/1000 [00:00<00:59, 16.57it/s][A
  1%|          | 12/1000 [00:00<00:59, 16.56it/s][A
  1%|▏         | 14/1000 [00:00<01:00, 16.40it/s][A
  2%|▏         | 16/1000 [00:00<00:57, 17.07it/s][A
  2%|▏         | 19/1000 [00:01<00:50, 19.51it/s][A
  2%|▏         | 22/1000 [00:01<00:46, 21.11it/s][A
  2%|▎         | 25/1000 [00:01<00:43, 22.19it/s][A
  3%|▎         | 28/1000 [00:01<00:42, 22.92it/s][A
  3%|▎         | 31/1000 [00:01<00:41, 23.42it/s][A
  3%|▎         | 34/1000 [00:01<00:40, 23.74it/s][A
  4%|▎         | 37/1000 [00:01<00:40, 23.89it/s][A
  4%|▍         | 40/1000 [00:01<00:39, 24.04it/s][A
  4%|▍         | 43/1000 [00:02<00:39, 24.23it/s][A
  5%|▍         | 46/1000 [00:02<00:39, 24.37it/s][A
  5%|

train acc: 0.9324599504470825


100%|██████████| 1/1 [02:18<00:00, 138.20s/it]

test acc: 0.9315999746322632
 
FFNet train accuracy: 0.9325 +- 0.0000
FFNet test accuracy: 0.9316 +- 0.0000
FBNet train accuracy: 1.0000 +- 0.0000
FBNet test accuracy: 0.9733 +- 0.0000





In [57]:
import torch
import torch.nn as nn
from tqdm import tqdm
from torch.optim import Adam
from torchvision.datasets import MNIST
from torchvision.transforms import Compose, ToTensor, Normalize, Lambda
from torch.utils.data import DataLoader


def MNIST_loaders(train_batch_size=50000, test_batch_size=10000):

    transform = Compose([
        ToTensor(),
        Normalize((0.1307,), (0.3081,)),
        Lambda(lambda x: torch.flatten(x))])

    train_loader = DataLoader(
        MNIST('./data/', train=True,
              download=True,
              transform=transform),
        batch_size=train_batch_size, shuffle=True)

    test_loader = DataLoader(
        MNIST('./data/', train=False,
              download=True,
              transform=transform),
        batch_size=test_batch_size, shuffle=False)

    return train_loader, test_loader


def overlay_y_on_x(x, y):
    x_ = x.clone()
    x_[:, :10] *= 0.0
    x_[range(x.shape[0]), y] = x.max()
    return x_


class Net(torch.nn.Module):

    def __init__(self, dims):
        super().__init__()
        self.layers = []
        for d in range(len(dims) - 1):
            self.layers += [Layer(dims[d], dims[d + 1]).cuda()]

    def predict(self, x):
        goodness_per_label = []
        for label in range(10):
            h = overlay_y_on_x(x, label)
            goodness = []
            for layer in self.layers:
                h = layer(h)
                goodness += [h.pow(2).mean(1)]
            goodness_per_label += [sum(goodness).unsqueeze(1)]
        goodness_per_label = torch.cat(goodness_per_label, 1)
        return goodness_per_label.argmax(1)

    def train(self, x_pos, x_neg):
        h_pos, h_neg = x_pos, x_neg
        for i, layer in enumerate(self.layers):
            print('training layer', i, '...')
            h_pos, h_neg = layer.train(h_pos, h_neg)


class Layer(nn.Linear):
    def __init__(self, in_features, out_features,
                 bias=True, device=None, dtype=None):
        super().__init__(in_features, out_features, bias, device, dtype)
        self.relu = torch.nn.ReLU()
        self.opt = Adam(self.parameters(), lr=0.03)
        self.threshold = 2.0
        self.num_epochs = 1000

    def forward(self, x):
        x_direction = x / (x.norm(2, 1, keepdim=True) + 1e-4)
        return self.relu(
            torch.mm(x_direction, self.weight.T) +
            self.bias.unsqueeze(0))

    def train(self, x_pos, x_neg):
        for i in tqdm(range(self.num_epochs)):
            g_pos = self.forward(x_pos).pow(2).mean(1)
            g_neg = self.forward(x_neg).pow(2).mean(1)
            # The following loss pushes pos (neg) samples to
            # values larger (smaller) than the self.threshold.
            loss = torch.log(1 + torch.exp(torch.cat([
                -g_pos + self.threshold,
                g_neg - self.threshold]))).mean()
            self.opt.zero_grad()
            # this backward just compute the derivative and hence
            # is not considered backpropagation.
            loss.backward()
            self.opt.step()
        return self.forward(x_pos).detach(), self.forward(x_neg).detach()

if __name__ == "__main__":
    torch.manual_seed(1234)
    train_loader, test_loader = MNIST_loaders()

    net = Net([784, 500, 500])
    x, y = next(iter(train_loader))
    x, y = x.cuda(), y.cuda()
    x_pos = overlay_y_on_x(x, y)
    rnd = torch.randperm(x.size(0))
    x_neg = overlay_y_on_x(x, y[rnd])
    net.train(x_pos, x_neg)

    print('train error:', net.predict(x).eq(y).float().mean().item())

    x_te, y_te = next(iter(test_loader))
    x_te, y_te = x_te.cuda(), y_te.cuda()

    print('test error:', net.predict(x_te).eq(y_te).float().mean().item())

    # calc cross entropy
    # y_pred = net.predict(x_te)
    # criterion = torch.nn.CrossEntropyLoss()
    # loss = criterion(y_pred, y_te.float())
    # print(loss)


training layer 0 ...


100%|██████████| 1000/1000 [01:06<00:00, 15.04it/s]


training layer 1 ...


100%|██████████| 1000/1000 [00:40<00:00, 24.44it/s]


train error: 0.9324599504470825
test error: 0.9315999746322632
