<a href="https://colab.research.google.com/github/EugenioBugli/PointCloud3D/blob/main/PointCloud3D.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports


In [3]:
%%capture
!pip install -q open3d
!pip install -q torch_scatter
!pip install ninja
!pip install 'git+https://github.com/facebookresearch/pytorch3d.git@stable'
!pip install trimesh
!pip install pyfqmr
!pip install .
!git clone https://github.com/EugenioBugli/PointCloud3D.git

In [7]:
%matplotlib inline

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import tqdm

import os
import numpy as np
import matplotlib.pyplot as plt
from torch_scatter import scatter_mean, scatter_max

import open3d as o3d
import plotly.graph_objects as go

from pytorch3d.loss import chamfer_distance
import trimesh
import pyfqmr

import sys
sys.path.insert(0,'/content/PointCloud3D')
from PointCloud3D.src.dataset import FAUST_Dataset, LoadDataset, openDataFiles
from PointCloud3D.src.utils import Plot2D, voxel2Numpy, PlotVoxel, Plot2DWithBuckets
from PointCloud3D.src.unet import UNet

if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using PyTorch version:', torch.__version__, ' Device:', device)

from google.colab import drive
drive.mount("/content/drive")

drive_path = "/content/drive/MyDrive/CV/MPI-FAUST"
training_path = drive_path + "/training"
test_path = drive_path + "/test"

Using PyTorch version: 2.5.1+cu121  Device: cuda
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [8]:
SAMPLING_TYPE = "RANDOM"
SAMPLING_SIZE = 2048
BATCH_SIZE = 5

IN_DIM_RES_PT = 64
FEATURES_DIM = 32
NUM_BLOCKS = 5
NUM_FC = 4
NUM_PLANES = 4
NUM_COORDINATES = 3

# 1] Dataset


In [9]:
# use this to directly import the already pre-processed dataset

# train_scan_files, train_reg_files, val_scan_files, val_reg_files, test_scan_files, test_reg_files = openDataFiles(training_path, test_path, val_size=0.2)
# train_set = FAUST_Dataset(train_scan_files, train_reg_files, sampling_type=SAMPLING_TYPE, sampling_size=SAMPLING_SIZE, partition="TRAIN", transform=transforms.ToTensor())
# val_set = FAUST_Dataset(val_scan_files, val_reg_files, sampling_type=SAMPLING_TYPE, sampling_size=SAMPLING_SIZE, partition="VAL", transform=transforms.ToTensor())
# test_set = FAUST_Dataset(test_scan_files, test_reg_files, sampling_type=SAMPLING_TYPE, sampling_size=SAMPLING_SIZE, partition="TEST", transform=transforms.ToTensor())

train_set = LoadDataset("train_set.pt")
val_set = LoadDataset("val_set.pt")
test_set = LoadDataset("test_set.pt")

train_loader = DataLoader(dataset=train_set, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(dataset=val_set, batch_size=BATCH_SIZE)
test_loader = DataLoader(dataset=test_set, batch_size=BATCH_SIZE)

In [161]:
import torch
import numpy as np
import open3d as o3d
import plotly.graph_objects as go
from torch.utils.data import DataLoader, Dataset
from torchvision.datasets import DatasetFolder
from sklearn.model_selection import train_test_split

class FAUST_Dataset(Dataset):
    """
        This class is used to load a partition of the FAUST dataset. Before using this you must access the file via DatasetFolder.
    """

    def __init__(self, scan_files, reg_files=None, sampling_type="RANDOM", sampling_size=1024, partition="TRAIN", transform=None):
        super(FAUST_Dataset, self).__init__()
        self.scan_files = scan_files # list of files .ply
        self.partition = partition
        self.reg_files = reg_files if partition in ["TRAIN", "VAL"] else None # list of files .ply
        self.sampling_type = sampling_type
        self.sampling_size = sampling_size
        self.transform = transform

        self.scans, self.regs = self.extractClouds()

    def __getitem__(self, index):
        if self.partition in ["TRAIN", "VAL"]:
            if self.transform:
                return self.transform(self.scans[index]).to(torch.float32).squeeze(0), self.transform(self.regs[index]).to(torch.float32).squeeze(0)
            return self.scans[index], self.regs[index]
        else: # test case
            if self.transform:
                return self.transform(self.scans[index]).to(torch.float32).squeeze(0)
            return self.scans[index]

    def __len__(self):
        return len(self.scan_files)

    def extractClouds(self):
        # we have to open the files .ply and transform them into point clouds:

        scans = []
        regs = []

        for i in range(len(self.scan_files)):
            s = o3d.io.read_point_cloud(self.scan_files[i])
            scans.append(self.SamplingFunction(s))

            if self.partition in ["TRAIN", "VAL"]:
                r = np.asarray(o3d.io.read_point_cloud(self.reg_files[i]).points)
                regs.append(r)

        return scans, regs if regs else None

    def plotCloud(self, cloud):
        """
        This function is used to plot the Point Cloud

        Args:
            cloud (np.array): Point Cloud

        Returns:
            None
        """
        if len(cloud.shape) == 3:
            # when using DataLoader you have a shape (1, SAMPLING_SIZE, 3)
            cloud = cloud[0]

        fig = go.Figure(
            data=[
            go.Scatter3d(
                x =cloud[:,0], y=cloud[:,1], z=cloud[:,2],
                mode = 'markers',
                marker = dict(size=0.5, color=[])
            )
        ],
        layout=dict(
            scene=dict(
                xaxis=dict(visible=True),
                yaxis=dict(visible=True),
                zaxis=dict(visible=True),
                )
            )
        )
        fig.show()

    def SamplingFunction(self, cloud):
        """
            This function is used to sample a small Subset of points from the Point Clouds inside our Dataset.

            @INPUT :
                > cloud : Point Cloud extracted from .ply file

            @OUTPUT :
                > sampled_cloud : Sampled Point Cloud
        """

        if self.sampling_type == 'RANDOM':
            points = np.asarray(cloud.points)
            indices = np.random.choice(len(points), size=self.sampling_size)
            sampled_cloud = points[indices]
        if self.sampling_type == 'IMPORTANCE':
            numOfNeighbors = 20
            # estimate normal vectors to the surface at each point of the cloud
            cloud.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamKNN(knn=numOfNeighbors))
            tree = o3d.geometry.KDTreeFlann(cloud) # faster
            # loop over the points and compute curvature
            curvature = np.zeros(len(cloud.points))
            for i in range(len(cloud.points)):
                # find indices of the neighbors
                [_ , idx, _] = tree.search_knn_vector_3d(cloud.points[i], numOfNeighbors)
                neighbors = np.asarray(cloud.points)[idx, :]
                # compute covariance matrix for each point
                covarianceMat = np.cov(neighbors.T)
                # extract eigenvalues
                eigen, _ = np.linalg.eigh(covarianceMat)
                # compute curvature
                curvature[i] = min(eigen) / sum(eigen)
            # extract the best SamplingPoints points
            maxCurvaturePoints = curvature.argsort()[-self.SamplingSize:]
            sampled_cloud = np.asarray(cloud.points)[maxCurvaturePoints]

        return sampled_cloud

