In [1]:
import torch

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
x_pos=torch.tensor([[1,2,3],[3,2,1],[5,6,5]]).type(torch.float32)

In [3]:
x_pos.pow(2).mean(dim=1)

tensor([ 4.6667,  4.6667, 28.6667])

In [4]:
def base_loss(X_pos: torch.Tensor, X_neg: torch.Tensor, th: float) -> torch.Tensor:
    """Base loss described in the paper. Log(1+exp(x)) is added to help differentiation.
    Args:
        X_pos (torch.Tensor): batch of positive model predictions
        X_neg (torch.Tensor): batch of negative model predictions
        th (float): loss function threshold
    Returns:
        torch.Tensor: output loss
    """
    logits_pos = X_pos.pow(2).mean(dim=1)
    logits_neg = X_neg.pow(2).mean(dim=1)

    loss_pos = - logits_pos + th
    loss_neg = logits_neg - th

    loss_poss = torch.log(1 + torch.exp(loss_pos)).mean()
    loss_neg = torch.log(1 + torch.exp(loss_neg)).mean()

    loss = loss_poss + loss_neg

    return loss

In [43]:
from typing import Callable, Tuple
from torch.nn import Linear

class FFLinear(Linear):
    "Forward-Forward-layer"

    def __init__(self, input_features:int, output_features:int, activation:torch.nn, 
                 optimizer:torch.optim, layer_optim_learning_rate: float, threshold:float, loss_fn:Callable, bias:bool = True):
        super(FFLinear, self).__init__(input_features, output_features, bias)

        self.activation = activation
        self.optimizer = optimizer(self.parameters(), lr = layer_optim_learning_rate)
        self.threshold = threshold
        self.loss_fn = loss_fn
        
    def forward(self, x: torch.Tensor) -> torch.Tensor: 

        x = x / (x.norm(2,1, keepdim=True) + 1e-8)

        return self.activation(torch.mm(x, self.weight.T) + self.bias.unsqueeze(0))


    def train_layer(self, X_pos, X_neg, before : bool) -> Tuple[torch.Tensor, torch.Tensor, int]:
        
        X_pos_out = self.forward(X_pos)
        X_neg_out = self.forward(X_neg)


        loss = self.loss_fn(X_pos_out, X_neg_out, self.threshold)

        self.optimizer.zero_grad()

        loss.backward()
        self.optimizer.step()

            # 학습후 output을 전달할 것인지, 학습전 output을 전달할 것인지
        if before:
            return X_pos_out.detach(), X_neg_out.detach(), loss.item()
        else:
            return self.forward(X_pos).detach(), self.forward(X_neg).detach(), loss.item() 


        


In [6]:
hidden_dimensions = [784, 512, 512]
for i in range(len(hidden_dimensions) - 1):
    print(i)
    print(hidden_dimensions[i],hidden_dimensions[i+1])

0
784 512
1
512 512


In [7]:
from pickle import FALSE

import torchvision.transforms as transforms
import torchvision.datasets as dsets
from torchvision.transforms import Compose, ToTensor, Lambda, Normalize
# MNIST dataset
transform = Compose([
    ToTensor(),
    Normalize((0.1307,), (0.3081,)),
    Lambda(lambda x: torch.flatten(x))])

mnist_train = dsets.MNIST(root='MNIST_data/',
                          train=True,
                          transform=transform,
                          download=True)

mnist_test = dsets.MNIST(root='MNIST_data/',
                          train=False,
                          transform=transform,
                          download=True)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to MNIST_data/MNIST\raw\train-images-idx3-ubyte.gz


100%|██████████| 9912422/9912422 [00:01<00:00, 5292391.68it/s]


Extracting MNIST_data/MNIST\raw\train-images-idx3-ubyte.gz to MNIST_data/MNIST\raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to MNIST_data/MNIST\raw\train-labels-idx1-ubyte.gz


100%|██████████| 28881/28881 [00:00<00:00, 28917568.35it/s]


Extracting MNIST_data/MNIST\raw\train-labels-idx1-ubyte.gz to MNIST_data/MNIST\raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to MNIST_data/MNIST\raw\t10k-images-idx3-ubyte.gz


100%|██████████| 1648877/1648877 [00:00<00:00, 3779307.80it/s]


Extracting MNIST_data/MNIST\raw\t10k-images-idx3-ubyte.gz to MNIST_data/MNIST\raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to MNIST_data/MNIST\raw\t10k-labels-idx1-ubyte.gz


