In [1]:
import numpy as np
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torch.utils.data.sampler import SubsetRandomSampler
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
from random import shuffle
import math
import os

In [2]:
# Mount Google Drive inside the Colab runtime at a fixed mount point.

from google.colab import drive

GDRIVE_MOUNT_POINT = '/content/gdrive'
drive.mount(GDRIVE_MOUNT_POINT)

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/gdrive


# 1. Load images

In [3]:
# Preprocessing applied to every image: resize to 128x128, then convert to a tensor.
preprocessing_steps = [
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
]
transform = transforms.Compose(preprocessing_steps)

# Change this path to wherever the image folder lives.
rootpath = "/content/gdrive/My Drive/Testing cokes/no_bgr"
dataset = torchvision.datasets.ImageFolder(root=rootpath, transform=transform)

num_images = len(dataset)
print("Total number of images", num_images)

Total number of images 13


In [4]:
# Every dataset index, shuffled once with a fixed numpy seed.
# NOTE(review): SubsetRandomSampler draws its own random permutation of these
# indices on each pass over the loader, so the numpy seed by itself does not
# appear to pin the order in which images are yielded -- confirm if a stable
# order matters downstream.
indices = list(range(len(dataset)))

np.random.seed(1000)
np.random.shuffle(indices)

test_sampler = SubsetRandomSampler(indices)
test_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=1,
    num_workers=1,
    sampler=test_sampler,
)

# 2. Load model

In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable


def squash(inputs, axis=-1):
    """
    Squashing non-linearity applied to capsule vectors.

    Each vector along `axis` is rescaled by ||v||^2 / (1 + ||v||^2) / (||v|| + 1e-8),
    so its direction is kept while its length is mapped into [0, 1).
    The 1e-8 term keeps the division finite for zero-length vectors.

    inputs: tensor holding the vectors to squash
    axis: dimension along which each vector lies (default: last)
    return: tensor with the same shape as `inputs`
    """
    length = torch.norm(inputs, p=2, dim=axis, keepdim=True)
    shrink = length**2 / (1 + length**2)
    per_vector_scale = shrink / (length + 1e-8)
    return per_vector_scale * inputs

def caps_loss(y_true, y_pred, x, x_recon, lam_recon):
    """
    Capsule loss: margin loss on the predicted capsule lengths plus a weighted
    mean-squared reconstruction error.

    y_true: one-hot targets; the per-sample margin loss sums over dim 1
    y_pred: predicted capsule lengths, broadcast-compatible with `y_true`
    x: original inputs
    x_recon: reconstructions compared against `x`
    lam_recon: weight applied to the reconstruction term
    return: scalar tensor `L_margin + lam_recon * L_recon`
    """
    # Penalty when the true class is shorter than 0.9 ...
    present_penalty = torch.clamp(0.9 - y_pred, min=0.) ** 2
    # ... and, down-weighted by 0.5, when a wrong class is longer than 0.1.
    absent_penalty = torch.clamp(y_pred - 0.1, min=0.) ** 2

    per_class = y_true * present_penalty + 0.5 * (1 - y_true) * absent_penalty
    margin_term = per_class.sum(dim=1).mean()

    reconstruction_term = nn.MSELoss()(x_recon, x)

    return margin_term + lam_recon * reconstruction_term
    