# Save your preprocessed Dataset:
def SaveDataset(dataset, path):
    torch.save(dataset, "/content/drive/MyDrive/CV/PreProcessed/"+path)

# Load your preprocessed Dataset:
def LoadDataset(path):
    return torch.load("/content/drive/MyDrive/CV/PreProcessed/"+path, weights_only=False)

def openDataFiles(training_path, test_path, val_size):
    """
        This function is used to import all the .ply files from the folders. Training is partitioned into train and validation set directly here.
    """

    training_dataset = DatasetFolder(
        root = training_path,
        loader = o3d.io.read_point_cloud,
        extensions = ('ply',),
        allow_empty = True,
        )

    test_dataset = DatasetFolder(
        root = test_path,
        loader = o3d.io.read_point_cloud,
        extensions = ('ply',),
        allow_empty = True,
    )

    unsorted_training_scan_files = [sample for sample, t in training_dataset.samples if t == 2]
    training_scan_files = sorted(unsorted_training_scan_files, key=lambda x: int(x.split('_')[-1].split('.')[0]))

    unsorted_training_reg_files = [sample for sample, t in training_dataset.samples if t == 1]
    training_reg_files = sorted(unsorted_training_reg_files, key=lambda x: int(x.split('_')[-1].split('.')[0]))

    # perform train-validation split :

    train_scan_files, val_scan_files, train_reg_files, val_reg_files = train_test_split(training_scan_files, training_reg_files, test_size=val_size, random_state=15)


    unsorted_test_scan_files = [sample for sample, t in test_dataset.samples if t == 1]
    test_scan_files = sorted(unsorted_test_scan_files, key=lambda x: int(x.split('_')[-1].split('.')[0]))
    test_reg_files = test_scan_files # check here
    # we don't have any registration for the test set --> use instead the complete point cloud


    return train_scan_files, train_reg_files, val_scan_files, val_reg_files, test_scan_files, test_reg_files

# 1.2] Visualization

In [10]:
# add noise to a point cloud:
for batch in train_loader:
    eg = batch[0][0]
    eg = (eg - eg.min())/(eg.max() - eg.min() + 10e-6)

    pc = o3d.geometry.PointCloud()
    pc.points = o3d.utility.Vector3dVector(eg)
    o3d.visualization.draw_plotly([pc])

    eg_noise = eg + np.random.normal(0, 0.05, eg.shape) # gaussian noise
    pc_noise = o3d.geometry.PointCloud()
    pc_noise.points = o3d.utility.Vector3dVector(eg_noise)
    o3d.visualization.draw_plotly([pc_noise])

    pc.scale(1 / np.max(pc.get_max_bound() - pc.get_min_bound()), center=pc.get_center())

    # try downsampling the point cloud that you have with 3D voxels
    # all the points belonging to the cloud are bucketed into voxels and then each occupied voxel generates exactly one point by averaging all points inside.
    PlotVoxel(eg)

    break


[ True  True  True ...  True  True  True]
[ 4 18  3]


# 2] Encoder

## 2.1) ResBlock + Functions

In [11]:
class ResBlock(nn.Module):
    """
        This class is used to define a Residual Block, which is one of the main component of the ResNetPointNet architecture
    """

    def __init__(self, in_dim=64, n_points=2048, h_dim=32, out_dim=64):
        super(ResBlock, self).__init__()

        self.n_points = n_points

        #> First part of the Block

        self.fc1 = nn.Linear(
            in_dim,
            h_dim
        )
        self.bn1 = nn.BatchNorm1d(self.n_points)

        #> Second part of the Block

        self.fc2 = nn.Linear(
            h_dim,
            out_dim
        )
        self.bn2 = nn.BatchNorm1d(self.n_points)

        #> Skip connection

        if in_dim != out_dim:
            # size mismatch (never happen in my case)
            self.residual = nn.Linear(in_dim, out_dim)
        else:
            # same size
            self.residual = None


    def forward(self, x):
        # Input: (b,p,in_dim) = (b,p,64)

        first_part = F.relu(self.bn1(self.fc1(x))) # (b,p,64) -> (b,p,32)

        second_part = self.bn2(self.fc2(first_part)) # (b,p,32) -> (b,p,64)

        if self.residual is None:
            # no size mismatch
            res = x # (b,p,64)
        else:
            # transformation if there is a size mismatch in_dim != out_dim
            res = self.residual(x) # (b,p,in_dim) -> (b,p,64)

        # add residual connection
        third_part = second_part + res # (b,p,64) -> (b,p,64)

        return F.relu(third_part)

