##  Complex representation model with ModelNet10 dataset and VoxNet architecture

In [0]:
from __future__ import print_function, division
import os
import torch
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader, ConcatDataset
from torchvision import transforms, utils
from sklearn.utils import shuffle
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from scipy.ndimage import rotate
import torch.nn as nn
import torch.backends.cudnn as cudnn
from torch.autograd import Variable
from collections import OrderedDict
import imp
import time
import os
import sys
import importlib
import argparse

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")

In [2]:
!pip install mahotas
import mahotas



In [0]:
class ModelNetDataset(Dataset):
    """ModelNet10 voxel-grid dataset backed by two files readable by ``np.load``.

    All samples and labels are loaded into memory and shuffled together once,
    at construction time. Items are returned as ``(x, y)`` tuples.
    """

    # Category names used when the caller does not pass ``classes``.
    DEFAULT_CLASSES = ('bathtub', 'bed', 'chair', 'desk', 'dresser',
                       'monitor', 'night_stand', 'sofa', 'table', 'toilet')

    def __init__(self, data_file, label_file, transform=None, classes=None):
        """
        Args:
            data_file (string): Path to the file holding the samples.
            label_file (string): Path to the file holding one label per sample.
            transform (callable, optional): Called with the ``(x, y)`` tuple
                of a sample; must return a new ``(x, y)`` tuple.
            classes (sequence of str, optional): Category names. Defaults to
                ``DEFAULT_CLASSES`` so the class no longer depends on a
                module-level ``classes`` variable being defined first.
        """
        if classes is None:
            classes = self.DEFAULT_CLASSES
        self.classes = list(classes)
        self.transform = transform

        data = np.load(data_file)
        labels = np.load(label_file)
        # Shuffle samples and labels with the same permutation.
        self.X, self.Y = shuffle(data, labels)
        self.nsamples = len(self.X)

    def __len__(self):
        """Return the number of samples loaded at construction time."""
        return self.nsamples

    def __getitem__(self, idx):
        """Return sample ``idx`` as ``(x, y)``, after the optional transform."""
        sample = (self.X[idx], self.Y[idx])
        if self.transform is not None:
            # The transform receives and returns the whole (x, y) tuple.
            x, y = self.transform(sample)
            return (x, y)
        return sample

    def show_voxelgrid(self, sample, ax=None):
        """Draw the voxel grid of ``sample`` on a 3D matplotlib axes.

        Args:
            sample (tuple): ``(X, Y)`` pair; only ``X`` is drawn.
            ax (optional): Object exposing a ``voxels`` method (a matplotlib
                3D axes). Defaults to the current pyplot axes, which is the
                axes most recently created with ``fig.add_subplot``.
        """
        voxels, _ = sample
        if ax is None:
            ax = plt.gca()
        ax.voxels(voxels, edgecolor="k")

In [0]:
class toComplexRepr(object):
    """Sample transform: replace a voxel grid by a rescaled distance map.

    Instances are callables taking and returning a ``(grid, label)`` tuple,
    which is the calling convention ``ModelNetDataset`` uses for transforms.
    """

    def __init__(self):
        # Not read by any method of this class; kept as a public attribute.
        self.MAX_DISTANCE = float("inf")

    def normalize(self, X):
        """Min-max rescale ``X`` to the range [0, 1].

        ``X`` is returned unchanged unless its minimum is strictly below its
        maximum, which avoids a division by zero for constant input.
        """
        lowest = np.min(X)
        highest = np.max(X)
        if not lowest < highest:
            return X
        return (X - lowest) / (highest - lowest)

    def __call__(self, sample):
        """Return ``(distance_map, label)`` for a ``(grid, label)`` sample.

        The map is ``mahotas.distance`` applied to the inverted grid
        (``1 - grid``), then rescaled with ``normalize``. A warning line is
        printed if the result contains NaN; the result is returned anyway.
        """
        grid, label = sample
        distance_map = self.normalize(mahotas.distance(1 - grid))
        if np.isnan(distance_map).any():
            print('NAN detected..............')
        return (distance_map, label)


In [0]:
# Category names, indexed by the integer label stored with each sample.
classes = ['bathtub', 'bed', 'chair', 'desk', 'dresser', 'monitor', 'night_stand', 'sofa', 'table', 'toilet']
modelnet_dataset = ModelNetDataset(data_file='drive/My Drive/Dataset/X_train.npy',
                                    label_file='drive/My Drive/Dataset/y_train.npy')

fig = plt.figure(figsize=(17, 7))

# Preview at most four samples, one 3D subplot each. Bounding the loop up
# front (instead of breaking at i == 3) keeps it correct for datasets that
# hold fewer than four samples.
n_preview = min(4, len(modelnet_dataset))
for i in range(n_preview):
    sample = modelnet_dataset[i]

    print(i, sample[0].shape, sample[1])

    # `ax` stays a module-level name: show_voxelgrid draws on the axes
    # created here.
    ax = fig.add_subplot(1, 4, i + 1, projection='3d')
    ax.set_title('Sample #{0}, GT: {1}'.format(i, classes[sample[1]]))
    modelnet_dataset.show_voxelgrid(sample)

