In [1]:
import numpy as np
import random
import math

import plotly
import plotly.graph_objects as go
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot

import matplotlib.pyplot as plt
import torch
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from tqdm.auto import tqdm, trange

from os import listdir, path
import os

import json

In [2]:
def configure_plotly_browser_state():
  """Emit the RequireJS configuration Plotly needs in the current output cell.

  Displays an HTML snippet that loads `require.js` from the notebook's
  `/static/components/requirejs/` path and maps the `plotly` module name to
  the Plotly CDN bundle.
  """
  # Import `display` and `HTML` explicitly from the public `IPython.display`
  # module instead of relying on the `display` global injected by the kernel.
  from IPython.display import HTML, display
  display(HTML('''
        <script src="/static/components/requirejs/require.js"></script>
        <script>
          requirejs.config({
            paths: {
              base: '/static/base',
              plotly: 'https://cdn.plot.ly/plotly-latest.min.js?noext',
            },
          });
        </script>
        '''))

In [3]:
# Mount Google Drive into the Colab filesystem; `force_remount=True` is passed
# so the mount is redone even if the drive is already mounted.
from google.colab import drive

drive.mount('/content/drive', force_remount=True)

# Name of the dataset folder expected inside "My Drive".
FOLDERNAME = 'modelnet40_data'

# Copy the dataset folder out of Drive to the directory two levels up
# (/content according to the recorded output) and make the local copy the
# working directory, so later relative `np.load` calls resolve against it.
# NOTE(review): the final `%cd` hard-codes the folder name instead of using
# $FOLDERNAME - keep the two in sync if the folder is renamed.
%cd drive/My\ Drive
%cp -r $FOLDERNAME ../../
%cd ../../
%cd modelnet40_data/

Mounted at /content/drive
/content/drive/My Drive
/content
/content/modelnet40_data


In [4]:
# Read the ModelNet40 arrays from the current working directory:
# point clouds first, then their class labels cast to integers.
train_data, test_data = (
    np.load('modelnet40_train_data.npy'),
    np.load('modelnet40_test_data.npy'),
)
train_labels, test_labels = (
    np.load('modelnet40_train_labels.npy').astype('int'),
    np.load('modelnet40_test_labels.npy').astype('int'),
)

# Cell output: the shapes of the four arrays, as a quick sanity check.
train_data.shape, train_labels.shape, test_data.shape, test_labels.shape

((9840, 2048, 3), (9840,), (2468, 2048, 3), (2468,))

In [13]:
class PointSampler(object):
  """Randomly subsample a fixed number of points from a point cloud.

  Args:
    sample_num: number of points to keep from each cloud.
  """
  def __init__(self, sample_num):
    self.sample_num = sample_num

  def __call__(self, points):
    """Return `sample_num` distinct rows of `points`, chosen without replacement.

    The population size is read from `points` itself rather than being
    hard-coded, so clouds with any number of rows are supported.

    Raises:
      ValueError: (from `np.random.choice`) if `sample_num` exceeds the
        number of rows in `points`.
    """
    num_points = points.shape[0]
    chosen = np.random.choice(num_points, self.sample_num, replace=False)
    return points[chosen]

class PointCut(object):
  """Deterministic truncation: keep only the leading `sample_num` points."""
  def __init__(self, sample_num):
    self.sample_num = sample_num

  def __call__(self, points):
    """Return the first `sample_num` entries of `points` via slicing."""
    limit = self.sample_num
    return points[0:limit]
class Normalize(object):
  """Centre a point cloud on its mean and scale it by its largest point norm."""
  def __call__(self, verts):
    """Return `verts` shifted to zero mean and divided by the maximum row norm.

    Args:
      verts: 2-D array with one point per row.

    Returns:
      Array of the same shape. If every point coincides with the mean (the
      maximum norm is zero) the centred, all-zero cloud is returned as is,
      instead of dividing by zero and producing NaNs.
    """
    centered = verts - np.mean(verts, axis=0)
    max_norm = np.max(np.linalg.norm(centered, axis=1))

    # Degenerate cloud: nothing to scale, and 0/0 would yield NaN.
    if max_norm == 0:
      return centered

    return centered / max_norm

