In [None]:
"""This notebook loads & explores the dataset.
"""

In [None]:
from config import config
from data_processing.utils import viewer
import pathlib
import trimesh
import random
import numpy as np
import math

In [None]:
# Fix the random seeds so the stochastic steps of the notebook are reproducible.
SEED = 42
random.seed(SEED)     # stdlib RNG
np.random.seed(SEED)  # NumPy global RNG (used by the rotation and jitter transforms)

# Load the dataset

In [None]:
# Load global config.
# The resulting object's `config` mapping is read below to locate the dataset folders.
config_file = config.Config()

In [None]:
# Resolve the train / test dataset directories from the 'dataset' section of the config.
dataset_train_path = pathlib.Path(config_file.config['dataset']['train'])
dataset_test_path = pathlib.Path(config_file.config['dataset']['test'])

In [None]:
# Create a dict with class names & their indices.
# Every sub-directory of the training set is one class; indices follow the sorted order.
# Path.name keeps the complete folder name (Path.stem would drop anything after the last dot).
folders = [entry.name for entry in sorted(dataset_train_path.iterdir()) if entry.is_dir()]
classes = {folder: i for i, folder in enumerate(folders)}

In [None]:
# Display the class-name -> index mapping.
classes

# Visualize an example file

In [None]:
# Pick a single mesh from the 'motor' folder of the training set as the running example.
example_path = dataset_train_path / 'motor/00001173.obj'

In [None]:
# Load the example file with trimesh.
# NOTE(review): force='mesh' presumably makes trimesh return a single mesh rather than a scene — confirm in the trimesh docs.
example = trimesh.load(example_path, force='mesh')

In [None]:
# Show the example mesh together with its vertices drawn as a point cloud.
viz = viewer.Viewer()
viz.add_mesh(example)
viz.add_pc(example.vertices, size=2.5)  # `size` is presumably the rendered point size — confirm in viewer.Viewer
viz.show()

# Normalize & augment the data.

In [None]:
# N.B. All the augmentation functions are written as PyTorch Dataset transformations!
# Source: https://pytorch.org/tutorials/beginner/data_loading_tutorial.html

In [None]:
# Add extra points to the point clouds.
# This step is done as the mesh vertices alone do not give enough spatial information.

In [None]:
class SamplePc(object):
    """Transform that samples a fixed number of points from a mesh surface."""

    def __init__(self, points: int = 1024, seed=SEED):
        """
        Args:
            points: Number of points to sample from the mesh.
            seed: Seed forwarded to the trimesh surface sampler. Defaults to the
                notebook-wide SEED, so sampling a given mesh is repeatable.
                NOTE(review): passing None presumably leaves the sampler
                unseeded (a fresh sample per call) — confirm in the trimesh docs.
        """
        self.points = points
        self.seed = seed

    def __call__(self, mesh: trimesh.Trimesh):
        """Samples the point cloud from the mesh surface.

        Args:
            mesh: Input mesh.
        Returns:
            Sampled point cloud.
        """
        # The second return value of sample_surface is not needed here and is discarded.
        pc_sampled, _ = trimesh.sample.sample_surface(mesh, self.points, seed=self.seed)

        return pc_sampled

In [None]:
# Sample a point cloud from the example mesh with SamplePc's default number of points.
pc = SamplePc()(example)

In [None]:
# Show the sampled point cloud.
viz = viewer.Viewer()
viz.add_pc(pc, size=2.5)
viz.show()

In [None]:
class NormalizePc(object):
    """Transform that centres a point cloud and scales it into the unit sphere."""

    def __call__(self, pc:np.ndarray)-> np.ndarray:
        """Normalizes point cloud.

        The cloud is shifted so that its centroid lies at the origin and is then
        divided by the largest point-to-origin distance, so the farthest point
        ends up at distance 1 from the origin.

        Args:
            pc: Input point cloud, one point per row.

        Returns:
            Normalized point cloud. An empty array is returned (after printing a
            message) when the input is not 2-dimensional or has no points.
            A cloud whose points all coincide is only centred, because its
            scale is zero.
        """
        # TODO(vice) Check the paper for name of the normalization
        if len(pc.shape) != 2 or pc.shape[0] == 0:
            print('Invalid point cloud!')
            return np.array([])
        pc_norm = pc - np.mean(pc, axis=0)
        scale = np.max(np.linalg.norm(pc_norm, axis=1))
        # Skip the division for a zero scale: 0 / 0 would fill the cloud with NaNs.
        if scale > 0:
            pc_norm = pc_norm / scale

        return pc_norm

In [None]:
# Normalize the sampled point cloud.
pc_norm = NormalizePc()(pc)

In [None]:
# Show the normalized point cloud.
viz = viewer.Viewer()
viz.add_pc(pc_norm, size=2.5)
viz.show()

