# Full Modelnet40

This script preprocesses the aligned ModelNet40 dataset from [ORION](https://github.com/lmb-freiburg/orion) and produces a new dataset that is compatible for training PAPNet with non-partial dataset.

In [None]:
import open3d as o3d
import numpy as np
import os
from tqdm import tqdm

%load_ext autoreload
%autoreload 2

In [None]:
def visualize_3d(
    sample: np.array,
    show_axes: bool = True
):
    """Visualizes point clouds with normals in 3D plot.

    Args:
        sample (np.array): (points, 6) where points=point clouds,
            6 = 3 dimensions + 3 normals
        i (int): n index of sample to visualize
    """
    points = sample[:, :3]
    normals = sample[:, 3:]

    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points)
    pcd.normals = o3d.utility.Vector3dVector(normals)

    if show_axes:
        Markers = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.05, origin=[0, 0, 0])
        o3d.visualization.draw_geometries([pcd, Markers], width=800, height=600)
    else:
        o3d.visualization.draw_geometries([pcd], width=800, height=600)

In [None]:
def off_to_point_cloud(file_path:str, num_points:int=1024) -> np.ndarray:
    """Load a 3D mesh from an .off file and sample points uniformly from its surface.

    Args:
        file_path (str): Path to the .off file.
        num_points (int): Number of points to sample from the mesh surface.
    Returns:
        np.ndarray: Array of shape (num_points, 6) containing the sampled point cloud, with XYZ coordinates and normals.
    """
    mesh = o3d.io.read_triangle_mesh(file_path)
    mesh.compute_vertex_normals()
    pcd = mesh.sample_points_uniformly(number_of_points=num_points)
    points = np.asarray(pcd.points)
    normals = np.asarray(pcd.normals)
    return np.hstack((points, normals))

def pc_normalize(pc:np.ndarray) -> np.ndarray:
    """Normalize a point cloud to be centered at the origin and fit within a unit sphere.

    Args:
        pc (np.ndarray): Array of shape (num_points, 6) containing the point cloud.
    Returns:
        np.ndarray: Normalized point cloud of shape (num_points, 6).
    """
    centroid = np.mean(pc[:, :3], axis=0)
    pc[:, :3] -= centroid
    max_dist = np.max(np.sqrt(np.sum(pc[:, :3] ** 2, axis=1)))
    pc[:, :3] /= max_dist
    return pc

def batch_create_pointclouds_from_dir(base_path:str, num_points:int=1024) -> np.ndarray:
    """Create a numpy array of point clouds from .off files in the specified directory.

        Args:
        base_path (str): Directory containing .off files.
        num_points (int): Number of points to sample from each mesh surface.
    Returns:
        np.ndarray: Array of shape (N, num_points, 6) containing N point clouds.
        list[str]: List of original filenames corresponding to each point cloud.
    """
    off_files = sorted([f for f in os.listdir(base_path) if f.endswith('.off')])
    point_clouds = np.zeros((len(off_files), num_points, 6), dtype=np.float32)

    pbar = tqdm(total=len(off_files), desc=f"{base_path}..")

    for i, off_file in enumerate(off_files):
        pbar.update(1)
        file_path = os.path.join(base_path, off_file)
        try:
            pc = off_to_point_cloud(file_path, num_points)
            pc = pc_normalize(pc)
            point_clouds[i] = pc
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            continue
    pbar.close()
        
    return point_clouds, [f[:-4] for f in off_files]  # return filenames without .off extension

def generate_random_translation_vector_normal(length, translation_u_stddev: list[tuple[float, float]] = [(0, 0.1), (0, 0.1), (0, 0.1)]) -> np.ndarray:
    """Generate a random translation vector sampled from normal distributions for each axis.

    Args:
        translation_u_stddev (list[tuple[float, float]]): List of 3 tuples specifying the mean and stddev for each axis (x, y, z).
    Returns:
        np.ndarray: Array of shape (length, 3) containing translation vectors.
    """
    if length < 1:
        raise ValueError("length must be >= 1")
    
    translations = np.empty((length, 3), dtype=np.float64)
    for i in range(3):
        mean, stddev = translation_u_stddev[i]
        translations[:, i] = np.random.normal(mean, stddev, length)
    
    return translations

def generate_random_translation_vector(length, translation_range: list[tuple[float, float]] = [(-1, 1), (-1, 1), (-1, 1)]) -> np.ndarray:
    """Generate a random translation vector within specified ranges for each axis.

    Args:
        translation_range (list[tuple[float, float]]): List of 3 tuples specifying the min and max translation for each axis (x, y, z).
    Returns:
        np.ndarray: Array of shape (length, 3) containing translation vectors.
    """
    if length < 1:
        raise ValueError("length must be >= 1")
    
    translations = np.empty((length, 3), dtype=np.float64)
    for i in range(3):
        min_val, max_val = translation_range[i]
        translations[:, i] = np.random.uniform(min_val, max_val, length)
    
    return translations

