# Workshop on designing and building a spatial reasoning system: Point cloud classification using PointNet

Course: Spatial Reasoning from Sensor Data, https://www.iss.nus.edu.sg/executive-education/course/detail/spatial-reasoning-from-sensor-data/artificial-intelligence

Contact: Tian Jing

Email: tianjing@nus.edu.sg

## Objective

- Visualize the point cloud data
- Perform point cloud classification using the PointNet approach

Reference: https://datascienceub.medium.com/pointnet-implementation-explained-visually-c7e300139698

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision.datasets import MNIST
from torch.utils.data import Dataset
from torch.utils.data import DataLoader, random_split
from tqdm.notebook import tqdm


In [None]:
# Prepare a MNIST3D dataset

def transform_img2pc(img):
    """Turn an image into a point cloud made of its bright pixels.

    Every element whose intensity is strictly greater than 127 contributes
    one point, whose coordinates are that element's indices in the array
    (row, column for a 2D image).

    Args:
        img: Anything ``np.asarray`` can convert (e.g. a PIL image or array).

    Returns:
        A ``float32`` array with one row per bright pixel and one column per
        image axis.
    """
    bright_mask = np.asarray(img) > 127
    return np.argwhere(bright_mask).astype(np.float32)

# Load the MNIST training split (download=True); items are indexed below as
# (image, label) pairs.
dataset = MNIST(root='./data', train=True, download=True)
# Collect the number of foreground points each image produces. Presumably this
# is meant to help choose `number_of_points`; nothing else in the visible
# notebook reads it.
len_points = []
for idx in range(len(dataset)):
    img,label = dataset[idx]
    pc = transform_img2pc(img)
    len_points.append(len(pc))


class MNIST3D(Dataset):
    """Point-cloud view of an image dataset (used here with MNIST).

    Each image is converted to a cloud with a fixed number of points. The
    first two coordinates are the indices of the bright pixels returned by
    ``transform_img2pc``; the third coordinate is small Gaussian noise, so
    the digit lies in a slightly jittered plane.
    """

    NUM_CLASSIFICATION_CLASSES = 10
    POINT_DIMENSION = 3

    def __init__(self, dataset, num_points=500):
        """Wrap ``dataset``.

        Args:
            dataset: Indexable dataset whose items are ``(image, label)``.
            num_points: Number of points in every returned cloud.
        """
        self.dataset = dataset
        self.number_of_points = num_points

    def __len__(self):
        """Return the number of samples in the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return ``(points, label)`` for sample ``idx``.

        ``points`` is a ``float32`` tensor of shape
        ``(number_of_points, 3)``. The result is random: padding or
        subsampling and the z-noise are re-drawn on every call.
        """
        # Read from the wrapped dataset. The previous code indexed a
        # module-level `dataset` variable, which only worked while that
        # global happened to refer to the same object.
        img, label = self.dataset[idx]
        pc = transform_img2pc(img)

        missing = self.number_of_points - pc.shape[0]
        if missing > 0:
            # Too few points: pad by duplicating randomly chosen ones.
            extra = np.random.choice(pc.shape[0], missing)
            pc = np.concatenate((pc, pc[extra, :]), axis=0)
        else:
            # Enough points: subsample WITHOUT replacement so that no point
            # is dropped just to make room for a duplicate of another.
            keep = np.random.choice(pc.shape[0], self.number_of_points,
                                    replace=False)
            pc = pc[keep, :]

        # Add the z coordinate: zero-mean Gaussian noise, std 0.05.
        z = np.random.normal(0, 0.05, (len(pc), 1))
        pc = np.hstack([pc, z]).astype(np.float32)

        return torch.tensor(pc), label


In [None]:
# Experiment configuration.
# NOTE(review): this rebinds the module-level name `dataset` (previously the
# MNIST object) to a string; the data-loading cell rebinds it again.
dataset = 'MNIST3D'
# Points per cloud, passed to MNIST3D as `num_points`.
number_of_points = 200
# Mini-batch size used by all DataLoaders.
batch_size = 128
# Learning rate handed to the Adam optimizer.
learning_rate = 0.001