class RandomRotate(object):
  """Augmentation: rotate a cloud by a uniformly random angle in [0, 2*pi).

  The rotation matrix leaves the second coordinate of every point unchanged.
  """

  def __call__(self, verts):
    """Return `verts` right-multiplied by a freshly sampled rotation matrix."""
    angle = 2 * np.random.uniform() * np.pi
    c, s = np.cos(angle), np.sin(angle)
    rot = np.array([
        [c, 0, s],
        [0, 1, 0],
        [-s, 0, c],
    ])
    return np.matmul(verts, rot)

 
class RandomNoise(object):
  """Augmentation: add clipped zero-mean Gaussian jitter to every coordinate.

  Args:
    sigma: standard deviation of the Gaussian noise (default 0.01).
    clip: noise values are clipped to the range [-clip, clip] (default 0.05).
  """

  def __init__(self, sigma=0.01, clip=0.05):
    self.sigma = sigma
    self.clip = clip

  def __call__(self, verts):
    """Return `verts` plus independently sampled, clipped noise of the same shape."""
    noise = np.random.normal(0, self.sigma, verts.shape)
    noise = np.clip(noise, -self.clip, self.clip)
    return verts + noise

class toTensor(object):
  """Final transform step: wrap a NumPy array as a torch tensor."""

  def __call__(self, verts):
    """Convert `verts` with `torch.from_numpy` and return the resulting tensor."""
    as_tensor = torch.from_numpy(verts)
    return as_tensor


In [15]:
# Evaluation pipeline: normalise only, with no subsampling or augmentation.
default_transform = transforms.Compose([
    Normalize(),
    toTensor(),
])

# Training pipeline: subsample to 1024 points, normalise, then apply the
# random jitter and random rotation augmentations before converting.
train_transform = transforms.Compose([
    PointSampler(1024),
    Normalize(),
    RandomNoise(),
    RandomRotate(),
    toTensor(),
])

In [16]:
class PointnetDataset(Dataset):
  """Dataset pairing point clouds with labels, transforming clouds on access.

  Args:
    X: indexable collection of point clouds.
    y: indexable collection of labels; its length defines the dataset size.
    transform: callable applied to each cloud when it is fetched
      (defaults to `train_transform`).
  """
  def __init__(self, X, y, transform = train_transform):
    self.X, self.y = X, y
    self.transform = transform

  def __len__(self):
    """Number of samples, taken from the label collection."""
    return len(self.y)

  def __getitem__(self, idx):
    """Return `(transformed cloud, label tensor)` for sample `idx`."""
    cloud = self.transform(self.X[idx])
    label = torch.tensor(self.y[idx])
    return cloud, label


In [17]:
# Training samples use the dataset's default (augmenting) transform; test
# samples are only normalised.
trainset = PointnetDataset(train_data, train_labels)
testset = PointnetDataset(test_data, test_labels, transform = default_transform)

trainloader = DataLoader(trainset, batch_size = 32, shuffle = True)
# Evaluation gains nothing from shuffling; a fixed order keeps the collected
# predictions aligned with the dataset order and reproducible between runs.
testloader = DataLoader(testset, batch_size = 32, shuffle = False)

In [18]:
# Use the first CUDA device when one is available, otherwise fall back to CPU,
# and report which one was picked.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('running on GPU' if device.type == 'cuda' else 'running on CPU')



running on GPU


In [19]:
class Tnet(nn.Module):
  """Predicts one k x k transformation matrix per sample in the batch.

  Architecture: three kernel-size-1 convolutions (k -> 64 -> 128 -> 1024),
  each followed by batch norm and ReLU, a max over the last axis, then fully
  connected layers (1024 -> 512 -> 256 -> k*k). The identity matrix is added
  to the reshaped output, so an all-zero final layer yields the identity.

  Args:
    k: number of input channels and side length of the predicted matrix.
  """

  def __init__(self, k):
    super().__init__()

    self.k = k

    self.mlp1 = nn.Conv1d(in_channels=k, out_channels=64, kernel_size=1)
    self.mlp2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=1)
    self.mlp3 = nn.Conv1d(in_channels=128, out_channels=1024, kernel_size=1)

    # Not used by forward(), which pools over the actual input length;
    # kept so the module's attributes stay unchanged.
    self.maxpool = nn.MaxPool1d(1024)

    self.fc1 = nn.Linear(in_features = 1024, out_features=512)
    self.fc2 = nn.Linear(in_features = 512, out_features=256)
    self.fc3 = nn.Linear(in_features = 256, out_features= k * k)

    self.bn1 = nn.BatchNorm1d(64)
    self.bn2 = nn.BatchNorm1d(128)
    self.bn3 = nn.BatchNorm1d(1024)
    self.bn4 = nn.BatchNorm1d(512)
    self.bn5 = nn.BatchNorm1d(256)

    self.relu = nn.ReLU()

  def forward(self, x):
    """Compute the transformation matrices for a batch.

    Args:
      x: tensor with `k` channels on axis 1 (required by the first Conv1d);
        the last axis is pooled over and may have any length.

    Returns:
      Tensor of shape (batch, k, k) on the same device and with the same
      dtype as the network's activations.
    """
    x = self.relu(self.bn1(self.mlp1(x)))
    x = self.relu(self.bn2(self.mlp2(x)))
    x = self.relu(self.bn3(self.mlp3(x)))

    # Global max over the last axis, sized from the input itself; the
    # functional form avoids building new modules on every forward call.
    x = F.max_pool1d(x, kernel_size=x.shape[-1]).flatten(1)

    x = self.relu(self.bn4(self.fc1(x)))
    x = self.relu(self.bn5(self.fc2(x)))
    x = self.fc3(x)

    # Build the identity on the activations' own device/dtype rather than on a
    # global `device`; it is a constant, so it needs no gradient, and it
    # broadcasts over the batch dimension.
    identity = torch.eye(self.k, device=x.device, dtype=x.dtype)
    return x.view(-1, self.k, self.k) + identity









