# NPM3D - Deep learning based approaches (PointNet)

In [None]:
# @title Imports

import math
import os
import random
from functools import wraps
from time import perf_counter
from typing import Callable

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

In [None]:
# @title Drive mounting

is_on_colab = False #@param {type:"boolean"}

if is_on_colab:
  try:
      from google.colab import drive

      drive.mount("/content/drive", force_remount=True)
  except ImportError:
      pass

  folder_path = "\"/content/drive/MyDrive/master/mva/s2/npm3d/TP6_materials/code/\"" #@param {type:"string"}
  os.chdir(folder_path)

from ply import read_ply

In [None]:
# @title Performance monitoring


def timeit(func: Callable) -> Callable:
    """
    Decorator for timing function execution time.

    Args:
      func: The function to time.

    Returns:
      The wrapped function.
    """

    @wraps(func)
    def timeit_wrapper(*args, **kwargs):
        start_time = perf_counter()
        result = func(*args, **kwargs)
        end_time = perf_counter()
        total_time = end_time - start_time
        print(f"Function {func.__name__} took {total_time:.2f} seconds")
        return result

    return timeit_wrapper


def checkpoint(time_ref: float = perf_counter()) -> Callable[..., None]:
    """
    Closure that stores a time checkpoint that is updated at every call.
    Each call prints the time elapsed since the last checkpoint with a custom message.

    Args:
      time_ref: The time reference to start from. By default, the time of the call will be taken.
    Returns:
      The closure.
    """

    def _closure(message: str = "") -> None:
        """
        Prints the time elapsed since the previous call.

        Args:
          message: Custom message to print. The overall result will be: 'message: time_elapsed'.
        """
        nonlocal time_ref
        current_time = perf_counter()
        if message != "":
            print(f"{message}: {current_time - time_ref:.4f}")
        time_ref = current_time

    return _closure

## Data augmentation

In [None]:
class RandomRotationZ(object):
    def __call__(self, point_cloud):
        theta = random.random() * 2.0 * math.pi
        rot_matrix = np.array(
            [
                [math.cos(theta), -math.sin(theta), 0],
                [math.sin(theta), math.cos(theta), 0],
                [0, 0, 1],
            ]
        )
        rot_point_cloud = rot_matrix.dot(point_cloud.T).T
        return rot_point_cloud


class RandomRotationY(object):
    def __call__(self, point_cloud):
        theta = random.random() * 2.0 * math.pi
        rot_matrix = np.array(
            [
                [math.cos(theta), 0, -math.sin(theta)],
                [0, 1, 0],
                [math.sin(theta), 0, math.cos(theta)],
            ]
        )
        rot_point_cloud = rot_matrix.dot(point_cloud.T).T
        return rot_point_cloud


class RandomRotationX(object):
    def __call__(self, point_cloud):
        theta = random.random() * 2.0 * math.pi
        rot_matrix = np.array(
            [
                [1, 0, 0],
                [0, math.cos(theta), -math.sin(theta)],
                [0, math.sin(theta), math.cos(theta)],
            ]
        )
        rot_point_cloud = rot_matrix.dot(point_cloud.T).T
        return rot_point_cloud


class RandomNoise(object):
    def __call__(self, point_cloud):
        noise = np.random.normal(0, 0.02, (point_cloud.shape))
        noisy_point_cloud = point_cloud + noise
        return noisy_point_cloud


class RandomSymmetry(object):
    def __call__(self, point_cloud, probability=0.5):
        axis = random.randint(0, 1)  # random axis among x and y
        if torch.rand(1) < probability:
            c_max = np.max(point_cloud[:, axis])
            point_cloud[:, axis] = c_max - point_cloud[:, axis]
        return point_cloud


class ToTensor(object):
    def __call__(self, point_cloud):
        return torch.Tensor(point_cloud)


def default_transforms():
    return transforms.Compose(
        [
            RandomRotationZ(),
            RandomNoise(),
            ToTensor(),
        ]
    )


def custom_transforms():
    return transforms.Compose(
        [
            RandomSymmetry(),
            RandomRotationZ(),
            RandomRotationY(),
            RandomRotationX(),
            RandomNoise(),
            ToTensor(),
        ]
    )


def test_transforms():
    return transforms.Compose([ToTensor()])


class PointCloudDataRAM(Dataset):
    def __init__(self, root_dir, folder="train", transform=default_transforms()):
        self.root_dir = root_dir
        folders = [
            directory
            for directory in sorted(os.listdir(root_dir))
            if os.path.isdir(root_dir + "/" + directory)
        ]
        self.classes = {folder: i for i, folder in enumerate(folders)}
        self.transforms = transform
        self.data = []
        timer = checkpoint()
        for category in self.classes.keys():
            new_dir = root_dir + "/" + category + "/" + folder
            for file in os.listdir(new_dir):
                if file.endswith(".ply"):
                    ply_path = new_dir + "/" + file
                    data = read_ply(ply_path)
                    sample = {
                        "point_cloud": np.vstack((data["x"], data["y"], data["z"])).T,
                        "category": self.classes[category],
                    }
                    self.data.append(sample)
            timer(f"Time spent on category {category}")

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        point_cloud = self.transforms(self.data[idx]["point_cloud"])
        return {"point_cloud": point_cloud, "category": self.data[idx]["category"]}