In [None]:

# Merge the official MNIST train and test splits into one pool, then draw our
# own 80/10/10 train/val/test split from it.
train_dataset = MNIST(root='./data/MNIST', download=True, train=True)
test_dataset = MNIST(root='./data/MNIST', download=True, train=False)
dataset = torch.utils.data.ConcatDataset([train_dataset, test_dataset])

dataset_3d = MNIST3D(dataset, number_of_points)
l_data = len(dataset_3d)

# Give the remainder to the test split: three independent round() calls are
# not guaranteed to sum to l_data, and random_split rejects mismatched sizes.
n_train = round(0.8 * l_data)
n_val = round(0.1 * l_data)
n_test = l_data - n_train - n_val
train_dataset, val_dataset, test_dataset = random_split(
    dataset_3d,
    [n_train, n_val, n_test],
    generator=torch.Generator().manual_seed(1))  # fixed seed: reproducible split

# Shuffle only the training data, so every epoch sees differently composed
# batches; evaluation loaders keep a fixed order.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, drop_last=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [None]:
# Visualize a point cloud

# Fetch the sample once: every indexing call re-runs the random
# padding/subsampling and noise generation, so indexing twice does the work
# twice.
sample_points, label = train_dataset[1]
pc = sample_points.numpy()
fig = plt.figure(figsize=[7,7])
ax = plt.axes(projection='3d')
# Colour the points by their first coordinate to make the shape easier to read.
sc = ax.scatter(pc[:,0], pc[:,1], pc[:,2], c=pc[:,0] ,s=80, marker='o', cmap="viridis", alpha=0.7)
ax.set_zlim3d(-1, 1)
plt.title(f'Label: {label}')


In [None]:
# Define the PointNet model

class TransformationNet(nn.Module):
    """T-Net: predicts one square transformation matrix per point cloud.

    A shared per-point MLP (1x1 convolutions) is followed by a max over the
    points and three fully connected layers. The last layer's output is
    reshaped to ``output_dim x output_dim`` and added to the identity
    matrix.
    """

    def __init__(self, input_dim, output_dim):
        """
        Args:
            input_dim: Number of channels of each input point.
            output_dim: Side length of the predicted square matrix.
        """
        super().__init__()
        self.output_dim = output_dim

        self.conv_1 = nn.Conv1d(input_dim, 64, 1)
        self.conv_2 = nn.Conv1d(64, 128, 1)
        self.conv_3 = nn.Conv1d(128, 256, 1)

        self.bn_1 = nn.BatchNorm1d(64)
        self.bn_2 = nn.BatchNorm1d(128)
        self.bn_3 = nn.BatchNorm1d(256)
        self.bn_4 = nn.BatchNorm1d(256)
        self.bn_5 = nn.BatchNorm1d(128)

        self.fc_1 = nn.Linear(256, 256)
        self.fc_2 = nn.Linear(256, 128)
        self.fc_3 = nn.Linear(128, self.output_dim*self.output_dim)

    def forward(self, x):
        """Compute the transformation matrices.

        Args:
            x: Tensor of shape ``[batch, num_points, input_dim]``.

        Returns:
            Tensor of shape ``[batch, output_dim, output_dim]``.
        """
        x = x.transpose(2, 1)  # Conv1d expects [batch, channels, num_points]
        x = F.relu(self.bn_1(self.conv_1(x)))
        x = F.relu(self.bn_2(self.conv_2(x)))
        x = F.relu(self.bn_3(self.conv_3(x)))

        # Max over the point axis -> [batch, 256]. Same values as a
        # MaxPool1d spanning all points, without building a module per call.
        x = torch.max(x, dim=2).values

        x = F.relu(self.bn_4(self.fc_1(x)))
        x = F.relu(self.bn_5(self.fc_2(x)))
        x = self.fc_3(x)

        # Create the identity on the same device/dtype as the activations.
        # Choosing the device from torch.cuda.is_available() breaks when the
        # model runs on the CPU of a machine that also has a GPU.
        identity_matrix = torch.eye(self.output_dim, dtype=x.dtype, device=x.device)
        return x.view(-1, self.output_dim, self.output_dim) + identity_matrix