In [20]:
class TransformNet(nn.Module):
  """Feature extractor producing one 1024-dimensional vector per sample.

  Pipeline: 3x3 input transform -> conv (3 -> 64) -> 64x64 feature transform
  -> conv (64 -> 128) -> conv (128 -> 1024) -> max over the last axis.
  """
  def __init__(self):
    super().__init__()
    # Sub-networks predicting the input (3x3) and feature (64x64) transforms.
    self.Tnet3 = Tnet(3)
    self.Tnet64 = Tnet(64)

    # Kernel-size-1 convolutions, each paired with a batch norm below.
    self.mlp1 = nn.Conv1d(in_channels=3, out_channels=64, kernel_size=1)
    self.mlp2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=1)
    self.mlp3 = nn.Conv1d(in_channels=128, out_channels=1024, kernel_size=1)

    self.bn1 = nn.BatchNorm1d(64)
    self.bn2 = nn.BatchNorm1d(128)
    self.bn3 = nn.BatchNorm1d(1024)

    self.maxpool = nn.MaxPool1d(1024)
    self.relu = nn.ReLU()

  def forward(self, x):
    """Return `(global_feature, input_transform, feature_transform)`.

    Args:
      x: batched tensor with 3 channels on axis 1 (as required by the first
        convolution and the batched matrix products).
    """
    input_tf = self.Tnet3(x)
    # Move channels last so each point is a row, apply the transform, and
    # move channels back to axis 1 for the convolution.
    aligned = torch.bmm(x.transpose(1, 2), input_tf).transpose(1, 2)
    feats = self.relu(self.bn1(self.mlp1(aligned)))

    feature_tf = self.Tnet64(feats)
    feats = torch.bmm(feats.transpose(1, 2), feature_tf).transpose(1, 2)

    feats = self.relu(self.bn2(self.mlp2(feats)))
    # No ReLU after the last batch norm; the max is taken directly.
    feats = self.bn3(self.mlp3(feats))

    pooled = F.max_pool1d(feats, kernel_size=feats.shape[-1])
    global_feat = torch.flatten(pooled, start_dim=1)

    return global_feat, input_tf, feature_tf






In [21]:
class Pointnet(nn.Module):
  """Classifier: `TransformNet` features followed by a 1024 -> 512 -> 256 -> 40 head.

  Dropout with p = 0.3 is applied after each of the two hidden layers and is
  active only while the module is in training mode.
  """
  def __init__(self):
    super().__init__()
    self.transformnet = TransformNet()

    self.fc1 = nn.Linear(in_features=1024, out_features=512)
    self.fc2 = nn.Linear(in_features=512, out_features=256)
    self.fc3 = nn.Linear(in_features=256, out_features=40)

    self.bn1 = nn.BatchNorm1d(512)
    self.bn2 = nn.BatchNorm1d(256)

    self.relu = nn.ReLU()

  def forward(self, x):
    """Return `(class scores, 3x3 transform, 64x64 transform)` for input `x`."""
    feats, rot3, rot64 = self.transformnet(x)

    hidden = self.relu(self.bn1(self.fc1(feats)))
    hidden = F.dropout(hidden, p = 0.3, training = self.training)

    hidden = self.relu(self.bn2(self.fc2(hidden)))
    hidden = F.dropout(hidden, p = 0.3, training = self.training)

    scores = self.fc3(hidden)
    return scores, rot3, rot64

      


