In [None]:
import torch 
import torch.nn as nn 
from torchvision.datasets import MNIST
from torchvision.transforms import Compose, ToTensor
from torch.optim import Adam, SGD
from torch.utils.data import DataLoader, Dataset
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt 
from scipy.signal import convolve2d
from tqdm.notebook import tqdm_notebook
import random
from sklearn.preprocessing import OneHotEncoder
from torch.functional import F

In [None]:
# --- Hyperparameters ---------------------------------------------------------
batchsize = 1024          # mini-batch size of both DataLoaders
learning_rate = 0.03      # Adam step size of every FFConv2d layer
learning_rate_lc = 0.05   # SGD step size of the linear classifier
epochs = 50               # updates per layer per batch AND classifier epochs
threshold = 2.0           # goodness threshold used in the FFConv2d loss
image_shape = (1,28,28)   # shape of one input image as used by the mask generators

# --- Reproducibility ---------------------------------------------------------
# Masks (NumPy RNG), shuffling / weight init (torch RNG) are all stochastic;
# seeding here makes "Restart Kernel -> Run All" repeatable.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

In [None]:
# MNIST train / test splits stored under ./data/ (download=True is passed for both);
# ToTensor() is the only transform applied.
train_dataset = MNIST("./data/",download=True, train=True, transform=ToTensor())
test_dataset = MNIST("./data/",download=True, train=False, transform=ToTensor())

# Both loaders use the same batch size and both shuffle.
train_loader = DataLoader(train_dataset, batch_size=batchsize, shuffle=True, )
test_loader = DataLoader(test_dataset, batch_size=batchsize, shuffle=True)

In [None]:
def label_to_oh(y, num_classes=10):
    """Convert integer class labels into a float32 one-hot matrix.

    Args:
        y: tensor (or array-like) of integer labels in ``[0, num_classes)``.
            Any shape is accepted; it is flattened first.
        num_classes: width of the one-hot encoding (default 10).

    Returns:
        ``torch.float32`` tensor of shape ``(y.numel(), num_classes)`` that
        lives on the same device as ``y``.

    Raises:
        RuntimeError: if a label falls outside ``[0, num_classes)``.
    """
    # torch's built-in one_hot avoids re-fitting a sklearn encoder on every
    # call and also works for tensors that are not on the CPU.
    labels = torch.as_tensor(y).reshape(-1).long()
    return torch.nn.functional.one_hot(labels, num_classes=num_classes).float()

In [None]:
def show_image(x):
    """Display a single image in grayscale.

    Size-1 dimensions of ``x`` are dropped via ``x.squeeze()`` before the
    result is handed to ``plt.imshow``.
    """
    image = x.squeeze()
    plt.imshow(image, cmap="gray")
    plt.show()

In [None]:
# Mask generator for negative data, following the method described by
# Geoffrey Hinton in the article.
def mask_gen():
    """Build a random binary mask by repeatedly blurring random bits.

    A random 0/1 image of shape ``image_shape`` (squeezed) is blurred with a
    3x3 kernel a random number of times (between 5 and 9 passes) and then
    thresholded at 0.5.

    Returns:
        ``np.float32`` array containing only 0.0 and 1.0.
    """
    n_passes = np.random.randint(5,10)
    blurred = np.random.randint(2, size=image_shape).squeeze().astype(np.float32)
    # Outer product of [1, 2, 1] with itself, normalised so the weights sum to 1.
    kernel = np.outer([1, 2, 1], [1, 2, 1]) / 16
    passes_done = 0
    while passes_done < n_passes:
        blurred = convolve2d(blurred, kernel, mode='same', boundary='symm')
        passes_done += 1
    return (blurred > 0.5).astype(np.float32)

# An alternative mask generator for negative data that I tried for testing purposes.
def mask_gen1(shape=None):
    """Build a soft mask with ~5/8 of its entries near 0 and the rest near 1.

    Args:
        shape: shape of the returned mask. Defaults to the notebook-level
            ``image_shape``.

    Returns:
        ``np.float32`` array of the requested shape whose values lie in
        ``[0, 1]``; low and high entries are placed at random positions.
    """
    if shape is None:
        shape = image_shape
    # Number of pixels, derived from the shape itself (previously read from an
    # undefined global, which raised NameError on call).
    n = int(np.prod(shape))
    n_low = int(5*n/8)
    # Take the remainder so the two segments always add up to n.
    n_high = n - n_low

    arr1 = np.random.normal(loc=0, scale=0.01, size=n_low)
    if arr1.size:
        # Shift so the smallest low value is exactly 0.
        arr1 = arr1 - arr1.min()
    arr2 = np.random.normal(loc=1, scale=0.01, size=n_high)
    if arr2.size:
        # Shift so the largest high value is exactly 1 (the mask stays in [0, 1]).
        arr2 = arr2 - (arr2.max() - 1)

    arr = np.concatenate([arr1,arr2])
    np.random.shuffle(arr)
    mask = arr.reshape(shape).astype(np.float32)
    return mask