class BasePointNet(nn.Module):
    """PointNet backbone: maps a point cloud to a 256-d global feature.

    The input is aligned with an input T-Net, passed through a shared
    per-point MLP, aligned again with a 64x64 feature T-Net, passed through
    further shared layers and finally max-pooled over the points.
    """

    def __init__(self, point_dimension):
        """
        Args:
            point_dimension: Number of coordinates per input point.
        """
        super().__init__()
        self.input_transform = TransformationNet(input_dim=point_dimension,
                                                 output_dim=point_dimension)
        self.feature_transform = TransformationNet(input_dim=64, output_dim=64)

        # Shared per-point MLP implemented as 1x1 convolutions.
        self.conv_1 = nn.Conv1d(point_dimension, 64, 1)
        self.conv_2 = nn.Conv1d(64, 64, 1)
        self.conv_3 = nn.Conv1d(64, 64, 1)
        self.conv_4 = nn.Conv1d(64, 128, 1)
        self.conv_5 = nn.Conv1d(128, 256, 1)

        self.bn_1 = nn.BatchNorm1d(64)
        self.bn_2 = nn.BatchNorm1d(64)
        self.bn_3 = nn.BatchNorm1d(64)
        self.bn_4 = nn.BatchNorm1d(128)
        self.bn_5 = nn.BatchNorm1d(256)

    def forward(self, x, plot=False):
        """Extract the global feature of each cloud in the batch.

        Args:
            x: Tensor of shape ``[batch, num_points, point_dimension]``.
            plot: Not used by this method.

        Returns:
            A 4-tuple ``(global_feature, feature_transform, tnet_out, ix)``:
            the ``[batch, 256]`` pooled feature, the ``[batch, 64, 64]``
            feature T-Net matrices, the input-transformed points as a NumPy
            array in channels-first layout, and the max-pooling indices.
        """
        point_count = x.shape[1]

        # Align the raw points with the input T-Net, then go channels-first.
        aligned = torch.bmm(x, self.input_transform(x)).transpose(2, 1)
        tnet_out = aligned.cpu().detach().numpy()

        feats = F.relu(self.bn_1(self.conv_1(aligned)))
        feats = F.relu(self.bn_2(self.conv_2(feats)))
        feats = feats.transpose(2, 1)  # back to [batch, num_points, 64]

        # Align the 64-d per-point features with the feature T-Net.
        feature_transform = self.feature_transform(feats)
        feats = torch.bmm(feats, feature_transform).transpose(2, 1)

        stages = ((self.conv_3, self.bn_3),
                  (self.conv_4, self.bn_4),
                  (self.conv_5, self.bn_5))
        for conv, norm in stages:
            feats = F.relu(norm(conv(feats)))

        # Max-pool across all points; the indices tell which point won each channel.
        pooled, ix = F.max_pool1d(feats, point_count, return_indices=True)
        global_feature = pooled.view(-1, 256)

        return global_feature, feature_transform, tnet_out, ix


class ClassificationPointNet(nn.Module):
    """PointNet classifier: backbone followed by a three-layer MLP head."""

    def __init__(self, num_classes, dropout=0.3, point_dimension=3):
        """
        Args:
            num_classes: Number of output classes.
            dropout: Dropout probability applied before the last layer.
            point_dimension: Number of coordinates per input point.
        """
        super().__init__()
        self.base_pointnet = BasePointNet(point_dimension=point_dimension)

        self.fc_1 = nn.Linear(256, 128)
        self.fc_2 = nn.Linear(128, 64)
        self.fc_3 = nn.Linear(64, num_classes)

        self.bn_1 = nn.BatchNorm1d(128)
        self.bn_2 = nn.BatchNorm1d(64)

        self.dropout_1 = nn.Dropout(dropout)

    def forward(self, x):
        """Classify a batch of point clouds.

        Args:
            x: Batch of point clouds, passed unchanged to the backbone.

        Returns:
            A 4-tuple: per-class log-probabilities of shape
            ``[batch, num_classes]``, followed by the backbone's feature
            transform, transformed input points and max-pooling indices.
        """
        global_feature, feature_transform, tnet_out, ix_maxpool = self.base_pointnet(x)

        hidden = F.relu(self.bn_1(self.fc_1(global_feature)))
        hidden = F.relu(self.bn_2(self.fc_2(hidden)))
        hidden = self.dropout_1(hidden)

        log_probs = F.log_softmax(self.fc_3(hidden), dim=1)
        return log_probs, feature_transform, tnet_out, ix_maxpool