## 2.2) ResNetPointNet

In [12]:
def CloudNormalization(input_cloud):
    # normalize cloud in order to have it in [0,1]
    min = input_cloud.min()
    max = input_cloud.max()
    norm_cloud = (input_cloud - min)/(max-min)
    return norm_cloud

def CanonicalProjection(input_cloud):
    # gives you a dict with all 3 canonical projections (xy, xz, yz)
    xy_proj = CloudNormalization(input_cloud[:,:,[0,1]]) # XY plane (view from above)
    xz_proj = CloudNormalization(input_cloud[:,:,[0,2]]) # XZ plane (view from long side)
    yz_proj = CloudNormalization(input_cloud[:,:,[1,2]]) # YZ plane (view from short side)
    return {"xy": xy_proj, "xz": xz_proj, "yz": yz_proj}

def DivideInBuckets(input_cloud_proj, resolution):
    # in input we have the canonical projection of the input cloud and the resolution of our buckets
    # map 2D coordinates into one dimension
    xy_buck = (input_cloud_proj["xy"]*resolution).long()
    xz_buck = (input_cloud_proj["xz"]*resolution).long()
    yz_buck = (input_cloud_proj["yz"]*resolution).long()
    return {"xy": xy_buck[:,:,0] + resolution*xy_buck[:,:,1], "xz": xz_buck[:,:,0] + resolution*xz_buck[:,:,1], "yz": yz_buck[:,:,0] + resolution*yz_buck[:,:,1]}

def LocalMaxPooling(input_features, input_cloud, resolution):
    # given the 3D point cloud I need to project them into a 2D grid. In this way I can define the neighborhood to perform local pooling
    # voxelization is not a good idea since I will lose details (all points are averaged into a voxel)
    canon_proj = CanonicalProjection(input_cloud)
    # we need to divide this 2D planes into buckets and the perform inside each of them the pooling operation
    indices = DivideInBuckets(canon_proj, resolution) # only the full buckets are taken from the resolution^2 buckets
    pooled_dict = dict()
    gathered_container = 0
    for plane in canon_proj.keys():
        projection_indices = indices[plane]
        scatter_container = []
        # max pooling operation over the input_features (batch, num_points, 32)
        for batch in range(input_cloud.shape[0]):
            features = input_features[batch]
            per_batch_indices = projection_indices[batch]

            scatter_batch, _ = scatter_max(features, per_batch_indices, dim=0, dim_size=resolution**2)
            scatter_container.append(scatter_batch)

        scatter_container = torch.stack(scatter_container, dim=0) # (batch_size, resolution**2, features_dim)
        pooled_dict[plane] = scatter_container
        # perform the gathering operation over the max point of each plane
        projection_indices_clamp = projection_indices.clamp_(0, resolution**2-1)
        gathered = scatter_container.gather(dim=1, index=projection_indices_clamp.unsqueeze(-1).expand(input_cloud.shape[0], input_cloud.shape[1], scatter_container.size(-1)))
        gathered_container += gathered
    return gathered_container # (batch_size, num_points, features_dim)

In [13]:
class ResNetPointNet(nn.Module):
    """
        This class is used to define the PointNet model used to form a feature embedding for each point in the Point Cloud given in input.
        Architecture design:

            @ INPUT: Tensor of shape (batch_size, num_points, 3)

            > Fully Connected Layer (3, in_dim=64)
            > 5 Residual Blocks with Local Pooling and Concatenation
            > Fully Connected Layer (out_dim=32, out_dim=32)

            @ OUTPUT: Tensor of shape (batch_size, num_points, 32)
    """

    def __init__(self, in_dim=64, n_points=2048, res_dim=32, out_dim=32, n_blocks=5):
        super(ResNetPointNet, self).__init__()

        self.fc1 = nn.Linear(3, in_dim)

        self.res = nn.ModuleList([
            ResBlock(in_dim, n_points, res_dim, out_dim) for n_res in range(n_blocks)
        ])

        self.fc2 = nn.Linear(out_dim, out_dim)

    def forward(self, x):
        # Input: (b,p,3)

        # Extract Normalized coordinates and indices to perform local pooling

        norm_coord = CanonicalProjection(x) # (b,p,2)
        coord_indices = DivideInBuckets(norm_coord, 32) # (b,p,2)

        # First FC Layer
        fc1 = F.relu(self.fc1(x)) # (b,p,3) -> (b,p,64)
        #print("fc1: ",fc1.shape)
        # First ResBlock
        res = self.res[0](fc1) # (b,p,64) -> (b,p,32)
        #print("res: ",res.shape)
        # 2-5 ResBlock
        for res_block in self.res[1:]:
            pool = LocalMaxPooling(res, x, 32) # (b,p,32)
            #print("pool: ",pool.shape)
            concat = torch.cat([res, pool], dim=2) # (b,p,32) | (b,p,32) -> (b,p,64)
            #print("concatenation: ",concat.shape)
            # following residual block
            res = res_block(concat) # (b,p,64) -> (b,p,32)

        # Last FC Layer
        final = F.relu(self.fc2(res)) # (b,p,32) -> (b,p,32)

        #   print(f"ResNetPointNet : input ({x.shape}) ---> output ({final.shape}) \n")

        return final

