# PointNet Part Segmentation

## Imports

In [1]:
import numpy as np

import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils

import open3d as o3d

from pathlib import Path
import os

In [2]:
if (torch.cuda.is_available()):
    print(torch.cuda.device_count(), torch.cuda.current_device())

## Visualizing Data

In [3]:
def visualize(pts, labels=None):

    is_labeled = False
    if type(labels) != type(None):
        is_labeled = True

    try:
        pts = pts.numpy()
        labels = labels.numpy()
        print('Tensor converted to numpy')
    except:
        print('Passed numpy')

    # red, green, blue, purple
    colors = np.array([[255,0,0],[0,255,0],[0,0,255],[100,0,100]])
    if is_labeled: labels = colors[labels-1]

    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(pts)
    if is_labeled: pcd.colors = o3d.utility.Vector3dVector(labels)

    o3d.visualization.draw_geometries([pcd])

In [4]:
eg_pts_hash = '1d1b37ce6d72d7855096c0dd2594842a'
eg_pts_path = f'data/train/pts/{eg_pts_hash}.pts'
eg_labels_path = f'data/train/label/{eg_pts_hash}.txt'

ex_pc = np.loadtxt(eg_pts_path, delimiter=' ')
ex_labels = np.loadtxt(eg_labels_path, delimiter=' ', dtype=np.int8)

print(ex_pc.shape, ex_labels.shape)

(2704, 3) (2704,)


In [5]:
# Uncomment to run, requires system GUI
# visualize(torch.from_numpy(ex_pc), torch.from_numpy(ex_labels))

## Custom Dataset + Make DataLoaders

In [6]:
class ChairCloudDataset(Dataset):
    def __init__(self, root: str, train: bool=True, transform=torch.from_numpy, target_transform=torch.from_numpy):
        self.train = train

        self.transform = transform
        self.target_transform = target_transform

        root = Path(root)
        pts_dir = None
        labels_dir = None
        if self.train:
            pts_dir = root / Path('pts')
            labels_dir = root / Path('label')
        else:
            pts_dir = Path(root)

        self.pts_paths = []
        self.label_paths = []
        for pts_file in os.listdir(pts_dir):
            self.pts_paths.append(pts_dir / pts_file)

            if self.train:
                self.label_paths.append(labels_dir / (Path(pts_file).stem + '.txt'))

        dataset_type = 'train' if self.train else 'test'
        print(f'Found {len(self.pts_paths)} {dataset_type} datapoints')

    def __getitem__(self, index):
        pts_file = self.pts_paths[index]
        pts = np.loadtxt(pts_file, delimiter=' ')
        pts = self.transform(pts)

        if self.train:
            label_file = self.label_paths[index]
            seg = np.loadtxt(label_file, delimiter=' ', dtype=np.uint8)
            seg = self.target_transform(seg)

            return pts.float(), seg
        
        return pts.float()

    def __len__(self):
        return len(self.pts_paths)
            



In [7]:
train_data = ChairCloudDataset('data/train', train=True)
test_data = ChairCloudDataset('data/test', train=False)

Found 1000 train datapoints
Found 6 test datapoints


In [8]:
# Uncomment to visualize
# visualize(*train_data[0])

In [9]:
from torch.nn.utils.rnn import pad_sequence

def pad_test(batch):
    return pad_sequence(batch, batch_first=True, padding_value=0)

def pad_train(batch):
    pts, seg = zip(*batch)
    
    pts_padded = pad_sequence(pts, batch_first=True, padding_value=0.0)
    seg_padded = pad_sequence(seg, batch_first=True, padding_value=0)

    return pts_padded, seg_padded

In [10]:
train_dl = DataLoader(train_data, batch_size=64, shuffle=True, collate_fn=pad_train)
test_dl = DataLoader(test_data, batch_size=64, shuffle=True, collate_fn=pad_test)

## Building PointNet Model

First, we must construct the input and feature transform steps. This consistes of a T-Net with $k$-long input, following by a matmul.

We will begin by implementing the T-Nets for general $k$-long input (we will later use once with nx3 input for input transform, then once for nx64 input for feature transform).