100%|██████████| 4542/4542 [00:00<00:00, 4551010.22it/s]

Extracting MNIST_data/MNIST\raw\t10k-labels-idx1-ubyte.gz to MNIST_data/MNIST\raw






In [8]:
from torch.utils.data import DataLoader

# dataset loader
data_loader = DataLoader(dataset=mnist_train,
                                          batch_size=1024, # 배치 크기는 100
                                          shuffle=True)
test_loader = DataLoader(dataset=mnist_test,
                                          batch_size=1024, # 배치 크기는 100
                                          shuffle=False)


In [9]:
images, labels = next(iter(data_loader))

In [10]:
images.size()

torch.Size([1024, 784])

In [11]:
images, labels = next(iter(test_loader))
images.size()

torch.Size([1024, 784])

In [63]:
import torch
def generate_positive_negative_samples_overlay(X: torch.Tensor, Y: torch.Tensor, only_positive: bool):
    """Generate positive and negative samples using labels. It overlays labels in input. For neg it does
    the same but with shuffled labels.
    Args:
        X (torch.Tensor): batch of samples
        Y (torch.Tensor): batch of labels
        only_positive (bool): if True, it outputs only positive exmples with labels overlayed
    Returns:
        Tuple[torch.Tensor]: batch of positive (and negative samples)
    """
    X_pos = X.clone()
    X_pos[:, :10] *= 0.0
    X_pos[range(X.shape[0]), Y] = X_pos.max()  # one hot
    # X_pos[range(X.shape[0]), Y] = 1  # one hot

    if only_positive:
        return X_pos
    else:
        X_neg = X.clone()
        rnd = torch.randperm(X_neg.size(0))
        # Y_neg = (Y + torch.randint(1, (Y.max()-1), (Y.shape[0],))) % Y.max() # still don't get why does not work
        Y_neg = Y[rnd]
        X_neg[:, :10] *= 0.0
        X_neg[range(X_neg.shape[0]), Y_neg] = X_neg.max()  # one hot

        return X_pos, X_neg


class TrainingDatasetFF(torch.utils.data.Dataset):
    """Utility class to store positive and negative examples to train
       with FF algorithm.
    """

    def __init__(self, dataset_generator: DataLoader) -> None:
        """Initialize TrainingDatasetFF
        Args:
            dataset_generator (DataLoader): DataLoader to store
        """
        with torch.no_grad():
            self.dataset = [
                batch
                for X_pos, X_neg in dataset_generator
                for batch in zip(X_pos, X_neg)
            ]

    def __getitem__(self, index: int):
        return self.dataset[index]

    def __len__(self):
        return len(self.dataset)


pos_gen_fn = generate_positive_negative_samples_overlay
train_loader_ff = torch.utils.data.DataLoader(TrainingDatasetFF(pos_gen_fn(X.to(device),Y.to(device), False) for X, Y in data_loader),
                                              batch_size=data_loader.batch_size, shuffle=True
                                              )

In [66]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
ffl_list = []
hidden_dimensions = [784, 2000, 2000, 2000, 2000,2000]
for i in range(len(hidden_dimensions) - 1):
    layer_ = FFLinear(hidden_dimensions[i], hidden_dimensions[i+1], activation = torch.nn.ReLU(), optimizer = torch.optim.Adam,
                  layer_optim_learning_rate=0.09, threshold = 5.0, loss_fn = base_loss).to(device)
    torch.nn.init.xavier_uniform_(layer_.weight)
    ffl_list.append(layer_)


In [40]:
ffl_list

[FFLinear(
   in_features=784, out_features=2000, bias=True
   (activation): ReLU()
 ),
 FFLinear(
   in_features=2000, out_features=2000, bias=True
   (activation): ReLU()
 ),
 FFLinear(
   in_features=2000, out_features=2000, bias=True
   (activation): ReLU()
 ),
 FFLinear(
   in_features=2000, out_features=2000, bias=True
   (activation): ReLU()
 )]

In [41]:
# import tqdm