# Sanity check: draw one random mask and display it.
show_image(mask_gen())

In [None]:
def negative_data_gen(batch, mask=None):
    """Create hybrid "negative" images by blending pairs of images from a batch.

    Each image keeps the pixels where the mask is 1 and takes the remaining
    pixels from a *different* image of the same batch.

    Args:
        batch: sequence whose first element is the image tensor; the first
            dimension of that tensor indexes the images.
        mask: optional array/tensor broadcastable against the images. When
            omitted, a fresh mask is drawn with ``mask_gen()``.

    Returns:
        Tensor with the same shape as ``batch[0]``.
    """
    images = batch[0]
    n_images = images.shape[0]

    # Link the images into one random cycle: image order[k] is paired with
    # order[k+1]. Unlike a plain random permutation this never pairs an image
    # with itself (for n_images > 1), so no unmodified real image is returned
    # as a "negative".
    order = torch.randperm(n_images, device=images.device)
    partner = torch.empty_like(order)
    partner[order] = order.roll(-1)

    if mask is None:
        mask = mask_gen()
    # Explicit conversion keeps dtype/device aligned with the images instead
    # of relying on implicit NumPy <-> tensor arithmetic.
    mask = torch.as_tensor(mask, dtype=images.dtype, device=images.device)

    hybrid_image = images * mask + images[partner] * (1 - mask)
    return hybrid_image

# Sanity check: build hybrid negatives from one training batch and show the first one.
x = next(iter(train_loader))
show_image(negative_data_gen(x)[0])

In [None]:
class FFConv2d(nn.Conv2d):
    """Conv2d layer that is trained on its own with a goodness-based loss.

    Each layer owns an Adam optimizer and is updated locally: the mean squared
    activation over channels ("goodness") is pushed above ``self.threshold``
    for positive inputs and below it for negative inputs.

    NOTE: ``train`` shadows ``nn.Module.train(mode)``, so ``.eval()`` /
    ``.train(False)`` cannot be used on this layer.

    Args:
        in_channels, out_channels, kernel_size, stride, padding: forwarded to
            ``nn.Conv2d``.
        lr: Adam learning rate; defaults to the notebook-level ``learning_rate``.
        layer_threshold: goodness threshold; defaults to the notebook-level
            ``threshold``.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0,
                 lr=None, layer_threshold=None):
        super(FFConv2d, self).__init__(in_channels, out_channels, kernel_size, stride, padding)
        # nn.Conv2d already stores kernel_size / stride / padding (as tuples).
        # Overwriting them with the raw ints breaks repr(), which calls
        # len(self.padding), so they are intentionally left untouched.
        self.optimizer = Adam(self.parameters(), lr=learning_rate if lr is None else lr)
        self.threshold = threshold if layer_threshold is None else layer_threshold

    def forward(self, x):
        """Normalise ``x`` along dim 1, convolve, and apply ReLU."""
        # Divide by the L2 norm taken over dim 1 (eps avoids division by zero).
        # NOTE(review): for a single-channel input this divides every pixel by
        # its own magnitude - confirm that is intended for the first layer.
        x_direction = x / (x.norm(2, 1, keepdim=True) + 1e-4)
        # Follow the input's device instead of assuming CUDA; .to() is a no-op
        # when the parameters already live there.
        output = F.conv2d(x_direction, self.weight.to(x.device), bias=self.bias.to(x.device),
                          stride=self.stride, padding=self.padding)
        return F.relu(output)

    def train(self, x_pos, x_neg, epoch_num):
        """Run ``epoch_num`` local updates on one positive/negative batch pair.

        Returns:
            ``(out_pos, out_neg)``: this layer's activations for both inputs
            after training, detached from the autograd graph.
        """
        for _ in range(epoch_num):
            goodness_pos = self.forward(x_pos).pow(2).mean(1)
            goodness_neg = self.forward(x_neg).pow(2).mean(1)
            # softplus(z) == log(1 + exp(z)) but does not overflow for large z.
            loss = F.softplus(torch.cat([self.threshold - goodness_pos,
                                         goodness_neg - self.threshold])).mean()
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
        # No graph is needed for the values handed to the next layer.
        with torch.no_grad():
            return self.forward(x_pos), self.forward(x_neg)

In [None]:
class FFConvNet(nn.Module):
    """Stack of three FFConv2d layers that are trained one after another.

    NOTE: ``train`` shadows ``nn.Module.train(mode)``, so ``.eval()`` /
    ``.train(False)`` cannot be used on this module.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = FFConv2d(in_channels=1, out_channels=128, kernel_size=10, stride=6)
        self.conv2 = FFConv2d(in_channels=128, out_channels=220, kernel_size=3)
        self.conv3 = FFConv2d(in_channels=220, out_channels=512, kernel_size=2)
        # NOTE(review): not used by any method of this class.
        self.dropout = nn.Dropout(0.5)

    def respresentation_vects(self,x):
        """Return the concatenated, flattened activations of all three layers.

        Args:
            x: input batch; the result lives on the same device as ``x``.

        Returns:
            2-D tensor ``(batch, features)`` holding layer1, layer2 and layer3
            activations side by side. It carries no autograd graph.
        """
        # The representation is only consumed as fixed features, so no graph
        # through the conv layers needs to be recorded.
        with torch.no_grad():
            layer1 = self.conv1(x)
            layer2 = self.conv2(layer1)
            layer3 = self.conv3(layer2)
            # flatten(1) keeps the batch dimension and merges the rest.
            layers_output = torch.cat(
                [layer1.flatten(1), layer2.flatten(1), layer3.flatten(1)], 1)

        return layers_output

    def train(self, x_pos, x_neg, epoch_num):
        """Train the layers in order on one positive/negative batch pair.

        Every layer runs ``epoch_num`` local updates and its detached outputs
        become the inputs of the next layer. Returns ``None``.
        """
        out_pos, out_neg = x_pos, x_neg
        for layer in (self.conv1, self.conv2, self.conv3):
            out_pos, out_neg = layer.train(out_pos, out_neg, epoch_num)