def generate_random_rotation_matrix(length: int = 1) -> np.ndarray:
    """Generate random rotation matrices uniformly sampled from SO(3).

    Shoemake (1992) method is used to sample uniform quaternions, which are then converted to rotation matrices.

    Args:
        length (int): Number of rotation matrices to generate.
    Returns:
        np.ndarray: Array of shape (length, 3, 3) containing rotation matrices.
    """
    if length < 1:
        raise ValueError("length must be >= 1")

    # Shoemake (1992) method: sample uniform quaternions
    u1 = np.random.rand(length)
    u2 = np.random.rand(length)
    u3 = np.random.rand(length)

    q1 = np.sqrt(1.0 - u1) * np.sin(2.0 * np.pi * u2)
    q2 = np.sqrt(1.0 - u1) * np.cos(2.0 * np.pi * u2)
    q3 = np.sqrt(u1)        * np.sin(2.0 * np.pi * u3)
    q4 = np.sqrt(u1)        * np.cos(2.0 * np.pi * u3)

    # Interpret as quaternion (w, x, y, z)
    x = q1
    y = q2
    z = q3
    w = q4

    xx = x * x
    yy = y * y
    zz = z * z
    xy = x * y
    xz = x * z
    yz = y * z
    wx = w * x
    wy = w * y
    wz = w * z

    rotations = np.empty((length, 3, 3), dtype=np.float64)

    rotations[:, 0, 0] = 1.0 - 2.0 * (yy + zz)
    rotations[:, 0, 1] = 2.0 * (xy - wz)
    rotations[:, 0, 2] = 2.0 * (xz + wy)

    rotations[:, 1, 0] = 2.0 * (xy + wz)
    rotations[:, 1, 1] = 1.0 - 2.0 * (xx + zz)
    rotations[:, 1, 2] = 2.0 * (yz - wx)

    rotations[:, 2, 0] = 2.0 * (xz - wy)
    rotations[:, 2, 1] = 2.0 * (yz + wx)
    rotations[:, 2, 2] = 1.0 - 2.0 * (xx + yy)

    return rotations

