In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
import open3d as o3d
from pathlib import Path
import zipfile
import requests
import os
import numpy as np
from contextlib import contextmanager

In [2]:
if not Path("ModelNet10").exists():
    req = requests.get(r"http://3dvision.princeton.edu/projects/2014/3DShapeNets/ModelNet10.zip")
    
    path = "ModelNet10.zip"

    with open(path, "wb") as f:
        f.write(req.content)

    with zipfile.ZipFile(path, "r") as f:
        f.extractall()

    os.remove(path)

## Dataloaders

In [3]:
class ModelNetDataSet(Dataset):
  def __init__(self, path: Path, point_range=(1000, 25_000), test=False):
    super(ModelNetDataSet, self).__init__()
    if isinstance(path, str):
      path = Path(path)

    self.point_range = point_range
    self.categories = [x.name for x in path.iterdir() if x.is_dir()]

    self.items = []
    for directory in path.iterdir():
      if not directory.is_dir(): continue

      item_dir = directory / "test" if test else directory / "train"

      for item in item_dir.iterdir():
        self.items.append(item)



  def __get_item__(self, index):
    model_path = self.items[index]
    mesh = o3d.io.read_triangle_mesh(str(model_path))

    return torch.tensor(np.asarray(mesh.sample_points_uniformly(number_of_points=torch.randint(*self.point_range, (1, ))[0]).points)), model_path.parent.parent.name

  def __len__(self):
    return len(self.items)

In [4]:
data = ModelNetDataSet("ModelNet10", test=False)
len(data)

3994

## Model

In [5]:
class TNet(nn.Module):

  def __init__(self, dim):
    super(TNet, self).__init__()

    self.dim = dim

    self.conv_1 = nn.Conv1d(dim, 64, kernel_size=1)
    self.conv_2 = nn.Conv1d(64, 128, kernel_size=1)
    self.conv_3 = nn.Conv1d(128, 1024, kernel_size=1)

    self.linear_1 = nn.Linear(1024, 512)
    self.linear_2 = nn.Linear(512, 256)
    self.linear_3 = nn.Linear(256, dim*dim)

    self.bns = [nn.BatchNorm(x) for x in (64, 128, 1024, 512, 256)]

  def forward(self, x):
    # x.shape = [batch, n, 3]

    x = self.bns[0](F.relu(self.conv_1(x)))
    x = self.bns[1](F.relu(self.conv_2(x)))
    x = self.bns[2](F.relu(self.conv_3(x)))

    # Check this dimension
    x.max(dim=1)

    x = self.bns[3](F.relu(self.linear_1(x)))
    x = self.bns[4](F.relu(self.linear_2(x)))
    x = self.linear_3(x)

    # x.shape = [batch, self.dim**2]
    # Not sure if the requires grad is needed here
    identity = torch.eye(self.dim, requires_grad=True)

    if x.is_cuda():
      identity.cuda()

    return x.view(-1, self.dim, self.dim) + identity

In [6]:
class PointNetBackBone(nn.Module):
  def __init__(self, dims=(3, 64, 128, 1024), local_features=True):
    super(PointNetBackBone, self).__init__()

    self.dims = dims
    self.local_features = local_features

    self.tnet_1 = TNet(dims[0])
    self.tnet_2 = TNet(dims[1])

    self.conv_1 = nn.Conv1d(dims[0], dims[1], kernel_size=1)
    self.conv_2 = nn.Conv1d(dims[1], dims[1], kernel_size=1)

    self.conv_3 = nn.Conv1d(dims[1], dims[1], kernel_size=1)
    self.conv_4 = nn.Conv1d(dims[1], dims[2], kernel_size=1)
    self.conv_5 = nn.Conv1d(dims[2], dims[3], kernel_size=1)

    self.bns = [nn.BatchNorm(x) for x in (dims[1], dims[1], dims[1], dims[2], dims[3])]

  def forward(self, x):
    # x.shape = [batch, n, dim_1]

    # transform.shape = [batch, dim_1, dim_1]
    transform = self.tnet_1(x)
    # (transform[:, None]@x[...,None]).shape = [batch, n, dim_2, 1]
    x = (transform[:, None]@x[...,None]).squeeze()

    x = self.bns[0](F.relu(self.conv_1(x)))
    x = self.bns[1](F.relu(self.conv_2(x)))

    transform_2 = self.tnet_2(x)
    # (transform_2[:, None]@x[..., None]).shape = [batch, n, dim_2, dim_2]
    x = (transform_2[:, None]@x[..., None]).squeeze()

    if self.local_features:
      # local_features.shape = [batch, n, dim_2]
      local_features = x.clone()

    x = self.bns[2](F.relu(self.conv_3(x)))
    x = self.bns[3](F.relu(self.conv_4(x)))
    x = self.bns[4](F.relu(self.conv_5(x)))

    # x.shape = [batch, n, 1024]
    global_features = x.max(dim=1)

    if self.local_features:
      features = torch.cat((local_features, global_features), dim=2)
      return features, transform, transform_2
    else:
      return global_features, transform, transform_2