In [None]:

# Build the classifier and move it to the GPU when one is available.
model = ClassificationPointNet(
    num_classes=dataset_3d.NUM_CLASSIFICATION_CLASSES,
    point_dimension=dataset_3d.POINT_DIMENSION)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
if device == 'cuda':
    model.cuda()

# Create the optimizer after the model has been placed on its device.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


In [None]:
# Train the model
import copy  # cell-local import: used to snapshot the best weights

epochs = 10
# Per-epoch history. `test_loss` / `test_acc` keep their original names but
# hold metrics measured on the validation split.
train_loss = []
test_loss = []
train_acc = []
test_acc = []
best_loss = np.inf

# Run on whatever device the model's parameters live on.
run_device = next(model.parameters()).device

for epoch in tqdm(range(epochs)):
    epoch_train_loss = []
    epoch_train_acc = []

    # training loop
    model.train()
    for points, targets in train_dataloader:  # [batch, num_points, 3] [batch]
        if points.shape[0] <= 1:
            # Skip single-sample batches (presumably because BatchNorm in
            # training mode needs more than one sample).
            continue
        points, targets = points.to(run_device), targets.to(run_device)

        optimizer.zero_grad()
        preds, feature_transform, tnet_out, ix_maxpool = model(points)

        # Penalise the distance between A @ A^T and the identity for the
        # feature transform A, i.e. push it towards an orthogonal matrix.
        identity = torch.eye(feature_transform.shape[-1], device=feature_transform.device)
        regularization_loss = torch.norm(
            identity - torch.bmm(feature_transform, feature_transform.transpose(2, 1)))
        # Loss
        loss = F.nll_loss(preds, targets) + 0.001 * regularization_loss
        epoch_train_loss.append(loss.item())
        loss.backward()
        optimizer.step()

        corrects = preds.argmax(dim=1).eq(targets).sum().item()
        # Divide by the real batch size, not the nominal one, so a partial
        # batch is not under-counted.
        accuracy = corrects / targets.size(0)
        epoch_train_acc.append(accuracy)

    epoch_test_loss = []
    epoch_test_acc = []

    # validation loop: uses the validation split so the test split stays
    # untouched until the final evaluation.
    model.eval()
    with torch.no_grad():  # no gradients needed for evaluation
        for batch_number, (points, targets) in enumerate(val_dataloader):
            points, targets = points.to(run_device), targets.to(run_device)
            preds, feature_transform, tnet_out, ix = model(points)
            loss = F.nll_loss(preds, targets)
            epoch_test_loss.append(loss.item())
            corrects = preds.argmax(dim=1).eq(targets).sum().item()
            accuracy = corrects / targets.size(0)
            epoch_test_acc.append(accuracy)

    print('Epoch %s: train loss: %s, val loss: %f, train accuracy: %s,  val accuracy: %f'
              % (epoch,
                round(np.mean(epoch_train_loss), 4),
                round(np.mean(epoch_test_loss), 4),
                round(np.mean(epoch_train_acc), 4),
                round(np.mean(epoch_test_acc), 4)))

    # Compare THIS epoch's validation loss with the best so far. The earlier
    # code averaged the history of previous epochs, which is empty (NaN) on
    # the first epoch and lags behind afterwards.
    current_val_loss = np.mean(epoch_test_loss)
    if current_val_loss < best_loss:
        # Deep-copy: state_dict() returns references to the live tensors,
        # which later optimisation steps would otherwise overwrite.
        state = {
            'model': copy.deepcopy(model.state_dict()),
            'optimizer': copy.deepcopy(optimizer.state_dict())
        }
        best_loss = current_val_loss

    train_loss.append(np.mean(epoch_train_loss))
    test_loss.append(np.mean(epoch_test_loss))
    train_acc.append(np.mean(epoch_train_acc))
    test_acc.append(np.mean(epoch_test_acc))