# n_epochs = 1
# tqdm_loss = tqdm.tqdm(range(n_epochs))
# for epoch in tqdm_loss:
#     loss_list = [[] for _ in range(len(ffl_list))]
#     for X_pos, Y_neg in train_loader_ff:
#         X_pos_tmp, Y_neg_tmp = X_pos, Y_neg
#         for idx in range(len(ffl_list)):
#             X_pos_tmp,Y_neg_tmp,layer_losses = ffl_list[idx].train_layer(X_pos_tmp.to(device), Y_neg_tmp.to(device), before=False)
#             loss_list[idx].append(layer_losses)
#             tqdm_loss.set_postfix({f"loss{idx} " : (sum(loss_list[idx])/len(loss_list[idx]))})
        
        # print(layer_losses, end='\r')
        # print(", ".join(map(lambda i, l: 'Layer {}: {}'.format(i, l),list(range(len(layer_losses))) ,layer_losses)), end='\r')


In [67]:
import tqdm

n_epochs = 60
train_loader_ff = torch.utils.data.DataLoader(TrainingDatasetFF(pos_gen_fn(X.to(device),Y.to(device), False) for X, Y in data_loader),
                                              batch_size=data_loader.batch_size, shuffle=True
                                              )
for idx in range(len(ffl_list)):
    tqdm_loss = tqdm.tqdm(range(n_epochs))
    for epoch in tqdm_loss:
        loss_list = [[] for _ in range(len(ffl_list))]
        
        for X_pos, Y_neg in train_loader_ff:
            X_pos_tmp, Y_neg_tmp = X_pos, Y_neg
            _, _, layer_losses = ffl_list[idx].train_layer(X_pos_tmp,Y_neg_tmp, before=False)
            tqdm_loss.set_postfix({f"loss{idx} " : {layer_losses}})
            
    traindatasetff  = TrainingDatasetFF(((ffl_list[idx](X_pos)), (ffl_list[idx](X_neg))) 
                                                                    for X_pos, X_neg in train_loader_ff)
    train_loader_ff = torch.utils.data.DataLoader(traindatasetff,batch_size=data_loader.batch_size, shuffle=True)
        
        # print(layer_losses, end='\r')
        # print(", ".join(map(lambda i, l: 'Layer {}: {}'.format(i, l),list(range(len(layer_losses))) ,layer_losses)), end='\r')


100%|██████████| 60/60 [00:31<00:00,  1.89it/s, loss0 ={0.3995414674282074}] 
100%|██████████| 60/60 [00:33<00:00,  1.77it/s, loss1 ={0.2839842438697815}] 
100%|██████████| 60/60 [00:33<00:00,  1.77it/s, loss2 ={0.26600155234336853}]
100%|██████████| 60/60 [00:33<00:00,  1.77it/s, loss3 ={0.3004131317138672}] 
100%|██████████| 60/60 [00:34<00:00,  1.76it/s, loss4 ={0.3065320551395416}] 


In [55]:
def predict_goodness(layers, X, pos_gen_fn, n_class):
    goodness_per_label = []
    for label in range(n_class):
        h = pos_gen_fn(X, label, True)
        goodness = []
        
        for layer in layers:
            h = layer(h)
            goodness += [h.pow(2).mean(1)]
        goodness_per_label += [sum(goodness).unsqueeze(1)]

    goodness_per_label = torch.cat(goodness_per_label, 1)

    return goodness_per_label.argmax(1)


In [68]:

layer_stack = []
for layer in ffl_list:
    acc = 0
    layer_stack.append(layer)
    for X_test, Y_test in tqdm.tqdm(test_loader, total=len(test_loader)):
        X_test = X_test.to(device)
        Y_test = Y_test.to(device)

        acc += (predict_goodness(layer_stack,X_test,
                pos_gen_fn, n_class=10).eq(Y_test).sum())

    print(f"Accuracy: {acc/float(len(test_loader.dataset)):.4%}")
    print(f"Test error: {1 - acc/float(len(test_loader.dataset)):.4%}")

100%|██████████| 10/10 [00:02<00:00,  3.81it/s]


Accuracy: 96.5400%
Test error: 3.4600%


100%|██████████| 10/10 [00:02<00:00,  3.34it/s]


Accuracy: 97.1000%
Test error: 2.9000%


100%|██████████| 10/10 [00:02<00:00,  3.75it/s]


Accuracy: 96.5600%
Test error: 3.4400%


100%|██████████| 10/10 [00:02<00:00,  3.69it/s]


Accuracy: 95.9600%
Test error: 4.0400%


100%|██████████| 10/10 [00:02<00:00,  3.37it/s]

Accuracy: 95.3200%
Test error: 4.6800%





In [None]:
len(test_loader.dataset)

10000

In [None]:
for X_test, Y_test in test_loader:
    pass

NameError: ignored