def create_random_rot_and_trans(points:np.ndarray,
                                translation_param: list[tuple[float, float]]) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Apply random rotation matrices and translation vectors for a batch of point clouds.
    The translation vectors are sampled from normal distributions specified by translation_param (mean, stddev) for each axis.
    The rotation matrices are sampled uniformly from SO(3).

    Args:
        points (np.ndarray): Array of shape (N, num_points, 6) containing N point clouds.
        translation_param (list[tuple[float, float]]): List of 3 tuples specifying the mean and stddev for each axis (x, y, z).
    Returns:
        tuple[np.ndarray, np.ndarray, np.ndarray]: 
            - Array of shape (N, num_points, 6) containing the transformed point clouds.
            - Array of shape (N, 3) containing translation vectors.
            - Array of shape (N, 3, 3) containing rotation matrices.
    """
    batch_size = points.shape[0]
    
    # Initialize arrays to store transformations
    translations = generate_random_translation_vector_normal(batch_size, translation_param)    
    rotations = generate_random_rotation_matrix(batch_size)
    
    # Apply rotation and translation using batch matrix multiplication
    points[:, :, :3] = points[:, :, :3] @ rotations.transpose(0, 2, 1) + translations[:, np.newaxis, :]

    return points, translations, rotations

def get_pc_stats(pc: np.array):
    """Get point cloud statistics per dimension.

    Args:
        pc (np.array): (points, 3) point cloud data

    Returns:
        dict: Dictionary with mean, min, max, stddev of point cloud
    """
    stats = {}
    stats['mean'] = pc.mean(axis=0)
    stats['min'] = pc.min(axis=0)
    stats['max'] = pc.max(axis=0)
    stats['stddev'] = pc.std(axis=0)
    return stats

### Tests for the functions

In [None]:
def test_off_to_point_cloud():
    base_path = "../datasets/modelnet40_auto_aligned/airplane/train/airplane_0001.off"
    pc = off_to_point_cloud(base_path, num_points=1024)
    assert pc.shape == (1024, 6), f"Expected shape (1024, 6), got {pc.shape}"
    print("off_to_point_cloud test passed.")
    visualize_3d(pc, show_axes=True)

def test_batch_create_pointclouds_from_dir():
    base_path = "../datasets/modelnet40_auto_aligned/airplane/test/"
    pcs, filenames = batch_create_pointclouds_from_dir(base_path, num_points=1024)
    assert pcs.shape[1:] == (1024, 6), f"Expected shape (_, 1024, 6), got {pcs.shape}"
    assert len(filenames) == pcs.shape[0], f"Expected {pcs.shape[0]} filenames, got {len(filenames)}"
    print("batch_create_pointclouds_from_dir test passed.")
    visualize_3d(pcs[0], show_axes=True)

def test_generate_random_translation_vector():
    translations = generate_random_translation_vector(5, [(-1, 1), (-2, 2), (-3, 3)])
    assert translations.shape == (5, 3), f"Expected shape (5, 3), got {translations.shape}"
    for i in range(3):
        min_val, max_val = [(-1, 1), (-2, 2), (-3, 3)][i]
        assert np.all(translations[:, i] >= min_val) and np.all(translations[:, i] <= max_val), f"Translations out of range for axis {i}"
    print("generate_random_translation_vector test passed.")

def test_generate_random_rotation_matrix():
    rotations = generate_random_rotation_matrix(5)
    assert rotations.shape == (5, 3, 3), f"Expected shape (5, 3, 3), got {rotations.shape}"
    for i in range(5):
        rt = rotations[i].T @ rotations[i]
        identity = np.eye(3)
        assert np.allclose(rt, identity), f"Rotation matrix {i} is not orthogonal."
        det = np.linalg.det(rotations[i])
        assert np.isclose(det, 1.0), f"Rotation matrix {i} does not have determinant 1."
    print("generate_random_rotation_matrix test passed.")

def test_generate_random_rot_and_trans():
    n_objs = 5
    n_points = 1024
    pc = np.random.rand(n_objs, n_points, 6)
    translation_param = [(0, 0.1), (0, 0.1), (0, 0.1)]
    pc_transformed, translations, rotations = create_random_rot_and_trans(pc, translation_param)
    assert pc_transformed.shape == (n_objs, n_points, 6), f"Expected shape ({n_objs}, {n_points}, 6), got {pc_transformed.shape}"
    assert translations.shape == (n_objs, 3), f"Expected shape ({n_objs}, 3), got {translations.shape}"
    assert rotations.shape == (n_objs, 3, 3), f"Expected shape ({n_objs}, 3, 3), got {rotations.shape}"
    # Verify that each rotation matrix is valid
    for i in range(n_objs):
        rt = rotations[i].T @ rotations[i]
        identity = np.eye(3)
        assert np.allclose(rt, identity), f"Rotation matrix {i} is not orthogonal."
        det = np.linalg.det(rotations[i])
        assert np.isclose(det, 1.0), f"Rotation matrix {i} does not have determinant 1."
    print("generate_random_rot_and_trans test passed.")

# Uncomment to run tests
# test_off_to_point_cloud()
# test_batch_create_pointclouds_from_dir()
# test_generate_random_translation_vector()
# test_generate_random_rotation_matrix()
# test_generate_random_rot_and_trans()

In [None]:
# Pipeline to make a dataset from the manually aligned ModelNet40, sampled using sample_points_uniformly, and normalized using max sum of squares (in pc_normalize)

def create_pc_dataset_from_modelnet40(base_path:str, split:str="train",
                              num_points:int=1024, num_augmentations=1,
                              translation_param: list[tuple[float, float]] = [(0, 0.1), (0, 0.1), (0, 0.1)]) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Create a normalized dataset from the manually aligned ModelNet40 dataset.

    The pipeline is as follows:
    1. Load .off files from the specified base directory.
    2. Sample points uniformly from each mesh surface.
    3. Normalize each point cloud to be centered at the origin and fit within a unit sphere.
    4. Apply random rotation and translation to each point cloud.
        -> for "train" split, create 10 random augmentations per sample
        -> for "test" split, create 1 augmentation per sample
    5. Assign integer labels based on category subdirectory names (sorted alphabetically).

    Args:
        base_path (str): Base directory containing category subdirectories with .off files, organized as follows
            ```
            base_path/category1/train/*.off
            base_path/category1/test/*.off
            base_path/category2/train/*.off
            base_path/category2/test/*.off
            ...
            ```
        split (str): Dataset split to use ("train" or "test").
        num_augmentations (int): Number of augmentations to create per sample.
        num_points (int): Number of points to sample from each mesh surface.
        translation_param (list[tuple[float, float]]): List of 3 tuples specifying the mean and stddev for each axis (x, y, z).
    Returns:
        tuple[np.ndarray, np.ndarray, list[str]]:
            - Array of shape (N, num_points, 6) containing N normalized point clouds.
            - Array of shape (N, 3) containing translation vectors.
            - Array of shape (N, 3, 3) containing rotation matrices.
            - Array of shape (N,) containing integer labels for each point cloud.
            - list of original filenames corresponding to each point cloud.
    """
    categories = sorted([d for d in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, d))])
        
    all_point_clouds = []
    all_translations = []
    all_rotations = []
    all_labels = []
    all_filenames = []
    
    for cat_idx, category in enumerate(categories):
        cat_split_path = os.path.join(base_path, category, split)
        
        if not os.path.exists(cat_split_path):
            print(f"Warning: {cat_split_path} does not exist, skipping...")
            continue
        
        # Load and normalize point clouds for this category
        pcs, filenames = batch_create_pointclouds_from_dir(cat_split_path, num_points)
        
        if len(pcs) == 0:
            continue
        
        # For each point cloud, create num_augmentations copies
        pcs_aug = np.repeat(pcs, num_augmentations, axis=0)

        pcs_transformed, translations, rotations = create_random_rot_and_trans(pcs_aug, translation_param)
        all_point_clouds.append(pcs_transformed)
        all_translations.append(translations)
        all_rotations.append(rotations)
        all_labels.extend([cat_idx] * len(pcs_transformed))
        if num_augmentations > 1:
            all_filenames.extend([f"{fname}_{i}" for fname in filenames for i in range(num_augmentations)])
        else:
            all_filenames.extend(filenames)

    
    # Concatenate all results
    all_point_clouds = np.concatenate(all_point_clouds, axis=0)
    all_translations = np.concatenate(all_translations, axis=0)
    all_rotations = np.concatenate(all_rotations, axis=0)
    all_labels = np.array(all_labels)
    
    return all_point_clouds, all_translations, all_rotations, all_labels, all_filenames