class DenseCapsule(nn.Module):
    """
    Fully connected capsule layer with iterative routing.

    Every input capsule produces a prediction ("vote") for every output capsule
    through a learned linear map. The votes are combined with coupling
    coefficients that are refined over `routings` iterations, and the result is
    squashed.

    in_num_caps: number of input capsules
    in_dim_caps: dimension of each input capsule
    out_num_caps: number of output capsules
    out_dim_caps: dimension of each output capsule
    routings: number of routing iterations, must be > 0

    Input:  [batch, in_num_caps, in_dim_caps]
    Output: [batch, out_num_caps, out_dim_caps]
    """

    def __init__(self, in_num_caps, in_dim_caps, out_num_caps, out_dim_caps, routings=3):
        super(DenseCapsule, self).__init__()
        self.in_num_caps = in_num_caps
        self.in_dim_caps = in_dim_caps
        self.out_num_caps = out_num_caps
        self.out_dim_caps = out_dim_caps
        self.routings = routings
        # One [out_dim_caps, in_dim_caps] transformation per (output, input) capsule pair.
        self.weight = nn.Parameter(0.01 * torch.randn(out_num_caps, in_num_caps, out_dim_caps, in_dim_caps))

    def forward(self, x):
        # x.size=[batch, in_num_caps, in_dim_caps]
        # expanded to    [batch, 1,            in_num_caps, in_dim_caps,  1]
        # weight.size   =[       out_num_caps, in_num_caps, out_dim_caps, in_dim_caps]
        # torch.matmul: [out_dim_caps, in_dim_caps] x [in_dim_caps, 1] -> [out_dim_caps, 1]
        # => x_hat.size =[batch, out_num_caps, in_num_caps, out_dim_caps]
        x_hat = torch.squeeze(torch.matmul(self.weight, x[:, None, :, :, None]), dim=-1)

        # In forward pass, `x_hat_detached` = `x_hat`;
        # In backward, no gradient can flow from `x_hat_detached` back to `x_hat`.
        x_hat_detached = x_hat.detach()

        # The prior for coupling coefficient, initialized as zeros.
        # b.size = [batch, out_num_caps, in_num_caps]
        # Allocated from `x_hat` so it lives on the same device (CPU or GPU) and
        # has the same dtype as the data being routed.
        b = x_hat.new_zeros(x.size(0), self.out_num_caps, self.in_num_caps)

        assert self.routings > 0, 'The \'routings\' should be > 0.'
        last_iteration = self.routings - 1
        for i in range(self.routings):
            # c.size = [batch, out_num_caps, in_num_caps]
            # Softmax over the output capsules: each input capsule distributes
            # a total coupling of 1 across all outputs.
            c = F.softmax(b, dim=1)

            if i == last_iteration:
                # At last iteration, use `x_hat` so gradients reach `self.weight`.
                # c.size expanded to [batch, out_num_caps, in_num_caps, 1           ]
                # x_hat.size     =   [batch, out_num_caps, in_num_caps, out_dim_caps]
                # => outputs.size=   [batch, out_num_caps, 1,           out_dim_caps]
                outputs = squash(torch.sum(c[:, :, :, None] * x_hat, dim=-2, keepdim=True))
            else:
                # Otherwise, use `x_hat_detached` to update `b`. No gradients flow on this path.
                outputs = squash(torch.sum(c[:, :, :, None] * x_hat_detached, dim=-2, keepdim=True))

                # Agreement between each vote and the current output raises its logit.
                # outputs.size       =[batch, out_num_caps, 1,           out_dim_caps]
                # x_hat_detached.size=[batch, out_num_caps, in_num_caps, out_dim_caps]
                # => b.size          =[batch, out_num_caps, in_num_caps]
                b = b + torch.sum(outputs * x_hat_detached, dim=-1)

        return torch.squeeze(outputs, dim=-2)


class PrimaryCapsule(nn.Module):
    """
    Turns CNN feature maps into capsule vectors: a convolution, a reshape into
    vectors of length `dim_caps`, and a squash of every vector.

    Output size: [batch, num_caps, dim_caps], where num_caps is whatever the
    flattened convolution output divides into.
    """
    def __init__(self, in_channels, out_channels, dim_caps, kernel_size, stride=1, padding=0):
        super().__init__()
        self.dim_caps = dim_caps
        self.conv2d = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
        )

    def forward(self, x):
        batch_size = x.size(0)
        feature_maps = self.conv2d(x)
        # Flatten everything after the batch dimension into groups of `dim_caps` values.
        capsules = feature_maps.view(batch_size, -1, self.dim_caps)
        return squash(capsules)