In [None]:
# Define inference function
def infer(dataset,
          model,
          point_cloud_file,
          shuffle_points=False,
          plot_tNet_out=True,
          return_indices_maxpool=False):
    """Classify a single point cloud with ``model``.

    The model is switched to eval mode and, when CUDA is available, moved to
    the GPU together with the input points.

    Args:
        dataset: Not used; kept so existing callers keep working.
        model: Module that returns ``(log_probs, feature_transform,
            tnet_out, maxpool_indices)`` for a batch of point clouds.
        point_cloud_file: ``(points, label)`` pair. ``points`` is one cloud
            without a batch dimension; ``label`` is ignored.
        shuffle_points: Accepted for interface compatibility; currently has
            no effect.
        plot_tNet_out: Accepted for interface compatibility; currently has
            no effect.
        return_indices_maxpool: If true, also return the max-pooling indices
            produced by the model.

    Returns:
        ``(preds, tnet_out)``, or ``(preds, tnet_out, ix)`` when
        ``return_indices_maxpool`` is true. ``preds`` is a NumPy array
        holding the predicted class index of the single input cloud.
    """
    points, _ = point_cloud_file

    if torch.cuda.is_available():
        points = points.cuda()
        model.cuda()

    model = model.eval()
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        log_probs, _, tnet_out, ix = model(points.unsqueeze(dim=0))
    preds = log_probs.max(1)[1].cpu().numpy()

    if return_indices_maxpool:
        return preds, tnet_out, ix

    return preds, tnet_out

In [None]:
# Test data

SAMPLE = 0

fig = plt.figure(figsize=[12,6])
ax = fig.add_subplot(1, 2, 1, projection='3d')

# plot input sample
# Fetch the sample once. Each indexing call re-draws the random
# padding/subsampling and z-noise, so indexing again for inference would
# classify a different cloud from the one plotted.
sample = test_dataset[SAMPLE]
pc = sample[0].numpy()
label = sample[1]
sc = ax.scatter(pc[:,0], pc[:,1], pc[:,2], c=pc[:,0] ,s=50, marker='o', cmap="viridis", alpha=0.7)
preds, tnet_out = infer(dataset_3d, model, sample)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlim3d(-1, 1)
ax.title.set_text(f'Input point cloud - Predict: {preds}')

# Second panel: the next test sample, handled the same way.
ax = fig.add_subplot(1, 2, 2, projection='3d')
SAMPLE = SAMPLE + 1
sample = test_dataset[SAMPLE]
pc = sample[0].numpy()
label = sample[1]
sc = ax.scatter(pc[:,0], pc[:,1], pc[:,2], c=pc[:,0] ,s=50, marker='o', cmap="viridis", alpha=0.7)
preds, tnet_out = infer(dataset_3d, model, sample)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlim3d(-1, 1)
ax.title.set_text(f'Input point cloud - Predict: {preds}')


In [None]:
# Evaluate accuracy on test set
corrects = 0

for i in tqdm(range(len(test_dataset))):
    # Index the dataset once per sample: each access repeats the
    # image-to-point-cloud conversion and random resampling.
    sample = test_dataset[i]
    target = sample[1]
    pred, tnet_out = infer(dataset_3d, model, sample, shuffle_points=False, plot_tNet_out=False)
    # `pred` is a one-element array; compare its scalar value explicitly
    # instead of relying on the truthiness of an array comparison.
    if int(pred[0]) == target:
        corrects += 1

print(f'Accuracy = {corrects/len(test_dataset)}')


$\color{red}{\text{Discussions}}$

Q1. State one reason why traditional convolution cannot be directly applied to point cloud data.

Q2. State one unique function of PointNet that allows it to aggregate information from unordered data (e.g., when the points of the input point cloud are permuted).


In [None]:
# Provide your answers to Q1 here
#
#
#
# Provide your answers to Q2 here
#
#

**After you finish the workshop, rename and submit your .ipynb file.**