<img src='./imgs/annotated-tnet.jpg'>

In [11]:

import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F

class TNet(nn.Module):
    def __init__(self, k: int):
        super().__init__()

        self.k = k

        # 1st convolution + "mlp"
        self.conv1 = nn.Conv1d(k, 64, 1)
        self.batchnorm1 = nn.BatchNorm1d(64)
        # relu called directly

        # 2nd convolution + "mlp"
        self.conv2 = nn.Conv1d(64, 128, 1)
        self.batchnorm2 = nn.BatchNorm1d(128)
        # relu called directly

        # 3rd convolution + "mlp"
        self.conv3 = nn.Conv1d(128, 1024, 1)
        self.batchnorm3 = nn.BatchNorm1d(1024)
        # relu called directly

        # max pool called directly

        # 1st fully connected layer after max pool
        self.fc1 = nn.Linear(1024, 512)
        self.batchnorm4 = nn.BatchNorm1d(512)
        # relu called directly

        # 2nd fully connected layer after max pool
        self.fc2 = nn.Linear(512, 256)
        self.batchnorm5 = nn.BatchNorm1d(256)
        # relu called directly

        # generate final output tensor (will be reshaped into kxk matrix)
        self.fc3 = nn.Linear(256, self.k * self.k)

    def forward(self, input):
        batch_size = input.size(0)

        # 1st convolution
        x = F.relu(self.batchnorm1(self.conv1(input)))
        # 2nd convolution
        x = F.relu(self.batchnorm2(self.conv2(x)))
        # 3rd convolution
        x = F.relu(self.batchnorm3(self.conv3(x)))


        # max pool
        x = nn.MaxPool1d(x.size(-1))(x)
        x = nn.Flatten(1)(x)

        # 1st fully connected layer after max pool
        x = F.relu(self.batchnorm4(self.fc1(x)))
        # 2nd fully connected layer after max pool
        x = F.relu(self.batchnorm5(self.fc2(x)))

        # final fc before reshaping to output matrix
        x = self.fc3(x)

        # init matrix to identity for orthogonality
        matrix = torch.eye(self.k, requires_grad=True).flatten().repeat(batch_size, 1)
        if matrix.is_cuda:
            matrix = matrix.cuda()
        # add to last fc layer
        matrix = x + matrix
        # reshape to batch_size x 3 x 3
        matrix = matrix.view(batch_size, self.k, self.k)
        
        return matrix

Now we can take the TNet and place it in the greater PointNet, completing the transformations as well