In [22]:
def loss_function(output, true, rot64, alpha = 0.001):
  """Cross-entropy plus an orthogonality penalty on the feature transform.

  Args:
    output: class scores; the first dimension is the batch.
    true: target class indices, as accepted by `nn.CrossEntropyLoss`.
    rot64: batch of square matrices with shape (batch, k, k).
    alpha: weight of the orthogonality penalty.

  Returns:
    Tuple `(loss, loss1, loss2)` where `loss1` is the cross-entropy, `loss2`
    is the summed squared difference between `rot64 @ rot64^T` and the
    identity divided by the batch size, and `loss = loss1 + alpha * loss2`.
  """
  criterion = nn.CrossEntropyLoss()

  batch_size = output.shape[0]

  # Derive the identity's size, device and dtype from `rot64` itself rather
  # than from a hard-coded 64 and a global `device`; it is a constant target,
  # so it needs no gradient, and it broadcasts over the batch.
  identity = torch.eye(rot64.shape[-1], device=rot64.device, dtype=rot64.dtype)

  gram = torch.bmm(rot64, rot64.transpose(1, 2))
  loss2 = (gram - identity).pow(2).sum() / batch_size

  loss1 = criterion(output, true)
  loss = loss1 + alpha * loss2

  return loss, loss1, loss2

In [23]:
# Print a progress line every LOG_INTERVAL batches when verbose.
LOG_INTERVAL = 20
def train(model, optimizer, verbose = True):
  """Run one optimisation pass over the global `trainloader`.

  Args:
    model: network returning `(scores, rot3, rot64)` for a batch.
    optimizer: optimizer updating `model`'s parameters.
    verbose: when true, print periodic progress and the epoch average.

  Returns:
    The mean of the per-batch losses over the epoch (0.0 if the loader
    yielded no batches).
  """
  model.train()
  running_loss = 0.0
  num_batches = 0
  samples_seen = 0

  for batch_idx, (points, labels) in enumerate(tqdm(trainloader, desc='Batches', leave=False)):
    points = points.to(device)
    labels = labels.to(device)
    # Swap axes 1 and 2 so channels come before points, as Conv1d expects,
    # and cast to float32 to match the model weights.
    points = points.transpose(1,2).float()

    optimizer.zero_grad()

    o, rot3, rot64 = model(points)

    loss, ce, reg = loss_function(o, labels,  rot64)

    loss.backward()
    optimizer.step()

    running_loss += loss.item()
    num_batches += 1
    samples_seen += labels.shape[0]

    if verbose and batch_idx % LOG_INTERVAL == LOG_INTERVAL-1:
      # Report the number of samples actually processed so far.
      print('    Train [%d/%d]\t | \tLoss: %.5f, \tCross Entropy: %.5f, \tRegularization: %.5f' % (samples_seen, len(trainloader.dataset), loss.item(), ce.item(), reg.item()))

  # Average over the number of batches, not the last batch index (which is
  # one too small and is zero when there is a single batch).
  train_loss = running_loss / max(num_batches, 1)
  if verbose:
    print('==> Train | Average loss: %.4f' % train_loss)
  return train_loss



In [24]:
def test(model, verbose = True):
  """Evaluate `model` on the global `testloader`.

  Args:
    model: network returning `(scores, rot3, rot64)` for a batch.
    verbose: when true, print the average loss and the accuracy.

  Returns:
    Tuple `(test_loss, acc)`: the mean of the per-batch losses and the
    accuracy in percent (both 0.0 if the loader yielded nothing).
  """
  model.eval()
  running_loss = 0.0
  num_batches = 0

  total = 0
  correct = 0
  with torch.no_grad():
    for points, labels in testloader:
      points = points.to(device)
      labels = labels.to(device)
      # Swap axes 1 and 2 so channels come before points, as Conv1d expects.
      points = points.transpose(1,2).float()

      o, rot3, rot64 = model(points)

      # Gradients are disabled here, so `.data` is not needed.
      _, predicted = torch.max(o, 1)

      total += labels.shape[0]
      correct += (labels == predicted).sum().item()

      loss, _, _ = loss_function(o, labels,  rot64)
      running_loss += loss.item()
      num_batches += 1

  # Average over the number of batches, not the last batch index (which is
  # one too small and is zero when there is a single batch).
  test_loss = running_loss / max(num_batches, 1)
  acc = 100 * (correct / total) if total else 0.0
  if verbose:
    print('==> Test  | Average loss: %.4f' % test_loss)
    print('==> Test  | Accuracy: %.4f' % acc)
  return test_loss, acc