## 2.3) Plane Predictor

In [14]:
class SimplePointNet(nn.Module):
    """
        This class is used to define a simple variant of the PointNet Model, which is one of the main components of the Plane Predictor Network.
        This Network will provide us a global context of the Input Point Clouds
        Architecture Design:

            @ INPUT: Tensor of shape (batch_size, num_points, 3) which represent the Input Point Clouds

            > Fully Connected Layer (3, 64)

              |> Fully Connected Layer (64, 32)
            2*|> Global Max Pooling
              |> Concatenation btw Pooled and unpooled features

            > Fully Connected Layer (64, 32)
            > Global Max Pooling

            @ OUTPUT: Tensor of shape (batch_size, 1, 32) which will be used by the rest of the Plane Predictor
    """

    def __init__(self, batch_size, in_dim=64, n_points=2048, hid_dim=32, out_dim=64):
        super(SimplePointNet, self).__init__()

        self.batch_size = batch_size
        self.n_points = n_points

        self.initial_fc = nn.Linear(in_features=3, out_features=in_dim)

        self.fc1 = nn.Linear(in_features=in_dim, out_features=hid_dim)

        self.fc2 = nn.Linear(in_features=in_dim, out_features=hid_dim)

        self.final_fc = nn.Linear(in_features=in_dim, out_features=hid_dim)

        self.pool = nn.AdaptiveMaxPool1d(output_size=1)

    def forward(self, x):

        x = F.relu(self.initial_fc(x)) # (b,p,3) --> (b,p,64)

        x1 = F.relu(self.fc1(x)) # (b,p,64) --> (b,p,32)
        x1_t = x1.transpose(1,2) # (b,p,32) -> (b,32,p)
        pool_x1 = self.pool(x1_t) # (b,32,p) -> (b,32,1)
        exp_pool_x1 = pool_x1.transpose(1,2).expand(self.batch_size, self.n_points, 32) # (b,32,1) -> (b,1,32) -> (b,p,32)
        concat_x1 = torch.cat([x1, exp_pool_x1], dim=2) # (b,p,32) | (b,p,32) -> (b,p,64)

        x2 = F.relu(self.fc2(concat_x1)) # (b,p,64) -> (b,p,32)
        x2_t = x2.transpose(1,2) # (b,p,32) -> (b,32,p)
        pool_x2 = self.pool(x2_t) # (b,32,p) -> (b,32,1)
        exp_pool_x2 = pool_x2.transpose(1,2).expand(self.batch_size, self.n_points, 32) # (b,32,1) -> (b,1,32) -> (b,p,32)
        concat_x2 = torch.cat([x2, exp_pool_x2], dim=2) # (b,p,32) | (b,p,32) -> (b,p,64)

        pre_pool_out = F.relu(self.final_fc(concat_x2)) # (b,p,64) -> (b,p,32)
        pre_pool_out_t = pre_pool_out.transpose(1,2) # (b,p,32) -> (b,32,p)
        out = self.pool(pre_pool_out_t) # (b,32,p) -> (b,32,1)

        # print(f"SimplePointNet : input ({x.shape}) ---> output ({out.transpose(1,2).shape}) \n")
        return out.transpose(1,2) # (b,32,1) -> (b,1,32)

In [15]:
class PlanePredictor(nn.Module):
    """
        This class is used to define the Plane Predictor of our Architecture, which will predict the plane parameters of L dynamic planes
        Architecture design:

            @ INPUT: Tensor of shape (batch_size, num_points, 3) which represent Point Clouds

            > Simple PointNet which learns the global context of the input point clouds
            > This information is encoded into one global feature by using Max Pooling
            > 4 Fully Connected Layers with hidden dimension = 32
            > Transform this features with a shallow net (from 32 to L dim) (plane parameters)
            > Refine each plane parameter with a Shallow net
            > Get plane features (from L to 32 dim)
            > Each plane-specific feature is expanded to N x D to match the output of the point cloud encoder, which will be summed together


            @ OUTPUT: Tensor of shape (batch_size, num_points, 32) which will be summed up to the features given by the ResNetPointNet before
                      being processed into U-Net and the plane parameter tensor (batch_size, L, 3)

    """
    def __init__(self, in_dim=32, n_points=2048, n_fc=4, L=4):
        super(PlanePredictor, self).__init__()

        self.pointNet = SimplePointNet(batch_size=BATCH_SIZE, in_dim=IN_DIM_RES_PT, n_points=n_points, hid_dim=in_dim, out_dim=IN_DIM_RES_PT)
        self.n_points = n_points
        self.L = L

        self.four_fc = nn.ModuleList(
            [nn.Linear(in_dim, in_dim) for i in range(n_fc)]
        )
        # want L plane parameters
        self.feat_to_plane = nn.Linear(in_dim, 3*self.L)

        self.L_shallows = nn.ModuleList(
            [nn.Linear(3, 3) for i in range(self.L)]
        )

        # L FC layers with hidden dim = 32

        self.L_plane_to_features = nn.ModuleList(
            [nn.Linear(3, in_dim) for i in range(self.L)]
        )

    def forward(self, x):

        flc = self.pointNet(x) # (b,p,3) -> (b,1,32) # features from simplepointnet

        for fc in self.four_fc: # 4 fully connected block
            flc = F.relu(fc(flc))  # (b,1,32) -> (b,1,32)

        plane_param = F.relu(self.feat_to_plane(flc)) # (b,1,32) -> (b,1,3*L)
        plane_param = plane_param.view(plane_param.shape[0], self.L, 3) # (b,1,3*L) -> (b,L,3)

        stack_of_planes = []
        features = 0
        for p in range(self.L):
            # I need to refine the plane parameters by passing them into a separate shallow net
            pth_plane = F.relu(self.L_shallows[p](plane_param[:,p,:])) # (b,3) -> (b,3)
            stack_of_planes.append(pth_plane)

            pth_feat = F.relu(self.L_plane_to_features[p](pth_plane)) # (b,3) -> (b,32)
            features += pth_feat.unsqueeze(1)

        out_features = features.expand(x.shape[0], self.n_points, pth_feat.shape[-1]) # (b,L,32) -> (b,p,32)
        stack_of_planes = torch.stack(stack_of_planes, dim=1)
        # print(f"PlanePredictor : input ({x.shape}) ---> output ({out_features.shape}, {stack_of_planes.shape}) \n")
        return out_features, stack_of_planes