In [12]:
class PointNet(nn.Module):
    def __init__(self, num_parts = 4):
        super().__init__()

        self.num_parts = num_parts

        # Input Transform TNet
        self.input_tnet = TNet(3)

        # 1st shared mlp between input and feature transform steps
        self.conv1 = nn.Conv1d(3, 64, 1)
        self.batchnorm1 = nn.BatchNorm1d(64)
        # relu called directly

        # Feature Trasformation TNet
        self.feature_tnet = TNet(64)

        # 2nd shared mlp, 1st convolution
        self.conv2 = nn.Conv1d(64, 128, 1)
        self.batchnorm2 = nn.BatchNorm1d(128)
        # relu called directly

        # 2nd shared mlp, 2nd convolution
        self.conv3 = nn.Conv1d(128, 1024, 1)
        self.batchnorm3 = nn.BatchNorm1d(1024)
        # relu called directly


        # segmentation network, shared mlp, convolution 1
        self.conv4 = nn.Conv1d(1088, 512, 1)
        self.batchnorm4 = nn.BatchNorm1d(512)
        # relu called directly

        # segmentation network, shared mlp, convolution 2
        self.conv5 = nn.Conv1d(512, 256, 1)
        self.batchnorm5 = nn.BatchNorm1d(256)
        # relu called directly

        # segmentation network, shared mlp, convolution 3
        self.conv6 = nn.Conv1d(256, 128, 1)
        self.batchnorm6 = nn.BatchNorm1d(128)
        # relu called directly

        # segmentation network, shared mlp, convolution 4
        self.conv7 = nn.Conv1d(128, num_parts, 1)
        self.batchnorm7 = nn.BatchNorm1d(num_parts)
        # relu called directly

        self.logsoftmax = nn.LogSoftmax(dim=1)

    def forward(self, input):
        num_pts = input.size(-1)

        # input transformation
        mat3x3 = self.input_tnet(input)

        x = input.transpose(2, 1)       # align dims
        x = torch.bmm(x, mat3x3)
        x = x.transpose(2, 1)           # put channels back in correct spot

        # 1st shared mlp between input and feature transform steps
        x = F.relu(self.batchnorm1(self.conv1(x)))

        # feature transformation
        mat64x64 = self.feature_tnet(x)
        x = x.transpose(2, 1)
        x = torch.bmm(x, mat64x64)
        x = x.transpose(2, 1)

        # save for segmentation network later
        feature_matrix = x

        # 2nd shared mlp convolutions
        x = F.relu(self.batchnorm2(self.conv2(x)))
        x = F.relu(self.batchnorm3(self.conv3(x)))

        # Max Pool for symmmetric func / perm invariance
        global_features = nn.MaxPool1d(x.size(-1))(x)

        # combine feature transformed matrix with vstacked global feature matrix
        global_repeated = global_features.repeat(1, 1, num_pts)
        # begin segmentation network section
        seg_x = torch.hstack((feature_matrix, global_repeated))

        # segmentation network, shared mlp convolutions
        seg_x = F.relu(self.batchnorm4(self.conv4(seg_x)))
        seg_x = F.relu(self.batchnorm5(self.conv5(seg_x)))
        seg_x = F.relu(self.batchnorm6(self.conv6(seg_x)))
        seg_x = F.relu(self.batchnorm7(self.conv7(seg_x)))

        return self.logsoftmax(seg_x), mat64x64
        

## Training Functions

In [13]:
def train(train_dl, alpha, epochs):

    NUM_PARTS = 5

    # Paper defines regularization as || I - A A^T || ^2
    def loss_reg(mat64x64): 
        I = torch.eye(64).repeat(64, 1, 1)
        if mat64x64.is_cuda:
            I.cuda()
        matT = mat64x64.transpose(2, 1)
        bmmMat = torch.bmm(mat64x64, matT)
        print(I.shape, mat64x64.shape, matT.shape, bmmMat.shape)
        return torch.mean(torch.norm(I - bmmMat, dim=(1, 2)))
    
    def train_step(train_dl, pnet, epoch, alpha=0.001):

        batch = 1
        total_batches = len(train_dl)
        for pts, label in iter(train_dl):
                
            pts = pts.transpose(2, 1)

            if torch.cuda.is_available():
                pts, label = pts.cuda(), label.cuda()
                pnet.cuda()

            optimizer.zero_grad()
            pnet = pnet.train()

            pred, mat64x64 = pnet(pts)

            pred = pred.view(-1, NUM_PARTS)
            label = label.view(-1, 1)

            loss = F.nll_loss(pred, torch.flatten(label)) + alpha * loss_reg(mat64x64)
            loss.backward()

            optimizer.step()

            correct = pred.data.max(1)[1].eq(label.data).sum()

            print(f'Epoch {epoch+1}, batch {batch} / {total_batches}:', f'Accuracy: {correct.item()/label.size(0)}', f'Loss: {loss}')

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f'Training on {device} for {epochs} epochs')

    # we'll set num_parts to 5 so that the padded points are classified as 0,
    # which is not included in the labels and thus won't affect the actual model
    pnet = PointNet(num_parts=NUM_PARTS)
    pnet.to(device)
    optimizer = torch.optim.Adam(pnet.parameters(), lr=0.001)

    for epoch in range(epochs):
        train_step(train_dl, pnet, epoch)
    


In [14]:
train(train_dl, 0.001, 10)

Training on cpu for 10 epochs
1
torch.Size([64, 64, 64]) torch.Size([64, 64, 64]) torch.Size([64, 64, 64]) torch.Size([64, 64, 64])
2
3


: 

: 