### Create the fullmodelnet40 dataset from `modelnet40_auto_aligned`

In [None]:
# Test the dataset creation pipeline
base_path = "../datasets/modelnet40_auto_aligned/"
out_path = "../datasets/fullmodelnet40/"

os.makedirs(out_path, exist_ok=True)

# The stats are from the partialnet40 dataset in the PAPNet paper
# used so that the datasets are comparable
mean_trans = [0.00299775, 0.00311312, 0.0374197]
std_trans = [0.02494127, 0.02498563, 0.03475482]
translation_param = [(mean_trans[i], std_trans[i]) for i in range(3)]

# Train
point_clouds, translations, rotations, labels, filenames = create_pc_dataset_from_modelnet40(base_path, split="train", num_augmentations=10, num_points=1024, translation_param=translation_param)
print(f"Created train dataset with {point_clouds.shape[0]} samples.")
print("point_clouds.shape:", point_clouds.shape)
print("translations.shape:", translations.shape)
print("rotations.shape:", rotations.shape)
print("labels.shape:", labels.shape)
# write to .npy files
np.save(os.path.join(out_path, "train_points.npy"), point_clouds)
np.save(os.path.join(out_path, "train_gt_tra.npy"), translations)
np.save(os.path.join(out_path, "train_gt_rot.npy"), rotations)
np.save(os.path.join(out_path, "train_labels.npy"), labels)
# save filenames as text file
with open(os.path.join(out_path, "train_filenames.txt"), "w") as f:
    for filename in filenames:
        f.write(f"{filename}\n")

# Test
point_clouds, translations, rotations, labels, filenames = create_pc_dataset_from_modelnet40(base_path, split="test", num_augmentations=1, num_points=1024, translation_param=translation_param)
print(f"Created test dataset with {point_clouds.shape[0]} samples.")
print("point_clouds.shape:", point_clouds.shape)
print("translations.shape:", translations.shape)
print("rotations.shape:", rotations.shape)
print("labels.shape:", labels.shape)
# write to .npy files
np.save(os.path.join(out_path, "test_points.npy"), point_clouds)
np.save(os.path.join(out_path, "test_gt_tra.npy"), translations)
np.save(os.path.join(out_path, "test_gt_rot.npy"), rotations)
np.save(os.path.join(out_path, "test_labels.npy"), labels)
# save filenames as text file
with open(os.path.join(out_path, "test_filenames.txt"), "w") as f:
    for filename in filenames:
        f.write(f"{filename}\n")

### "Unrotate" and "untranslate" a sample


In [None]:

i = 952
obj1 = point_clouds[i, :, :3]
tra = translations[i]
rot = rotations[i]
obj1_canonical = (obj1 - tra) @ rot
obj1_final = np.concatenate([obj1_canonical, point_clouds[i, :, 3:]], axis=1)

stats = get_pc_stats(point_clouds[0][:, :3])
print(stats)

visualize_3d(point_clouds[i], show_axes=True)
visualize_3d(obj1_final, show_axes=True)