In [1]:
import sys
sys.path.append("../src")
import torch
import matplotlib.pyplot as plt
import numpy as np
import torchvision
import torch.nn.functional as F

import glob
import os
from datetime import datetime
import time
import math
from tqdm import tqdm

from itertools import repeat
from torch.nn.parameter import Parameter
import collections
import matplotlib
from torch_utils import *
from PC import *
from visualization import *
# matplotlib.use('Agg')

In [2]:
def evaluatePC(model, loader, neural_lr_start, neural_lr_stop, neural_lr_rule, 
                         neural_lr_decay_multiplier,
                         neural_dynamic_iterations, device, printing = True):
    # Evaluate the model on a dataloader with T steps for the dynamics
    #model.eval()
    correct=0
    phase = 'Train' if loader.dataset.train else 'Test'
    
    for x, y in loader:
        x = activation_inverse(2*x.view(x.size(0),-1).to(device).T - 1, "sigmoid")
#         x = activation_inverse(x.view(x.size(0),-1).T, "sigmoid").to(device)
        y = y.to(device)
        
        neurons = model.fast_forward(x)
        
#         # dynamics for T time steps
#         neurons = model.run_neural_dynamics(x, y_one_hot, neurons, neural_lr_start, neural_lr_stop, 
#                                             neural_lr_rule,
#                                             neural_lr_decay_multiplier, neural_dynamic_iterations, 0, "test")
        pred = torch.argmax(neurons[-1], dim=0).squeeze()  # in this case prediction is done directly on the last (output) layer of neurons
        correct += (y == pred).sum().item()

    acc = correct/len(loader.dataset) 
    if printing:
        print(phase+' accuracy :\t', acc)   
    return acc

In [3]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda', index=0)

In [4]:
transform = torchvision.transforms.Compose([torchvision.transforms.ToTensor(), 
                                            torchvision.transforms.Normalize(mean=(0.0,), std=(1.0,))])

mnist_dset_train = torchvision.datasets.MNIST('./data', train=True, transform=transform, target_transform=None, download=True)
train_loader = torch.utils.data.DataLoader(mnist_dset_train, batch_size=20, shuffle=True, num_workers=0)

mnist_dset_test = torchvision.datasets.MNIST('./data', train=False, transform=transform, target_transform=None, download=True)
test_loader = torch.utils.data.DataLoader(mnist_dset_test, batch_size=20, shuffle=False, num_workers=0)

In [5]:
activation_type = "sigmoid"
architecture = [784, 500, 500, 10]

x,y = next(iter(train_loader))
x = x.view(x.size(0),-1).to(device).T
y_one_hot = F.one_hot(y, 10).to(device).T

neural_lr_start = 0.1
neural_lr_stop = 1e-3
neural_lr_rule = "constant"
neural_lr_decay_multiplier = 0.1
neural_dynamic_iterations = 50

lr_start = {'ff' : 0.001}

model = SupervisedPredictiveCoding(architecture, activation_type)

In [None]:
trn_acc_list = []
tst_acc_list = []

n_epochs = 50
lr = lr_start
for epoch_ in range(n_epochs):
#     lr = {'ff' : lr_start['ff'] * (0.999)**epoch_}
    for idx, (x, y) in tqdm(enumerate(train_loader)):
        x, y = x.to(device), y.to(device)
        x = activation_inverse(2*x.view(x.size(0),-1).T - 1, "sigmoid")
#         x = activation_inverse(x.view(x.size(0),-1).T, "sigmoid")
        y_one_hot = F.one_hot(y, 10).to(device).T
        y_one_hot = 0.94 * y_one_hot + 0.03 * torch.ones(*y_one_hot.shape, device = device)
        _ = model.batch_step(  x, y_one_hot, lr, neural_lr_start, neural_lr_stop, neural_lr_rule,
                               neural_lr_decay_multiplier, neural_dynamic_iterations,
                               optimizer = "adam")

    trn_acc = evaluatePC(  model, train_loader, neural_lr_start, neural_lr_stop, neural_lr_rule, 
                           neural_lr_decay_multiplier,
                           neural_dynamic_iterations, device, printing = False)
    tst_acc = evaluatePC(  model, test_loader, neural_lr_start, neural_lr_stop, neural_lr_rule, 
                           neural_lr_decay_multiplier,
                           neural_dynamic_iterations, device, printing = False)
    trn_acc_list.append(trn_acc)
    tst_acc_list.append(tst_acc)
    
    print("Epoch : {}, Train Accuracy : {}, Test Accuracy : {}".format(epoch_+1, trn_acc, tst_acc))

3000it [01:40, 29.87it/s]
2it [00:00, 19.54it/s]