## 2.4) Feature Projection Process

$R = I + skew(v) + skew(v)^2 \cdot \frac{1-\mathcal{k} \cdot \mathcal{\hat{n}}} {||v||^2}$

In [16]:
# we need to project the encoded features (the output of the previous step aka 2.2+2.3) onto the dynamic planes with a defined size of H x W grid and then
# apply max-pooling for the features inside each cell
# 3 different operations must be employed to keep them inside H x W grids.
# > Basis Change
# > Orthographic Projection aka project the summed-up features onto the dynamic planes
# > Normalization
# > apply local max pooling to each bucket (H*W buckets)

def skew_symm(v):
    if len(v.shape) == 2: return torch.tensor([[0, -v[2], v[1]], [v[2], 0, -v[0]], [-v[1], v[0], 0]]).to(device)
    else: # if you pass a batch of vectors
        sk = torch.zeros((v.shape[0], 3, 3)).to(device)
        for j in range(v.shape[0]):
            sk[j] = skew_symm(v[j])
        return sk

def dot_prod(v1, v2): # assume that you always pass 2 vect with same dimension (2D or 3D)
    if len(v1.shape) == 2:
        return torch.bmm(v1.unsqueeze(2), v2.unsqueeze(1))
    return torch.bmm(v1.transpose(1,2),v2)


# Given the vectors, of the 3 canonical axes, (i,j,k) and the learned plane normal n we need to perform a change of basis:
# First we transform n into a unit vector n_hat
# We compute the rotation matrix that aligns k (basic vector of the ground plane) and n_hat by following the equation displayed above

def ChangeOfBasis(n): # n is the learned plane normal (what you get from the plane predictor net) (batch_size, 3, 1)
    # reorientation of the coordinate system to align it with the dynamic plane
    n = n.to(device)
    # these unit vectors need a shape (batch_size*L, 3, 1)
    i = torch.tensor([1.,0.,0.], device=device).repeat(n.shape[0], 1).unsqueeze(2)
    j = torch.tensor([0.,1.,0.], device=device).repeat(n.shape[0], 1).unsqueeze(2)
    k = torch.tensor([0.,0.,1.], device=device).repeat(n.shape[0], 1).unsqueeze(2)

    n_hat = n/torch.norm(n)
    v = skew_symm(k) @ n_hat# cross product btw k and n_hat
    R = torch.eye(3).to(device).unsqueeze(0).repeat(n.shape[0], 1, 1) + skew_symm(v) + skew_symm(v).matmul(skew_symm(v))*(1 - dot_prod(k,n_hat))/(torch.norm(v)**2)
    # this rotation matrix is applied to i and j

    i_p = R @ i
    j_p = R @ j

    if torch.norm(i_p) != 1 : i_p = i_p/torch.norm(i_p)
    if torch.norm(j_p) != 1 : j_p = j_p/torch.norm(j_p)

    # return i_p, j_p
    return i_p.reshape(BATCH_SIZE, NUM_PLANES, 3, 1), j_p.reshape(BATCH_SIZE, NUM_PLANES, 3, 1)

def OrthographicProjection(p, i_p, j_p): # probably has more sense to simply use R
    # p is the set of points that must be projected while i_p and j_p are the axes of the plane
    if len(p.shape) < 4:
        p = p.unsqueeze(1).repeat(1, i_p.shape[1], 1, 1) # (b,p,3) -> (b,L,p,3)
        p = p.view(i_p.shape[0]*i_p.shape[1], p.shape[2], 3) # (b,L,p,3) -> (b*L,p,3)

    if len(i_p.shape) > 3: # assume that both i_p and j_p have the same size
        i_p = i_p.view(i_p.shape[0]*i_p.shape[1], i_p.shape[2], 1)
        j_p = j_p.view(j_p.shape[0]*j_p.shape[1], j_p.shape[2], 1)

    x_p = torch.bmm(p, i_p)
    y_p = torch.bmm(p, j_p)

    return torch.stack([x_p, y_p], dim=2).view(BATCH_SIZE, NUM_PLANES, p.shape[1], 2).to(device)

def ComputeNormConst(i_p, j_p): # use this to compute directly the normalization constant c >= 1

    i_p_abs = torch.abs(i_p.view(i_p.shape[0]*i_p.shape[1], i_p.shape[2], 1))
    j_p_abs = torch.abs(j_p.view(j_p.shape[0]*j_p.shape[1], j_p.shape[2], 1))

    ones = torch.ones((i_p_abs.shape[0], 1, 3)).to(device)

    a_i = torch.bmm(ones, i_p_abs)/(torch.bmm(i_p_abs.transpose(2,1),i_p_abs)) * i_p_abs
    a_j = torch.bmm(ones, j_p_abs)/(torch.bmm(j_p_abs.transpose(2,1),j_p_abs)) * j_p_abs

    c = torch.max(torch.norm(a_i, dim=1), torch.norm(a_j, dim=1))
    c = torch.clamp(c, min=1.0) # must be >= 1
    return c # (b*L, 1, 1)