## Models

In [None]:
class MLP(nn.Module):
    def __init__(self, classes=10):
        super(MLP, self).__init__()
        self.net = nn.Sequential(
            nn.Flatten(),
            nn.Linear(3072, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, classes),
        )

    def forward(self, x):
        return self.net(x)


class PointNetBasic(nn.Module):
    def __init__(self, classes=10):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv1d(3, 64, 1),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Conv1d(64, 64, 1),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Conv1d(64, 64, 1),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Conv1d(64, 128, 1),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Conv1d(128, 1024, 1),
            nn.ReLU(),
            nn.BatchNorm1d(1024),
            nn.MaxPool1d(1024),
            nn.Flatten(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.BatchNorm1d(512),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Dropout(0.3),
            nn.Linear(256, classes),
            nn.Softmax(dim=-1),
        )

    def forward(self, x):
        return self.net(x)


class Tnet(nn.Module):
    def __init__(self, k=3):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv1d(3, 64, 1),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Conv1d(64, 128, 1),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Conv1d(128, 1024, 1),
            nn.ReLU(),
            nn.BatchNorm1d(1024),
            nn.MaxPool1d(1024),
            nn.Flatten(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.BatchNorm1d(512),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Dropout(0.3),
            nn.Linear(256, k * k),
            nn.Softmax(dim=-1),
        )

    def forward(self, x):
        return self.net(x)


class PointNetFull(nn.Module):
    def __init__(self, classes=10):
        super().__init__()
        self.t_net = Tnet()
        self.net = nn.Sequential(
            nn.Conv1d(3, 64, 1),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Conv1d(64, 64, 1),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Conv1d(64, 64, 1),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Conv1d(64, 128, 1),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Conv1d(128, 1024, 1),
            nn.ReLU(),
            nn.BatchNorm1d(1024),
            nn.MaxPool1d(1024),
            nn.Flatten(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.BatchNorm1d(512),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Dropout(0.3),
            nn.Linear(256, classes),
            nn.Softmax(dim=-1),
        )

    def forward(self, x):
        t_net_transform = self.t_net(x).view(-1, 3, 3) + torch.eye(3).to(device)
        return self.net(t_net_transform @ x), t_net_transform

## Losses and pipelines

In [None]:
def basic_loss(outputs, labels):
    criterion = torch.nn.CrossEntropyLoss()
    return criterion(outputs, labels)


def pointnet_full_loss(outputs, labels, m3x3, alpha=0.001):
    criterion = torch.nn.CrossEntropyLoss()
    bs = outputs.size(0)
    id3x3 = torch.eye(3, requires_grad=True).repeat(bs, 1, 1)
    if outputs.is_cuda:
        id3x3 = id3x3.cuda()
    diff3x3 = id3x3 - torch.bmm(m3x3, m3x3.transpose(1, 2))
    return criterion(outputs, labels) + alpha * (torch.norm(diff3x3)) / float(bs)


def train(model, device, train_loader, test_loader=None, epochs=250):
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.5)
    loss = 0
    train_loss, test_accuracy = [], []
    for epoch in range(epochs):
        try:
            total_loss, total = 0., 0
            model.train()
            for i, data in enumerate(train_loader, 0):

                inputs, labels = data["point_cloud"].to(device).float(), data[
                    "category"
                ].to(device)
                optimizer.zero_grad()
                if model.__class__.__name__ == "PointNetFull":
                  outputs, m3x3 = model(inputs.transpose(1, 2))
                  loss = pointnet_full_loss(outputs, labels, m3x3)
                else:
                  outputs = model(inputs.transpose(1,2))
                  loss = basic_loss(outputs, labels)
                total_loss += loss.item()
                total += 1
                loss.backward()
                optimizer.step()
            scheduler.step()
            train_loss.append(total_loss / total)

            model.eval()
            correct = total = 0
            if test_loader:
                with torch.no_grad():
                    for data in test_loader:
                        inputs, labels = data["point_cloud"].to(device).float(), data[
                            "category"
                        ].to(device)
                        if model.__class__.__name__ == "PointNetFull":
                            outputs, __ = model(inputs.transpose(1, 2))
                        else:
                            outputs = model(inputs.transpose(1,2))
                        _, predicted = torch.max(outputs.data, 1)
                        total += labels.size(0)
                        correct += (predicted == labels).sum().item()
                test_acc = 100.0 * correct / total
                print(
                    "Epoch: %d, Loss: %.3f, Test accuracy: %.1f %%"
                    % (epoch + 1, loss, test_acc)
                )
                test_accuracy.append(test_acc)

        except KeyboardInterrupt:
            print("Training interrupted.")
            break

    return train_loss, test_accuracy