In [7]:
class CapsuleNet(nn.Module):
    """
    Capsule network: two conv + max-pool stages, a primary capsule layer, a
    routed class-capsule layer, and a fully connected decoder that reconstructs
    the input from the (masked) class capsules.

    input_size: [channels, height, width] of the input images
    classes: number of output classes (one 16-d capsule per class)
    routings: number of routing iterations in the class-capsule layer
    """

    def __init__(self, input_size, classes, routings):
        super(CapsuleNet, self).__init__()
        self.input_size = input_size
        self.classes = classes
        self.routings = routings

        # Layer 1: Just a conventional Conv2D layer
        self.conv1 = nn.Conv2d(input_size[0], 16, kernel_size=3, stride=1, padding=0)

        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=0)

        self.pool = nn.MaxPool2d(2, 2)

        # Layer 2: Conv2D layer with `squash` activation, then reshape to [None, num_caps, dim_caps]
        self.primarycaps = PrimaryCapsule(32, 32, 8, kernel_size=5, stride=2, padding=0)

        # The number of primary capsules depends on the spatial input size, so
        # measure it with a dry run instead of hard-coding it
        # (a 3x128x128 input yields 676).
        with torch.no_grad():
            probe = torch.zeros(1, *input_size)
            in_num_caps = self.primarycaps(self._features(probe)).size(1)

        # Layer 3: Capsule layer. Routing algorithm works here.
        self.digitcaps = DenseCapsule(in_num_caps=in_num_caps, in_dim_caps=8,
                                      out_num_caps=classes, out_dim_caps=16, routings=routings)

        # Decoder network.
        self.decoder = nn.Sequential(
            nn.Linear(16*classes, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, input_size[0] * input_size[1] * input_size[2]),
            nn.Sigmoid()
        )

        # NOTE(review): this activation is never applied in forward(); it is kept
        # so the module's attributes stay as they were.
        self.relu = nn.ReLU()

    def _features(self, x):
        """Run the two conv + max-pool stages that precede the capsule layers."""
        x = self.pool(self.conv1(x))
        return self.pool(self.conv2(x))

    def forward(self, x, y=None):
        """
        x: batch of images, [batch, *input_size]
        y: optional one-hot labels [batch, classes] used to mask the class
           capsules before decoding; when omitted, the longest capsule is used
        return: (capsule lengths [batch, classes], reconstruction [batch, *input_size])
        """
        x = self._features(x)
        x = self.primarycaps(x)
        x = self.digitcaps(x)
        length = x.norm(dim=-1)
        if y is None:  # during testing, no label given. create one-hot coding using `length`
            index = length.max(dim=1)[1]
            # Built from `length` so the mask lands on the same device as the
            # capsules (CPU or GPU); zeros_like does not carry gradient history.
            y = torch.zeros_like(length).scatter_(1, index.view(-1, 1), 1.)
        # Zero out every capsule except the selected class, then decode.
        reconstruction = self.decoder((x * y[:, :, None]).view(x.size(0), -1))
        return length, reconstruction.view(-1, *self.input_size)

In [9]:
def test(model, test_loader):
    """
    Evaluate `model` on every batch of `test_loader`.

    The model is switched to eval mode and run without gradient tracking, on
    whichever device its parameters live on. It is called without labels, so it
    must return `(class_lengths, reconstruction)` from `model(x)`.

    model: network to evaluate
    test_loader: iterable of `(images, integer_labels)` batches that supports len()
    return: `(loss, accuracy)` -- the capsule loss averaged over batches and the
            fraction of correctly classified samples
    raises: ZeroDivisionError if the loader yields no batches
    """
    model.eval()

    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    # NOTE(review): 784 looks like a 28*28 pixel count carried over from another
    # dataset -- kept unchanged so reported losses stay comparable; confirm.
    lam_recon = 0.0005 * 784

    test_loss = 0
    correct = 0
    total = 0
    # no_grad replaces the `volatile=True` flag, which recent PyTorch ignores.
    with torch.no_grad():
        for x, y in test_loader:
            x, y = x.to(device), y.to(device)
            y_pred, x_recon = model(x)
            # One-hot targets sized from the model output rather than a fixed class count.
            y_onehot = torch.zeros_like(y_pred).scatter_(1, y.view(-1, 1), 1.)
            test_loss += caps_loss(y_onehot, y_pred, x, x_recon, lam_recon).item()  # sum up batch loss
            correct += y_pred.max(1)[1].eq(y).sum().item()
            total += x.shape[0]
    test_loss = test_loss / len(test_loader)
    return test_loss, correct/total