# Move everything into a grid

def FeatureProjection(input_cloud, plane_features, plane_param, H=64, W=64):
    # plane_features, plane_parame are the output of the plane predictor net
    i_p, j_p = ChangeOfBasis(plane_param.reshape(BATCH_SIZE*NUM_PLANES, 3).unsqueeze(2)) # both are (b,L,3,1)
    projected_plane = OrthographicProjection(input_cloud, i_p, j_p) # (b,L,p,2)
    c = ComputeNormConst(i_p, j_p) #(b*L,1)
    # normalization
    norm_plane = projected_plane.view(c.shape[0], projected_plane.shape[2], 2)/c.unsqueeze(1) # (b*L,p,2)

    # now we need to perform pooling inside each bucket of a grid H*W
    x_norm = (norm_plane[:,:,0]*W).long().clamp(0,W-1)
    y_norm = (norm_plane[:,:,1]*H).long().clamp(0,H-1)
    indices = x_norm + W*y_norm# .view(BATCH_SIZE, NUM_PLANES, input_cloud.shape[1])# (b,L,p)

    out_scatter, _ = scatter_max(plane_features.reshape(BATCH_SIZE*NUM_PLANES, input_cloud.shape[1], -1), indices, dim=1, dim_size=H*W)

    return out_scatter.view(BATCH_SIZE*NUM_PLANES, H, W, -1), norm_plane

## 2.5) Complete Architecture

In [17]:
class Encoder(nn.Module):

    def __init__(self, in_dim=64, out_dim=32, n_points=2048, n_blocks=5, num_planes=NUM_PLANES, num_fc=4):
        super(Encoder, self).__init__()

        self.resnet_pointnet = ResNetPointNet(in_dim=in_dim, n_points=n_points, out_dim=out_dim, n_blocks=n_blocks).to(device)
        self.plane_predictor = PlanePredictor(in_dim=out_dim, n_points=n_points, n_fc=num_fc, L=num_planes).to(device)
        self.UNet = UNet(in_dim=out_dim, out_dim=out_dim, features_dim=in_dim, n_points=n_points).to(device)


    def forward(self, x):
        feat_resnet = self.resnet_pointnet(x)
        feat_plane, plane_param = self.plane_predictor(x)
        summed_up_feat = (feat_resnet + feat_plane).unsqueeze(1).repeat(1, NUM_PLANES, 1, 1) # (b,p,32) -> (b,L,p,32)
        # here we need to perform features projection

        feat_proj, norm_plane = FeatureProjection(x, summed_up_feat, plane_param) # returns the projection of the features and the projected and norm plane
        out_unet = self.UNet(feat_proj.permute(0,-1,1,2)) # need (b*L,32,H,W) instead of (b*L,H,W,32)
        return out_unet, norm_plane

# 3] Decoder