In [7]:
class PointNetClassificationHead(nn.Module):

  def __init__(self, outputs, layers, dims=None, keep=0.7):
    super(PointNetClassificationHead, self).__init__()

    if dims is None:
      self.backbone = PointNetBackBone(local_features=False)
      if layers[0] != 1024: raise ValueError(f"Dimensions don't match. {layers[0]} != 1024")
    else:
      self.backbone = PointNetBackBone(dims, False)
      if layers[0] != dims[-1]: raise ValueError(f"Dimensions don't match. {layers[0]} != {dims[-1]}")

    self.layers = [(nn.BatchNorm1d(y), nn.Linear(x, y)) for x, y in zip(layers[:-1], layers[1:])]
    self.output = nn.Linear(layers[-2], layers[-1])
    self.dropout = nn.Dropout(p=1 - keep)


  def forward(self, x):
    x, t1, t2 = self.backbone(x)

    for batch, linear in self.layers:
      x = batch(F.relu(linear(x)))

    x = self.dropout(x)
    x = self.output(x)

    return F.log_softmax(x, dim=1), t1, t2

## Training

In [8]:
# optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

NameError: name 'model' is not defined

In [9]:
def loss_function(x, y, t1: torch.Tensor, t2: torch.Tensor, alpha=1E-3): 
    nll_loss = F.nll_loss(x, y)
    t1_loss = (np.eye(t1.shape[0]) - t1@t1.transpose(1,2)).norm(p="fro")
    t2_loss = (np.eye(t1.shape[0]) - t2@t2.transpose(1,2)).norm(p="fro")

    return nll_loss + t1_loss*alpha + t2_loss*alpha

In [10]:
def train_one_epoch(epoch_index, tb_writer):
    running_loss = 0.
    last_loss = 0.

    # Here, we use enumerate(training_loader) instead of
    # iter(training_loader) so that we can track the batch
    # index and do some intra-epoch reporting
    for i, data in enumerate(training_loader):
        # Every data instance is an input + label pair
        inputs, labels = data

        # Zero your gradients for every batch!
        optimizer.zero_grad()

        # Make predictions for this batch
        outputs = model(inputs)

        # Compute the loss and its gradients
        loss = loss_fn(outputs, labels)
        loss.backward()

        # Adjust learning weights
        optimizer.step()

        # Gather data and report
        running_loss += loss.item()
        if i % 1000 == 999:
            last_loss = running_loss / 1000 # loss per batch
            print('  batch {} loss: {}'.format(i + 1, last_loss))
            tb_x = epoch_index * len(training_loader) + i + 1
            tb_writer.add_scalar('Loss/train', last_loss, tb_x)
            running_loss = 0.

    return last_loss

In [11]:

for i in epochs:

    # This needs some tqdm magic to give me stats
    # Chunking happens with dataloader
    for data, labels in chunked_mini_batch:
        # Needs to be data augmentation here
        #   1. Resampling of the input mesh 
        #   2. Jitter on the points
        #   3. Random transform on the points
        optimizer.zero_grad()

        ys, t1s, t2s = model(data)

        loss = loss_function(data, ys, t1s, t2s)
        loss.backward()

        optimizer.step()

NameError: name 'epochs' is not defined