In [None]:
model = FFConvNet()

# Hand tqdm the DataLoader itself: wrapping it in iter() hides its length, so
# the progress bar could not show a total or an ETA.
model_train_loop = tqdm_notebook(train_loader, leave=True)
# `batch` deliberately outlives the loop: a later cell reuses the last batch
# to measure the width of the representation vector.
batch = None
for batch in model_train_loop:
    x_pos = batch[0]
    # Negatives are built on the CPU from the same batch, then both are moved.
    x_neg = negative_data_gen(batch)
    x_pos, x_neg = x_pos.cuda(), x_neg.cuda()
    model.train(x_pos,x_neg,epochs)

In [None]:
# Push the last training batch (left over from the training loop) through the
# network once, only to read off the width (dim 1) of the representation vector.
x = batch[0].cuda()
rep_vects_shape = model.respresentation_vects(x).detach().shape[1]

In [None]:
class LinearClassification(nn.Module):
    """Linear 10-way classifier trained on top of fixed representation vectors.

    NOTE: ``train`` shadows ``nn.Module.train(mode)``, so ``.eval()`` /
    ``.train(False)`` cannot be used on this module.

    Args:
        input_dimension: width of the representation vector.
        feature_extractor: callable mapping an input batch to a
            ``(batch, input_dimension)`` tensor. Defaults to the
            notebook-level ``model.respresentation_vects`` (looked up at call
            time).
        device: device for the linear layer and the data; defaults to CUDA
            when available, otherwise CPU.
        lr: SGD learning rate; defaults to the notebook-level
            ``learning_rate_lc``.
    """

    def __init__(self, input_dimension, feature_extractor=None, device=None, lr=None):
        super().__init__()
        self.epoch_loss = []   # mean training loss per epoch
        self.epoch_acc = []    # mean training accuracy per epoch
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(device)
        # Kept inside a tuple so that an nn.Module extractor is NOT registered
        # as a sub-module (its weights must stay out of self.parameters()).
        self._extractor = (feature_extractor,)
        self.linear = torch.nn.Linear(input_dimension, 10).to(self.device)
        self.optimizer = SGD(self.parameters(), lr=learning_rate_lc if lr is None else lr)
        self.criterion = nn.CrossEntropyLoss()
        # dim=1: normalise over the class axis (an implicit dim is deprecated).
        self.softmax = nn.Softmax(dim=1)

    @staticmethod
    def _mean(values):
        """Arithmetic mean of a list of floats; NaN for an empty list."""
        return float(sum(values) / len(values)) if values else float("nan")

    def _features(self, x):
        """Run the feature extractor on ``x`` without recording gradients."""
        extractor = self._extractor[0]
        if extractor is None:
            extractor = model.respresentation_vects
        # The extractor is frozen here; only the linear layer is optimised.
        with torch.no_grad():
            return extractor(x)

    def forward(self,x):
        """Return class logits for representation vectors ``x``."""
        return self.linear(x)

    def predict(self,x):
        """Return the predicted class index for every input in ``x``."""
        x = x.to(self.device)
        with torch.no_grad():
            y_h = self.forward(self._features(x))
            soft_out = self.softmax(y_h)
            return soft_out.argmax(1)

    def accuracy_f(self, y_pred, y_true):
        """Fraction of samples whose arg-max prediction equals the target.

        Args:
            y_pred: ``(batch, classes)`` scores.
            y_true: either one-hot ``(batch, classes)`` targets or a 1-D
                tensor of class indices.

        Returns:
            Accuracy as a Python float in ``[0, 1]`` (0.0 for an empty batch).
        """
        if y_pred.size(0) == 0:
            return 0.0
        predicted = y_pred.argmax(1)
        target = y_true.argmax(1) if y_true.dim() > 1 else y_true
        # Compare per sample. Comparing one-hot matrices element-wise would
        # count the matching zeros of a wrong prediction as "correct".
        return (predicted == target.to(predicted.device)).float().mean().item()

    def train(self, data_loader,epoch_num):
        """Fit the linear layer for ``epoch_num`` passes over ``data_loader``.

        Appends one mean loss and one mean accuracy per epoch to
        ``self.epoch_loss`` / ``self.epoch_acc``.
        """
        linear_loop = tqdm_notebook(range(epoch_num),total=epoch_num)
        for i in linear_loop:
            batch_loss = []
            batch_accuracy = []
            for x, y in data_loader:
                x = x.to(self.device)
                # Class indices give the same cross-entropy as one-hot targets.
                y = y.to(self.device).long()
                y_h = self.forward(self._features(x))
                loss = self.criterion(y_h, y)
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                # Store plain floats so no tensors/graphs are kept alive.
                batch_loss.append(loss.item())
                batch_accuracy.append(self.accuracy_f(y_h, y))
            self.epoch_acc.append(self._mean(batch_accuracy))
            self.epoch_loss.append(self._mean(batch_loss))
            linear_loop.set_description(f"Epoch [{i+1}/{epoch_num}]: ")
            # [-1] stays correct even when train() is called more than once.
            linear_loop.set_postfix(loss=self.epoch_loss[-1],accuracy=self.epoch_acc[-1])

    def test(self, data_loader):
        """Evaluate on ``data_loader``.

        Returns:
            ``(mean_loss, mean_accuracy)`` averaged over batches, as floats.
        """
        batch_loss = []
        batch_accuracy = []
        with torch.no_grad():
            for x, y in data_loader:
                x = x.to(self.device)
                y = y.to(self.device).long()
                y_h = self.forward(self._features(x))
                batch_loss.append(self.criterion(y_h, y).item())
                batch_accuracy.append(self.accuracy_f(y_h, y))
        test_loss = self._mean(batch_loss)
        test_accuracy = self._mean(batch_accuracy)
        return test_loss,test_accuracy
        
        