In [None]:
def plot_performance(train_loss, test_accuracy) -> None:
    """
    Plots two graphs describing the evolution of two metrics during the training process.
    """
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(11, 5))

    ax1.plot(train_loss)
    ax1.set_title("Model loss on training set")
    ax1.set_ylabel("Loss")
    ax1.set_xlabel("Epoch")

    ax2.plot(test_accuracy)
    ax2.set_title("Model accuracy on validation set")
    ax2.set_ylabel("Accuracy")
    ax2.set_xlabel("Epoch")

    plt.show()

## Experiments

In [None]:
timer = checkpoint()

ROOT_DIR = "../data/ModelNet10_PLY"

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("Using device", device)

train_ds = PointCloudDataRAM(ROOT_DIR, folder="train", transform=default_transforms())
test_ds = PointCloudDataRAM(ROOT_DIR, folder="test", transform=test_transforms())

inv_classes = {i: cat for cat, i in train_ds.classes.items()}
print("Classes: ", inv_classes)
print("Train dataset size: ", len(train_ds))
print("Test dataset size: ", len(test_ds))
print("Number of classes: ", len(train_ds.classes))
print("Sample point_cloud shape: ", train_ds[0]["point_cloud"].size())

train_loader = DataLoader(dataset=train_ds, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_ds, batch_size=32)

timer("Time spent on dataloader initialization")

In [None]:
timer()

model = MLP() #@param ["MLP()", "PointNetBasic()", "PointNetFull()"] {type:"raw"}

model_parameters = filter(lambda p: p.requires_grad, model.parameters())
print(f"Using a {model.__class__.__name__} with {sum(np.prod(p.size()) for p in model_parameters)} parameters")

model.to(device)

mlp_train_loss, mlp_test_accuracy = train(model, device, train_loader, test_loader, epochs=75)

timer("Time spent on training")
plot_performance(mlp_train_loss, mlp_test_accuracy)

In [None]:
timer()

model = PointNetBasic() #@param ["MLP()", "PointNetBasic()", "PointNetFull()"] {type:"raw"}

model_parameters = filter(lambda p: p.requires_grad, model.parameters())
print(f"Using a {model.__class__.__name__} with {sum(np.prod(p.size()) for p in model_parameters)} parameters")

model.to(device)

ptb_train_loss, ptb_test_accuracy = train(model, device, train_loader, test_loader, epochs=75)

timer("Time spent on training")
plot_performance(ptb_train_loss, ptb_test_accuracy)

In [None]:
timer()

model = PointNetFull() #@param ["MLP()", "PointNetBasic()", "PointNetFull()"] {type:"raw"}

model_parameters = filter(lambda p: p.requires_grad, model.parameters())
print(f"Using a {model.__class__.__name__} with {sum(np.prod(p.size()) for p in model_parameters)} parameters")

model.to(device)

pt_train_loss, pt_test_accuracy = train(model, device, train_loader, test_loader, epochs=75)

timer("Time spent on training")
plot_performance(pt_train_loss, pt_test_accuracy)

In [None]:
timer = checkpoint()

ROOT_DIR = "../data/ModelNet10_PLY"

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("Using device", device)

train_ds = PointCloudDataRAM(ROOT_DIR, folder="train", transform=custom_transforms())
test_ds = PointCloudDataRAM(ROOT_DIR, folder="test", transform=custom_transforms())

inv_classes = {i: cat for cat, i in train_ds.classes.items()}
print("Classes: ", inv_classes)
print("Train dataset size: ", len(train_ds))
print("Test dataset size: ", len(test_ds))
print("Number of classes: ", len(train_ds.classes))
print("Sample point_cloud shape: ", train_ds[0]["point_cloud"].size())

train_loader = DataLoader(dataset=train_ds, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_ds, batch_size=32)

timer("Time spent on dataloader initialization")

In [None]:
timer()

model = PointNetBasic() #@param ["MLP()", "PointNetBasic()", "PointNetFull()"] {type:"raw"}

model_parameters = filter(lambda p: p.requires_grad, model.parameters())
print(f"Using a {model.__class__.__name__} with {sum(np.prod(p.size()) for p in model_parameters)} parameters")

model.to(device)

pta_train_loss, pta_test_accuracy = train(model, device, train_loader, test_loader, epochs=500)

timer("Time spent on training")
plot_performance(pta_train_loss, pta_test_accuracy)