Epoch : 1, Train Accuracy : 0.8896333333333334, Test Accuracy : 0.8929


3000it [02:16, 21.90it/s]
3it [00:00, 26.50it/s]

Epoch : 2, Train Accuracy : 0.9159666666666667, Test Accuracy : 0.9163


3000it [01:41, 29.60it/s]
2it [00:00, 11.96it/s]

Epoch : 3, Train Accuracy : 0.93515, Test Accuracy : 0.9335


3000it [02:32, 19.62it/s]
3it [00:00, 27.21it/s]

Epoch : 4, Train Accuracy : 0.9359333333333333, Test Accuracy : 0.934


3000it [01:43, 29.10it/s]
2it [00:00, 14.71it/s]

Epoch : 5, Train Accuracy : 0.9476333333333333, Test Accuracy : 0.9455


3000it [02:20, 21.35it/s]
3it [00:00, 27.07it/s]

Epoch : 6, Train Accuracy : 0.9464333333333333, Test Accuracy : 0.9447


3000it [01:54, 26.12it/s]
3it [00:00, 26.74it/s]

Epoch : 7, Train Accuracy : 0.9543833333333334, Test Accuracy : 0.9519


3000it [01:55, 25.90it/s]
2it [00:00, 14.37it/s]

Epoch : 8, Train Accuracy : 0.96235, Test Accuracy : 0.9601


3000it [03:23, 14.71it/s]
2it [00:00, 14.11it/s]

Epoch : 9, Train Accuracy : 0.9561833333333334, Test Accuracy : 0.9525


1568it [01:46, 14.35it/s]

In [None]:
neurons = model.fast_forward(activation_func(x, "sigmoid")[0])
neurons[-1] = y_one_hot.to(torch.float)
neurons = model.run_neural_dynamics(x, y, neurons, neural_lr_start, neural_lr_stop, neural_lr_rule, 
                          neural_lr_decay_multiplier, neural_dynamic_iterations)

In [None]:
tst_acc = evaluatePC(  model, test_loader, neural_lr_start, neural_lr_stop, neural_lr_rule, 
                                 neural_lr_decay_multiplier,
                                 neural_dynamic_iterations, device, printing = True)

In [None]:
Wff = model.Wff
Wff[0]

In [None]:
model.batch_step(x, y_one_hot, lr_start, neural_lr_start, neural_lr_stop, neural_lr_rule, 
                          neural_lr_decay_multiplier, neural_dynamic_iterations)

In [None]:
model.Wff

In [None]:
evalua

In [None]:
neurons = model.fast_forward(activation_func(x, "sigmoid")[0])

In [None]:
layers_after_activation, error_layers, grads = model.calculate_neural_dynamics_grad(x, y, neurons)

In [None]:
grads

In [None]:
len(layers_after_activation), len(error_layers), len(grads)

In [None]:
layers_after_activation[jj + 1][1].shape

In [None]:
jj = 0
error_layers[jj + 1].shape, model.Wff[jj + 1]['weight'].shape

In [None]:
len(layers_after_activation)

In [None]:
jj = 0
Wff[jj]['weight'] @ layers_after_activation[jj][0]

In [None]:
variances

In [None]:
Wff = model.Wff
variances = model.variances
layers = [x] + neurons

[(layers[jj+1] - (Wff[jj]['weight'] @ layers_after_activation[jj][0] + Wff[jj]['bias'])) / variances[jj + 1] for jj in range(len(layers) - 1)]

In [None]:
def activation_func(x, type_ = "linear"):
    if type_ == "linear":
        f_x = x
        fp_x = torch.ones(*x.shape, device = x.device)
    elif type_ == "tanh":
        f_x = torch.tanh(x)
        fp_x = torch.ones(*x.shape, device = x.device) - f_x ** 2
    elif type_ == "sigmoid":
        ones_vec = torch.ones(*x.shape, device = x.device)
        f_x = 1 / (ones_vec + torch.exp(-x))
        fp_x = f_x * (ones_vec - f_x)
    elif type_ == "relu":
        f_x = torch.maximum(x, torch.tensor([0], device = x.device))
        fp_x = 1 * (x > 0)
    elif type_ == "exp":
        f_x = torch.exp(x)
        fp_x = f_x
    else: # Use linear
        f_x = x
        fp_x = torch.ones(*x.shape, device = x.device)
        
    return f_x, fp_x

In [None]:
x = torch.randn(3,1, device = "cuda")
x

In [None]:
x = torch.randn(3,1, device = "cuda")
print(x)
activation_func(x, type_ = "sigmoid")

In [None]:
device = "cuda"

In [None]:
(2 * torch.rand(3, 3, requires_grad = False).to(device) - 1) * (4 * np.sqrt(6 / (3 + 3)))