In [18]:
# After the Unet, I have L dynamic planes each with feature map H*W of dimension 32 for each batch
# Before using the real decoder network I need to follows these steps:
# > we need to project any point of a cloud onto all dynamic planes
# > use bilinear interpolation of the feature encoded at the four neighboring plane grids (smooth)
# > concatenate the L features obtained
# > obtain occupancy prediction from the decoder
class Decoder(nn.Module):
    """
        This class is used to define the Decoder Network. The architecture is composed by 5 ResNet blocks with hidden dimension 32 followed by a small
        Fully Connected netowrk that returns the Occupancy prediction.
        Given the features vector in input, we have to perform Bilinear Interpolation before the ResNet blocks.
        Architecture design:

            @ INPUT:
                > Tensor of shape [features vector] (batch_size, num_points, features=32*L)


            @ OUTPUT:
                > Tensor of shape [occupancy prediction] (batch_size, n_points=2048, 1)
    """

    def __init__(self, in_dim=32*NUM_PLANES, n_points=2048, n_blocks=5):
        super(Decoder, self).__init__()

        self.res_blocks = nn.ModuleList(
            [ResBlock(in_dim, n_points, in_dim, in_dim) for i in range(n_blocks)]
        )
        self.occupancy_pred = nn.Sequential(
            nn.Linear(in_dim, in_dim//2),
            nn.ReLU(),
            nn.Linear(in_dim//2, 1),
            nn.Sigmoid(), # choose if you directly apply bce with logits or not
        )

    def forward(self, x):

        for res_block in self.res_blocks:
            x = res_block(x) # check if the addition of the feature vector is correct

        occupancy = self.occupancy_pred(x)
        # print(f"Decoder : ({x.shape}) --> ({occupancy.shape}) \n")

        return occupancy

# 4] Metrics

<font color="orange"> Chamster Distance: </font> $
CD(A, B) = \frac{1}{|A|} \sum_{a \in A} \min_{b \in B} \|a - b\|_2^2 + \frac{1}{|B|} \sum_{b \in B} \min_{a \in A} \|b - a\|_2^2
$

In [None]:
# Chamster Distance
def ChamsterDistance(input_cloud, pred_cloud):
    """
        This function is used to compute the Chamster Distance between the input cloud and the predicted cloud.
        This metrics is composed by sum of the distances from each point in input_cloud to its nearest neighbor in pred_cloud
        plus the distances from each point in pred_cloud to its nearest neighbor in input_cloud.
        We would like to MINIMIZE this metric.

        @ INPUT :
            > tensor of shape (batch_size, num_points, 3)
            > tensor of shape (batch_size, num_points, 3)

        @ OUTPUT :
            > tensor of shape (batch_size, num_points, 1)
    """

    A = input_cloud.detach().cpu()
    B = pred_cloud.detach().cpu()

    dists_A_to_B = torch.cdist(A, B, p=2)
    dists_B_to_A = torch.cdist(B, A, p=2)

    print(f"dists_A_to_B : ({dists_A_to_B.shape})")
    print(f"dists_B_to_A : ({dists_B_to_A.shape})")

    min_A_to_B = torch.min(dists_A_to_B, dim=1)[0]
    min_B_to_A = torch.min(dists_B_to_A, dim=1)[0]

    print(f"min_A_to_B : ({min_A_to_B.shape})")
    print(f"min_B_to_A : ({min_B_to_A.shape})")

    CD = torch.mean(min_A_to_B) + torch.mean(min_B_to_A)

    print(f"Chamster Distance : ({CD.shape}) \n")

    return CD

<font color="orange"> Volumetric IOU: </font> $ IoU(A', B') = \frac{|A' \cap B'|}{|A' \cup B'|}$

In [None]:
# Intersection Over Union
def VolumetricIOU(real_cloud, pred_cloud):
    """
        This function is used to compute the Intersection Over Union between two volumes

        @ INPUT :
            > tensor of shape (batch_size, ...) (actually binary voxel grid)
            > tensor of shape (batch_size, ...) (actually binary voxel grid)

        @ OUTPUT :
            > float
    """
    real_cloud = np.array(real_cloud, dtype=np.bool)
    pred_cloud = np.array(pred_cloud, dtype=np.bool)

    return np.sum(real_cloud & pred_cloud) / np.sum(pred_cloud) + np.sum(real_cloud)

<font color="orange"> F-Score: </font>

In [None]:
# F-score

<font color="orange"> Normal Consistency: </font>

In [None]:
# Normal Consistency

# 5] Training

In [63]:
class CompleteArchitecture(nn.Module):

    def __init__(self, in_dim=64, out_dim=32, n_points=2048, n_blocks=5, num_planes=4, num_fc=4):
        super(CompleteArchitecture, self).__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()

    def forward(self, x):
        out_enc, proj_plane = self.encoder(x)
        interpol_feat = F.grid_sample(out_enc, proj_plane.unsqueeze(1), mode='bilinear', align_corners=True)
        interpol_feat = interpol_feat.squeeze(2).permute(0,2,1).view(BATCH_SIZE, NUM_PLANES, x.shape[1], interpol_feat.shape[1])
        concat_feat = interpol_feat.reshape(BATCH_SIZE, -1, NUM_PLANES*interpol_feat.shape[-1])
        occupancy = self.decoder(concat_feat)
        return occupancy

In [15]:
# I use as input a noisy point cloud (point cloud + sampled noise with zero mean and stdev 0.05)
# Uniformly sampled cloud is used as the occupancy supervision aka my label -> use this to found the real occupancy value !
# Training with BCE
# Inference -> Multiresolution Isosurface Extraction used to construct meshes starting from the occupancy grid

In [20]:
# MISE:
# discretize volumetric space at initial resolution and mark as occupied the grid point with occupancy > threshold
# if at least 2 adjacent grid points have differing occupancy predictions (this will be the intersecting voxels)
# subdivide 8 subvoxels and evaluate them as before (continue this until you have reached the correct resolution)
# use marching cubes
# simplify mesh with Fast Quadric Mesh Simplification Algorithm
# refine output with 1st or 2nd order gradient information

In [155]:
# starting from the sampled point cloud, I need to extract the occupancy prediction label
def Cloud2Voxel(cloud, size=0.05):
    # assumed to be a np.array
    point_cloud = toPointCloud(cloud)
    voxel = o3d.geometry.VoxelGrid.create_from_point_cloud(point_cloud, voxel_size=size)
    return voxel

def addNoise(cloud, std=0.05):
    noisy_cloud = cloud + np.random.normal(0, std, cloud.shape)
    return noisy_cloud.to(torch.float32).to(device)

def addUniformSamples(cloud):
    return 0

def Cloud2Mesh(cloud):
    point_cloud = toPointCloud(cloud)
    mesh = o3d.geometry.TriangleMesh.create_from_point_cloud_ball_pivoting(point_cloud, o3d.utility.DoubleVector([0.005, 0.01, 0.02, 0.1, 0.5]))
    # mesh, densities = o3d.geometry.TriangleMesh.create_from_point_cloud_poisson(point_cloud, depth=9)
    # densities = np.asarray(densities)
    # density_threshold = np.percentile(densities, 5)
    # vertices_to_keep = densities > density_threshold
    # mesh = mesh.select_by_index(np.where(vertices_to_keep)[0])
    # mesh.remove_non_manifold_edges()
    # mesh.remove_degenerate_triangles()
    # mesh.remove_duplicated_triangles()
    # mesh.remove_duplicated_vertices()
    return mesh

def toPointCloud(cloud):
    point_cloud = o3d.geometry.PointCloud()
    point_cloud.points = o3d.utility.Vector3dVector(cloud)
    point_cloud.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.05, max_nn=30))
    point_cloud.orient_normals_consistent_tangent_plane(k=30)

    return point_cloud

In [160]:
mesh = o3d.io.read_triangle_mesh("/content/drive/MyDrive/CV/MPI-FAUST/training/registrations/tr_reg_010.ply")
o3d.visualization.draw_plotly([mesh])