# Shown after the loop so the figure also appears with fewer than four samples.
plt.show()

0 (32, 32, 32) 5
1 (32, 32, 32) 7
2 (32, 32, 32) 0
3 (32, 32, 32) 9


In [0]:
class VoxNet(torch.nn.Module):
    """VoxNet-style 3D CNN classifier for single-channel voxel grids.

    Architecture: two ``Conv3d`` layers with LeakyReLU and dropout, one
    ``MaxPool3d``, then two fully connected layers. ``forward`` returns raw
    class scores (no softmax) of shape ``(batch, num_classes)``.

    Args:
        num_classes (int): Number of output scores per sample.
        input_shape (sequence of 3 ints): Spatial size of one voxel grid. It
            is only used to size the first fully connected layer.
    """

    def __init__(self, num_classes=10, input_shape=(32,32,32)):

        super(VoxNet, self).__init__()
        # Stored as a tuple so lists are accepted and can be concatenated below.
        self.input_shape = tuple(input_shape)
        self.cnn_layers = torch.nn.Sequential(OrderedDict([
            ('conv1', torch.nn.Conv3d(in_channels=1, out_channels=32, kernel_size=5, stride=2)),
            ('relu1', torch.nn.LeakyReLU(0.1)),
            ('drop1', torch.nn.Dropout(p=0.2)),
            ('conv2', torch.nn.Conv3d(in_channels=32, out_channels=32, kernel_size=3)),
            ('relu2', torch.nn.LeakyReLU(0.1)),
            ('pool2', torch.nn.MaxPool3d(2)),
            ('drop2', torch.nn.Dropout(p=0.4))
        ]))

        # Push one dummy grid through the conv stack to learn how many
        # features reach the first linear layer. No gradients are needed for
        # this probe. torch.rand is kept so the random draws made before the
        # linear layers are initialised stay the same as before.
        with torch.no_grad():
            probe = self.cnn_layers(torch.rand((1, 1) + self.input_shape))
        fc1_in = probe[0].numel()

        self.linear_layers = torch.nn.Sequential(OrderedDict([
            ('fc1', torch.nn.Linear(fc1_in, 128)),
            ('relu1', torch.nn.LeakyReLU(0.1)),
            ('drop3', torch.nn.Dropout(p=0.4)),
            ('fc2', torch.nn.Linear(128, num_classes))
        ]))

    def forward(self, x):
        """Return class scores for ``x`` of shape ``(batch, 1, D, H, W)``."""
        features = self.cnn_layers(x)
        # Flatten everything except the batch axis for the linear layers.
        flat = features.view(features.size(0), -1)
        return self.linear_layers(flat)