def train(model, train_loader, test_loader, epoch, learning_rate):
    """
    Train `model` with Adam and an exponentially decaying learning rate, then
    plot the accuracy and loss curves.

    After every epoch the model is evaluated on `test_loader` (reported as
    validation) and on `train_loader`, and one summary line is printed. The
    total elapsed time and the two plots are produced once, after the last epoch.

    model: network to train; called as `model(x, y_onehot)` and expected to
           return `(class_lengths, reconstruction)`
    train_loader: iterable of `(images, integer_labels)` training batches
    test_loader: iterable of `(images, integer_labels)` validation batches
    epoch: number of epochs to run
    learning_rate: initial Adam learning rate (multiplied by 0.9 after each epoch)
    return: None
    """
    print('Begin Training' + '-'*70)

    # starts timer
    start_time = time.time()

    # Train on whichever device the model already lives on.
    device = next(model.parameters()).device
    # Width of the one-hot labels; falls back to 10 for models without `classes`.
    num_classes = getattr(model, 'classes', 10)
    # NOTE(review): 784 looks like a 28*28 pixel count carried over from another
    # dataset -- kept unchanged so losses stay comparable; confirm.
    lam_recon = 0.0005 * 784

    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    lr_decay = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)
    train_accs, val_accs, train_losses, val_losses, iters = [], [], [], [], []

    for epoch_idx in range(epoch):
        model.train()  # set to training mode (test() below switches to eval mode)
        total_train_loss = 0.0
        num_batches = 0
        for x, y in train_loader:  # batch training
            x, y = x.to(device), y.to(device)
            # change to one-hot coding
            y = torch.zeros(y.size(0), num_classes, device=device).scatter_(1, y.view(-1, 1), 1.)

            optimizer.zero_grad()  # set gradients of optimizer to zero
            y_pred, x_recon = model(x, y)  # forward
            loss = caps_loss(y, y_pred, x, x_recon, lam_recon)  # compute loss
            loss.backward()  # backward, compute all gradients of loss w.r.t all parameters
            total_train_loss += loss.item()  # record the batch loss
            optimizer.step()  # update the trainable parameters with computed gradients
            num_batches += 1

        # Decay the learning rate only after this epoch's optimizer steps, so the
        # first epoch really runs at `learning_rate`.
        lr_decay.step()

        # compute validation loss and acc
        val_loss, val_acc = test(model, test_loader)
        _, train_acc = test(model, train_loader)

        # track accuracy
        train_accs.append(train_acc)
        val_accs.append(val_acc)
        # Mean loss over the batches actually processed this epoch.
        train_losses.append(total_train_loss / num_batches)
        val_losses.append(val_loss)
        iters.append(epoch_idx)
        print("epoch:", epoch_idx + 1, "train_acc:", train_accs[-1], "val_acc:", val_accs[-1], "train_loss:", train_losses[-1], "val_loss:", val_losses[-1])

    end_time = time.time()
    elapsed_time = end_time - start_time
    print("Total time elapsed: {:.2f} seconds".format(elapsed_time))

    plt.title("Train vs. Validation Accuracy")
    plt.plot(iters, train_accs, label="Train")
    plt.plot(iters, val_accs, label="Validation")
    plt.xlabel("Epochs")
    plt.ylabel("Training Accuracy")
    plt.legend(loc='best')
    plt.show()

    plt.title("Train vs. Validation Loss")
    plt.plot(iters, train_losses, label="Train")
    plt.plot(iters, val_losses, label="Validation")
    plt.xlabel("Epochs")
    plt.ylabel("Training Loss")
    plt.legend(loc='best')
    plt.show()

    print('Finished Training')


In [14]:
# Make sure this pickle file is in your working directory.
# NOTE: pickle.load can execute arbitrary code while unpickling -- only load a
# model file that comes from a source you trust.

import pickle

# The `with` block closes the file handle as soon as the model has been read.
with open('primary_model.pkl', 'rb') as pklfile:
    mymodel = pickle.load(pklfile)

<function BufferedReader.close>

# 3. Visualize the result

In [20]:
# Loss and accuracy over the dataset. test() returns (loss, accuracy), so a
# single evaluation pass provides both values.

test_loss, test_accuracy = test(mymodel, test_loader)
print("loss:", test_loss)
print("accuracy:", test_accuracy)

  


loss: 0.38018367840693545
accuracy: 0.5384615384615384


In [21]:
# View the true and predicted label of every image in the test loader.

classes = ["socket", "remote control", "cellphone", "scissors", "bulb", "coke", "sunglasses", "ball", "highlighter", "cup"]

# Inference only: eval mode, no gradient tracking, and no labels fed to the model.
mymodel.eval()
device = next(mymodel.parameters()).device

pred = []
label = []
with torch.no_grad():
    for x, y in test_loader:
        lengths, _ = mymodel(x.to(device))  # forward; class capsule lengths per image
        # The longest class capsule is the prediction; collect every sample in the batch.
        pred.extend(lengths.max(dim=1)[1].tolist())
        label.extend(y.tolist())

# Iterate over what was actually collected instead of a fixed image count.
for i, (true_idx, pred_idx) in enumerate(zip(label, pred), start=1):
  print("image", i)
  print("true label", classes[true_idx])
  print("prediction", classes[pred_idx])
  print("")

image 1
true label coke
prediction highlighter

image 2
true label coke
prediction ball

image 3
true label coke
prediction coke

image 4
true label coke
prediction highlighter

image 5
true label coke
prediction coke

image 6
true label coke
prediction coke

image 7
true label coke
prediction coke

image 8
true label coke
prediction coke

image 9
true label coke
prediction coke

image 10
true label coke
prediction ball

image 11
true label coke
prediction ball

image 12
true label coke
prediction coke

image 13
true label coke
prediction socket