In [156]:
completeModel = CompleteArchitecture()
completeModel = completeModel.to(device)
BCE = nn.BCELoss()
for batch in train_loader:
    occupancy_supervision, registration = batch
    sampled_cloud = occupancy_supervision.to(device) # (B, P, 3)
    noisy_cloud = addNoise(occupancy_supervision)# that's my actual input to the net (B, P, 3)
    # Plot2D(noisy_cloud[0].cpu().numpy())
    # Plot2D(sampled_cloud[0].cpu().numpy())
    occupancy_pred = completeModel(noisy_cloud) # (B, P, 1)

    o3d.visualization.draw_plotly([toPointCloud(registration[0].cpu().numpy())])
    o3d.visualization.draw_plotly([Cloud2Mesh(registration[0].cpu().numpy())])
    break

In [None]:
completeModel = CompleteArchitecture()
completeModel = completeModel.to(device)
BCE = nn.BCELoss() # later change this and use the version with logits
optimizer = torch.optim.Adam(completeModel.parameters(), lr=1e-4) # currently using the same from the paper
num_epochs = 25

for epoch in range(num_epochs):

    completeModel.train()

    train_loss = 0.0

    train_loss_cnt = []
    val_loss_cnt = []

    # add other metrics container (IOU, chamfer, ...)
    for batch in train_loader:
        optimizer.zero_grad()
        occupancy_supervision, _ = batch

        sampled_cloud = occupancy_supervision.to(device) # later used as occupancy supervision
        noisy_cloud = occupancy_supervision + torch.random.normal(0, 0.05, occupancy_supervision.shape).to(device) # that's my actual input

        occupancy_pred = completeModel(noisy_cloud)
        # real_occupancy = do something with the occupancy supervision

        loss = BCE(occupancy_pred, real_occupancy)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    avg_train_loss = train_loss / len(train_loader)
    train_loss_cnt.append(avg_train_loss)


    completeModel.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch in val_loader:
            occupancy_supervision, _ = batch

            sampled_cloud = occupancy_supervision.to(device) # later used as occupancy supervision
            noisy_cloud = occupancy_supervision + torch.random.normal(0, 0.05, occupancy_supervision.shape).to(device) # that's my actual input

            occupancy_pred = completeModel(noisy_cloud)
            # real_occupancy = do something with the occupancy supervision

            loss = BCE(occupancy_pred, real_occupancy)

            val_loss += loss.item()
            # perform the reconstruction
            # meshes = Reconstruction(occupancy_pred, sampled_cloud)
            # compute other metrics

    avg_val_loss = val_loss / len(val_loader)
    val_loss_cnt.append(avg_val_loss)

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}')

    # save the model if it's better than the last one

    """
    torch.save(
        {
        'model_state_dict': completeModel.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        },
        'model.pt'
    )
    """


In [None]:
plt.figure(figsize=(10, 5))
plt.plot(train_loss_cnt, label='Train')
plt.plot(val_loss_cnt, label='Val')
plt.xlabel('Epochs')
plt.ylabel('Loss Value')
plt.title('Training and Validation Losses')
plt.grid(True)
plt.legend()
plt.show()
plt.savefig('losses.png')

# 6] Evaluation

We first discretize the volumetric space at an initial resolution and evaluate the occupancy network fθ (p, x) for all p in this grid.
We mark all grid points p as <font color="orange"> occupied </font> for which fθ(p,x) is bigger or equal to some threshold τ. Next, we mark all voxels as <font color="olive"> active </font> for which **at least
two adjacent grid points** have differing occupancy predictions. These are the voxels which would intersect the mesh with the marching cubes algorithm
at the current resolution. We subdivide **all active voxels into**<font color="orange"> 8 </font>**subvoxels** and evaluate all new grid points which are introduced to the occupancy grid through
this subdivision. We repeat these steps until the desired final resolution is reached.
At this final resolution, we apply the Marching Cubes algorithm to extract an approximate isosurface :
{p ∈ R3 | fθ(p,x) = τ}.
Our algorithm converges to the correct mesh if the occupancy grid at the initial resolution contains points from every connected component of both the
interior and the exterior of the mesh. It is hence important to take an initial resolution which is high enough to satisfy this condition.
In practice, we found that an initial resolution of $32^3$ was sufficient in almost all cases.
The initial mesh extracted by the Marching Cubes algorithm can be further refined. In a first step, we simplify the mesh using the
Fast-Quadric-Mesh-Simplification algorithm. Finally, we refine the output mesh using first and second order (i.e., gradient) information.
Towards this goal, we sample random points pk from each face of the output mesh and minimize the loss.

In [None]:
def plotMesh(vertices, faces):
    """
    This function is used to plot a 3D Mesh.

    Args:
        vertices (np.array): Array of mesh vertices.
        faces (np.array): Array of mesh faces (triangles).

    Returns:
        Plot
    """
    i = []
    j = []
    k = []
    for face in faces:
        i.append(face[0])
        j.append(face[1])
        k.append(face[2])
    fig = go.Figure(data=[go.Mesh3d(
        x=vertices[:, 0],
        y=vertices[:, 1],
        z=vertices[:, 2],
        i=i,
        j=j,
        k=k,
        opacity=0.8,
        color='lightblue'
    )])
    fig.update_layout(
        scene=dict(
            xaxis=dict(visible=True),
            yaxis=dict(visible=True),
            zaxis=dict(visible=True),
        ),
        margin=dict(l=0, r=0, b=0, t=0)
    )

    fig.show()