In [0]:
def train_model(loader, model, criterion, optimizer, device):
    """Train ``model`` for one pass over ``loader``.

    Args:
        loader: Sized iterable of ``(inputs, targets)`` tensor batches. Each
            ``inputs`` batch must have at least three dimensions; its last
            three are treated as one voxel grid.
        model: Module called with float inputs of shape ``(batch, 1, D, H, W)``.
        criterion: Loss called as ``criterion(outputs, targets.long())``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.

    Returns:
        tuple: ``(accuracy_percent, mean_batch_loss)`` over the whole pass.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.train()
    num_batch = len(loader)
    log_iter = 100
    total = 0
    correct = 0
    total_loss = 0.
    window_loss = 0.  # loss summed since the last progress line
    n = 0

    for i, (inputs, targets) in enumerate(loader):
        inputs, targets = inputs.to(device), targets.to(device)
        # Insert the channel axis the 3D convolutions expect, whatever the
        # grid size is.
        inputs = inputs.reshape((-1, 1) + tuple(inputs.shape[-3:]))
        targets = targets.reshape(-1)
        # No per-batch shuffle: the mean loss and the hit count below do not
        # depend on sample order within a batch, and the loader is the place
        # to randomise batch composition.

        optimizer.zero_grad()
        outputs = model(inputs.float())
        loss = criterion(outputs, targets.long())

        batch_loss = loss.item()
        total_loss += batch_loss
        window_loss += batch_loss
        n += 1
        _, predicted = torch.max(outputs.detach(), 1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()

        # compute gradient and take an optimizer step
        loss.backward()
        optimizer.step()

        if (i + 1) % log_iter == 0:
            # Mean loss over the last `log_iter` batches.
            print("\tIter [%d/%d] Loss: %.4f" % (i + 1, num_batch, window_loss / log_iter))
            window_loss = 0.

    if total == 0:
        raise ValueError("loader yielded no samples")
    train_acc = 100.0 * correct / total
    train_loss = total_loss / n
    print("Train Accuracy %.2f" % (train_acc))
    return train_acc, train_loss


def test_model(loader, model, criterion, optimizer, device):

    total = torch.LongTensor([0])
    correct = torch.LongTensor([0])

    total_loss = 0.0
    n = 0

    for i, (inputs, targets) in enumerate(loader):
        with torch.no_grad():

            inputs, targets = inputs.to(device), targets.to(device)
            inputs = inputs.reshape((-1, 1, 32, 32, 32))
            targets = targets.reshape(-1)           

            # compute output
            outputs = model(inputs.float())
            # loss = F.nll_loss(outputs, targets.long())
            loss = criterion(outputs, targets.long())
          
            total_loss += loss.item()
            n += 1

            _, predicted = torch.max(outputs.detach(), 1)
            # predicted = outputs.max(1, keepdim=True)[1]
            total += targets.size(0)
            correct += (predicted == targets).cpu().sum()

    avg_test_acc = 100. * correct.item() / total.item()
    avg_loss = total_loss / n

    return avg_test_acc, avg_loss

In [0]:
# Build the network and place it on the first GPU when CUDA is available,
# otherwise on the CPU.
print("loading module")
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
model = VoxNet()
model.to(device)

In [0]:
classes = ['bathtub', 'bed', 'chair', 'desk', 'dresser', 'monitor', 'night_stand', 'sofa', 'table', 'toilet']

# Training split: every sample goes through toComplexRepr when it is fetched.
train_data = ModelNetDataset(
    data_file='drive/My Drive/Dataset/X_train.npy',
    label_file='drive/My Drive/Dataset/y_train.npy',
    transform=transforms.Compose([toComplexRepr()]),
)
# One sample per training step (an earlier setting was batch_size=8, num_workers=4).
train_loader = DataLoader(train_data, batch_size=1, shuffle=True, num_workers=2)

# Test split, with the same transform and batches of eight.
test_data1 = ModelNetDataset(
    data_file='drive/My Drive/Dataset/X_test.npy',
    label_file='drive/My Drive/Dataset/y_test.npy',
    transform=transforms.Compose([toComplexRepr()]),
)
test_loader1 = DataLoader(test_data1, batch_size=8, shuffle=True, num_workers=2)

In [0]:
# train_batch, labels = next(iter(train_loader))
# print(train_batch.shape)

# test_batch1, labels1 = next(iter(test_loader1))
# print(test_batch1.shape)

In [0]:
# labels[0]

In [0]:
# Training driver: runs epochs start_epoch .. n_iters-1, evaluating on the
# test loader after each one and recording per-epoch accuracy and loss.
start_epoch = 0
best_acc = 0.  # assigned here but not updated or read in this cell
n_iters = 10
# Per-epoch histories, one entry appended per epoch below.
train_acc=[]
train_loss=[]
test1_acc=[]
test1_loss=[]
# test2_acc=[]
# test2_loss=[]

# set optimization methods
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 4, 0.8)

for epoch in range(start_epoch, n_iters):
    
    print('Epoch: [%d/%d]' % (epoch+1, n_iters))
    start = time.time()

    # model.train()
    # train_model puts the model in training mode itself and returns
    # (accuracy in percent, mean batch loss) for the epoch.
    avg_train_acc, avg_loss = train_model(train_loader, model, criterion, optimizer, device)
    train_acc.append(avg_train_acc)
    train_loss.append(avg_loss)
    print("Train Accuracy:", avg_train_acc)
    # The reported time covers the training pass only, not the evaluation below.
    print('Time taken: %.2f sec.' % (time.time() - start))
    # scheduler.step()

    # Switch to evaluation mode before measuring test accuracy.
    model.eval()
    avg_test1_acc, avg_loss1 = test_model(test_loader1, model, criterion, optimizer, device)
    test1_acc.append(avg_test1_acc)
    test1_loss.append(avg_loss1)
    print("Test Accuracy on Test1:", avg_test1_acc)

    # avg_test2_acc, avg_loss2 = test_model(test_loader2, model, criterion, optimizer, device)
    # test2_acc.append(avg_test2_acc)
    # test2_loss.append(avg_loss2)
    # print("Test Accuracy on Test2:", avg_test2_acc)


In [0]:
# Plot per-epoch accuracy (top) and loss (bottom) for the training set and
# the test set. Only the test1_* histories are filled by the training loop;
# the test2_* lists are never created, so they cannot be plotted here.
plt.figure(figsize=(10,10))

plt.subplot(2, 1, 1)
plt.plot(range(1, len(train_acc) + 1), train_acc, label='Train Accuracy')
plt.plot(range(1, len(test1_acc) + 1), test1_acc, label='Test1 Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Average Accuracy')
plt.legend(loc='best')
plt.title('Train/Validation Accuracy vs Epochs for Voxnet')

plt.subplot(2, 1, 2)
plt.plot(range(1, len(train_loss) + 1), train_loss, label='Train Loss')
plt.plot(range(1, len(test1_loss) + 1), test1_loss, label='Test1 Loss')
plt.xlabel('Epochs')
plt.ylabel('Average Loss')
plt.legend(loc='best')
plt.title('Training/Validation Loss vs Epochs for Voxnet')

plt.tight_layout()
plt.show()