In [None]:
# Fit the linear classifier on the representation vectors of the training set.
linear_model = LinearClassification(rep_vects_shape)
linear_model.train(train_loader,epochs)

In [None]:
# Evaluate on the held-out test loader and report the batch-averaged metrics.
test_loss,test_acc = linear_model.test(test_loader)
print("Test Loss: ",test_loss)
print("Test Accuracy: ",test_acc)

In [None]:
# Take the x-axis from the recorded history rather than the global `epochs`,
# so the plot still works if the classifier was trained for a different
# number of epochs.
epoch_axis = range(1, len(linear_model.epoch_acc) + 1)

fig, (ax_acc, ax_loss) = plt.subplots(1, 2, figsize=(10,5))

ax_acc.plot(epoch_axis, linear_model.epoch_acc, 'b-', label='Train Accuracy')
ax_acc.set_xlabel('Epochs')
ax_acc.set_ylabel('Accuracy')
ax_acc.set_title('Train Accuracy')
ax_acc.legend()

ax_loss.plot(epoch_axis, linear_model.epoch_loss, 'r-', label='Train Loss')
ax_loss.set_xlabel('Epochs')
ax_loss.set_ylabel('Loss')
ax_loss.set_title('Train Loss')
ax_loss.legend()

fig.tight_layout()
plt.show()

In [None]:
# Show the first image of n test batches together with the predicted class.
iterator = iter(test_loader)
n = 4
# squeeze=False always returns a 2-D array of axes, so n == 1 works as well.
fig, ax1 = plt.subplots(1, n, squeeze=False)
for i in range(n):
    x = next(iterator)[0]
    # Only the displayed image is classified, not the whole batch.
    num = int(linear_model.predict(x[:1])[0])
    ax1[0][i].imshow(x[0].squeeze(),cmap="gray")
    ax1[0][i].set_title(str(num))
plt.show()