In [None]:
class ApplyRandomRotationZ(object):
    """Transform that rotates a point cloud by a random angle around the z axis."""

    def __call__(self, pc:np.ndarray) -> np.ndarray:
        """Applies random rotation around the z axis
        to the input point cloud.

        The angle is drawn uniformly from [0, 2*pi) using NumPy's global RNG.

        Args:
            pc: Input point cloud with one (x, y, z) point per row.

        Returns:
            Rotated point cloud; the z coordinates are left unchanged.
        """
        # Draw a plain Python float (no `size` argument): math.cos / math.sin
        # then do not depend on array-to-scalar conversion, which recent NumPy
        # releases deprecate for arrays with ndim > 0.
        theta = np.random.random() * 2 * math.pi
        cos_t, sin_t = math.cos(theta), math.sin(theta)
        rot_matrix = np.array([[cos_t, -sin_t, 0],
                               [sin_t, cos_t, 0],
                               [0, 0, 1]])

        # Points are rows, so rotate the transposed cloud and transpose back.
        pc_rot = rot_matrix.dot(pc.T).T

        return pc_rot

In [None]:
# Rotate the normalized point cloud by a random angle around the z axis.
pc_rot = ApplyRandomRotationZ()(pc_norm)

In [None]:
# Show the rotated point cloud.
viz = viewer.Viewer()
viz.add_pc(pc_rot, size=2.5)
viz.show()

In [None]:
class AddJitter(object):
    """Transform that perturbs every coordinate with zero-mean Gaussian noise."""

    def __init__(self, sigma: float = 0.02):
        """
        Args:
            sigma: Standard deviation of the Gaussian noise.
        """
        self.sigma = sigma

    def __call__(self, pc:np.ndarray) -> np.ndarray:
        """Applies random jitter to the point cloud.

        Args:
            pc: Input point cloud.

        Returns:
            Point cloud with added noise (the input array is not modified).
        """
        # One independent noise value per coordinate, drawn from NumPy's global RNG.
        jitter = np.random.normal(0, self.sigma, pc.shape)
        pc_noisy = pc + jitter

        return pc_noisy

In [None]:
# Perturb the rotated point cloud with Gaussian jitter.
pc_noisy = AddJitter()(pc_rot)

In [None]:
# Show the jittered point cloud.
viz = viewer.Viewer()
viz.add_pc(pc_noisy, size=2.5)
viz.show()

In [None]:
import torch
from torch.utils.data import Dataset
from torchvision import transforms
from typing import Union
from typing import Tuple

In [None]:
# Default pipeline without augmentation: sample 1024 surface points, then normalize them.
# It is used as the default `transforms` argument of McbData.
default_transforms = transforms.Compose([
                                SamplePc(1024),
                                NormalizePc()
                               ])

In [None]:
# 1024 points per cloud as in the paper!
# Training pipeline: the default sampling + normalization, followed by the two
# random augmentations (rotation around z, then Gaussian jitter).
train_transforms = transforms.Compose([
                              SamplePc(1024),
                              NormalizePc(),
                              ApplyRandomRotationZ(),
                              AddJitter()
                            ])

In [None]:
class McbData(Dataset):
    """Map-style dataset over MCB meshes laid out as `<dataset_dir>/<category>/<file>.obj`.

    Every sub-directory of `dataset_dir` is one category; category indices follow
    the sorted order of the folder names, so two dataset directories only share
    the same indices when they contain the same category folders.
    """

    def __init__(self, dataset_dir: Union[pathlib.Path, str],
                 transforms = default_transforms):
        """Indexes all the `.obj` files found in the category folders.

        Args:
            dataset_dir: Input directory with one sub-directory per category.
            transforms: Callable applied to every loaded mesh. With a falsy
                value the loaded mesh is returned unprocessed.
        """
        self.dataset_dir = pathlib.Path(dataset_dir)
        # Create a dict with class names & their indices.
        # The folders are read from this dataset's own directory (not from a
        # notebook-level global), and Path.name keeps the full folder name.
        folders = [entry.name for entry in sorted(self.dataset_dir.iterdir()) if entry.is_dir()]
        self.classes = {folder: i for i, folder in enumerate(folders)}

        self.transforms = transforms
        # Load all the samples paths and their category idx.
        # Files are visited in sorted order so that an index always maps to the same mesh.
        self.samples = []
        for category, category_idx in self.classes.items():
            cat_dir = self.dataset_dir / category
            for mesh in sorted(cat_dir.iterdir()):
                if mesh.is_file() and mesh.suffix == '.obj':
                    self.samples.append({'mesh_path': mesh.absolute(),
                                         'category_idx': category_idx})

    def __len__(self):
        """Returns the number of indexed meshes."""
        return len(self.samples)

    def __preproc__(self, file):
        """Loads a mesh file and runs the configured transforms on it.

        Args:
            file: Path of the mesh file to load.

        Returns:
            Output of `self.transforms` for the loaded mesh, or the loaded mesh
            itself when no transforms are configured.
        """
        mesh = trimesh.load(file, force='mesh')
        if self.transforms:
            return self.transforms(mesh)
        return mesh

    def __getitem__(self, idx: int) -> dict:
        """Returns a single entry of the dataset which consists of the input point
        cloud and the output category.

        Args:
            idx: Index of the entry.

        Returns:
            Dict with the keys 'pointcloud' (result of the preprocessing) and
            'category' (integer category index).
        """
        sample = self.samples[idx]
        pointcloud = self.__preproc__(sample['mesh_path'])
        return {'pointcloud': pointcloud,
                'category': sample['category_idx']}

In [None]:
# Instantiate the dataset over the training directory with the default transforms.
McbData(dataset_train_path)