def test_acc(model, testdata,  verbose = True):
  """Compute accuracy over `testdata` and collect every prediction and label.

  Args:
    model: network returning `(scores, _, _)` for a batch.
    testdata: iterable yielding `(points, labels)` batches.
    verbose: when true, print the accuracy.

  Returns:
    Tuple `(acc, predictions_all, true_all)`: accuracy in percent, and the
    concatenated predictions and labels in iteration order. Both accumulators
    start out as empty tensors created with `torch.tensor([])` on `device`.
  """
  model.eval()

  n_seen, n_hit = 0, 0
  predictions_all = torch.tensor([]).to(device)
  true_all = torch.tensor([]).to(device)
  with torch.no_grad():
    for points, labels in testdata:
      points = points.to(device)
      labels = labels.to(device)
      # Swap axes 1 and 2 so channels come before points, as Conv1d expects.
      points = points.transpose(1,2).float()

      scores, _, _ = model(points)
      # Index of the largest score along axis 1 is the predicted class.
      predicted = torch.max(scores.data, 1)[1]

      n_seen += labels.shape[0]
      predictions_all = torch.cat([predictions_all, predicted])
      true_all = torch.cat([true_all, labels])
      n_hit += (labels == predicted).sum().item()
    acc = 100 * (n_hit / n_seen)
    if verbose:
      print('==> Test  | Accuracy: %.4f' % acc)
    return acc, predictions_all, true_all
         
         


In [25]:
def run(model, n_epoch, optimizer_state_dict = None, verbose = True):
  """Train `model` for `n_epoch` epochs, evaluating and checkpointing each one.

  Args:
    model: network to train; it is moved to the global `device`.
    n_epoch: number of epochs to run.
    optimizer_state_dict: optional Adam state to resume from.
    verbose: when true, print per-epoch progress (forwarded to `train` and
      `test`).

  Returns:
    Tuple `(train_hist, test_hist)` with one average loss per epoch each.

  Side effects:
    Overwrites 'checkpoint.pt' after every epoch with the model and
    optimizer state.
  """
  model.to(device)
  lr = 1e-3
  best_acc = 0
  optimizer = optim.Adam(model.parameters(), lr = lr)
  if optimizer_state_dict is not None:
    optimizer.load_state_dict(optimizer_state_dict)
    # Continue the decay schedule from the restored learning rate instead of
    # silently jumping back to the initial value at the next decay step.
    lr = optimizer.param_groups[0]['lr']
  train_hist = []
  test_hist = []
  for epoch in trange(1, n_epoch+1, desc='Epochs', leave=True):
    # Halve the learning rate at epochs 19, 39, 59, ...
    if epoch % 20 == 19:
      lr = lr * 0.5
      for param_group in optimizer.param_groups:
        param_group['lr'] = lr
    if verbose:
      print('\nEpoch %d:' % epoch)
      print(best_acc)
    train_loss = train(model, optimizer, verbose)
    # Honour `verbose` for evaluation output as well.
    test_loss, acc = test(model, verbose)
    # Track the best accuracy seen so far so the per-epoch printout is
    # meaningful rather than always 0.
    best_acc = max(best_acc, acc)

    torch.save({
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict()}, 'checkpoint.pt')
    train_hist.append(train_loss)
    test_hist.append(test_loss)

  return train_hist, test_hist

In [None]:
# Build a fresh PointNet on the selected device and train it for the full
# number of epochs, keeping the per-epoch loss histories.
n_epochs = 250
model = Pointnet().to(device)

train_hist, test_hist = run(model, n_epochs)

# Save only the final model weights, separately from the per-epoch checkpoint.
torch.save({'model_state_dict': model.state_dict()}, 'model22.pt')


In [None]:
# Size the x-axis from the recorded history instead of a hard-coded 250, so
# the plot still works when the number of epochs is changed.
epoch_range = np.arange(len(train_hist))
plt.plot(epoch_range, train_hist, label='train')
plt.plot(epoch_range, test_hist, label='test')
plt.xlabel('epoch')
plt.ylabel('average loss')
plt.legend()
plt.show()