# Model Training Notebook

## Description

This notebook serves as a small example of how we trained and tested our networks.
Over the course of the project, multiple models were tested; this notebook shows only one of them.

## Imports

In [1]:
import numpy as np
import open3d as o3d

import os
import random
import copy

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import random_split
from torch.utils.data import Dataset, DataLoader

from tqdm import tqdm

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [2]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

## Helper functions

These helper functions are mostly the same as those in the `data_creation.ipynb` notebook.

To keep this notebook short, documentation is kept to a minimum, since the functions are hopefully self-explanatory.

In [3]:
# ==================================================================================================
#  Load Point Cloud
# ==================================================================================================
def load_pcd(file_path):
    """Read the point cloud stored at ``file_path`` with Open3D and return it."""
    point_cloud = o3d.io.read_point_cloud(file_path)
    return point_cloud

# ==================================================================================================
#  Generate Rotated View
# ==================================================================================================
def generate_rotated_view(pcd, rotation_angle_deg):
    """Rotate a copy of ``pcd`` about the y-axis and extract the camera-visible points.

    Args:
        pcd: Legacy Open3D point cloud; it is deep-copied, not modified.
        rotation_angle_deg: Rotation angle around the y-axis, in degrees.

    Returns:
        Tuple ``(rotated, visible)``: the rotated full cloud and the subset of
        its points kept by Open3D's hidden point removal.
    """
    angle_rad = np.deg2rad(rotation_angle_deg)
    rotation = pcd.get_rotation_matrix_from_xyz((0, angle_rad, 0))

    # Work on a deep copy, rotated about the origin
    rotated = copy.deepcopy(pcd)
    rotated = rotated.rotate(rotation, center=[0, 0, 0])

    # Diagonal length of the axis-aligned bounds of the rotated cloud
    extent = np.asarray(rotated.get_max_bound()) - np.asarray(rotated.get_min_bound())
    diameter = np.linalg.norm(extent)

    # Camera shares the cloud centre's x/y and sits at z = diameter
    centre = rotated.get_center()
    camera = o3d.core.Tensor([centre[0], centre[1], diameter], o3d.core.float64)
    radius = diameter * 100

    # Hidden point removal runs on the tensor-based geometry; convert back afterwards
    tensor_cloud = o3d.t.geometry.PointCloud.from_legacy(rotated)
    _, visible_idx = tensor_cloud.hidden_point_removal(camera, radius)
    visible = tensor_cloud.select_by_index(visible_idx).to_legacy()
    return rotated, visible

# ==================================================================================================
#  Normalize
# ==================================================================================================
def normalize(pcd, dimensions=(64, 128, 64)):
    """Centre ``pcd`` and stretch it per axis to fill a voxel grid.

    The cloud is translated so the centre of its axis-aligned bounding box is
    at the origin, each axis is scaled independently to span [-1, 1], and the
    result is mapped to ``[0.5, dim - 0.5]`` on every axis, so truncating a
    coordinate to an integer always yields a valid voxel index.

    Args:
        pcd: Legacy Open3D point cloud. It is modified in place.
        dimensions: Grid size ``(x, y, z)``. The default reproduces the
            original hard-coded 64 x 128 x 64 mapping.

    Returns:
        The same ``pcd`` object with its points replaced by grid coordinates
        rounded to 4 decimals.
    """
    dims = np.asarray(dimensions, dtype=np.float64)
    half_span = dims / 2.0 - 0.5   # (31.5, 63.5, 31.5) for the default grid
    offset = dims / 2.0            # (32, 64, 32) for the default grid

    # Move the bounding-box centre to the origin
    center = pcd.get_axis_aligned_bounding_box().get_center()
    pcd.translate(-center)

    extents = np.asarray(pcd.get_max_bound(), dtype=np.float64) - np.asarray(pcd.get_min_bound(), dtype=np.float64)

    # Per-axis factor that maps the extent to [-1, 1]. A degenerate axis
    # (zero extent) keeps factor 0 so its points land on the grid centre
    # instead of becoming NaN through 0 * inf.
    scale = np.zeros(3, dtype=np.float64)
    has_extent = extents > 0
    scale[has_extent] = 2.0 / extents[has_extent]

    points = np.asarray(pcd.points) * (scale * half_span) + offset
    pcd.points = o3d.utility.Vector3dVector(np.around(points, decimals=4))

    return pcd

## Dataloader

In [4]:
# ==================================================================================================
#  Voxel Dataset
# ==================================================================================================
class VoxelDataset(Dataset):
    """Paired input / ground-truth point clouds, voxelized into occupancy grids.

    Every item is augmented on the fly: a random rotation is passed to
    ``generate_rotated_view``, the cloud is mirrored along x with probability
    0.5, and with probability 0.3 it is shifted down along y and cropped to
    the [-1, 1]^3 box. The same random draws are applied to the input and to
    its ground truth.

    Args:
        in_dir: Directory with the input ``.ply`` files.
        gt_dir: Directory with the ground-truth ``.ply`` files. Files are
            paired with the inputs by sorted file name order.
        dimensions: Shape of the produced voxel volumes.
            NOTE(review): ``normalize`` is called with its default
            64 x 128 x 64 mapping, so other values are not scaled to fit.
    """

    # Homogeneous transform that mirrors a cloud along the x-axis
    _MIRROR_X = np.diag([-1.0, 1.0, 1.0, 1.0])

    def __init__(self, in_dir, gt_dir, dimensions=(64, 128, 64)):
        self.in_dir = in_dir
        self.gt_dir = gt_dir
        self.dimensions = dimensions
        # Crop region used by the random crop augmentation
        self.aabb = o3d.geometry.AxisAlignedBoundingBox(min_bound=(-1, -1, -1), max_bound=(1, 1, 1))

        self.in_files = self._list_ply(in_dir)
        self.gt_files = self._list_ply(gt_dir)
        assert len(self.in_files) == len(self.gt_files), "Mismatch input and ground truth files"

    @staticmethod
    def _list_ply(directory):
        """Return the sorted ``.ply`` file names found in ``directory``."""
        return sorted(f for f in os.listdir(directory) if f.endswith(".ply"))

    def __len__(self):
        return len(self.in_files)

    def _augment(self, pcd, mirror, crop, crop_distance):
        """Apply the optional mirror and shift-then-crop augmentations to ``pcd``."""
        if mirror:
            pcd.transform(self._MIRROR_X)

        if crop:
            # Shift along y by -(0.2 + crop_distance), then cut to the crop box
            shift = np.eye(4)
            shift[1, 3] = -0.2 - crop_distance
            pcd.transform(shift)
            pcd = pcd.crop(self.aabb)

        return pcd

    def _voxelize(self, pcd):
        """Normalize ``pcd`` into the grid and return a (1, *dimensions) occupancy tensor."""
        volume = np.zeros(self.dimensions, dtype=np.float32)
        # Truncating the normalized coordinates to uint8 gives the voxel indices
        voxels = np.asarray(normalize(pcd).points, dtype=np.uint8)
        if len(voxels):
            # One vectorized assignment instead of a Python loop over all points
            volume[voxels[:, 0], voxels[:, 1], voxels[:, 2]] = 1
        return torch.from_numpy(volume).unsqueeze(0)

    def __getitem__(self, idx):
        """Return ``(input_tensor, target_tensor)`` for sample ``idx``, freshly augmented."""
        # Random augmentations and their probabilities (draw order is fixed)
        rot_angle = random.random() * 360.0
        mirror = random.random() < 0.5
        crop_distance = random.random() * 0.4
        crop = random.random() < 0.3

        # Input: only the part of the rotated cloud that is visible to the camera
        pcd_in = load_pcd(os.path.join(self.in_dir, self.in_files[idx]))
        _, pcd_cut = generate_rotated_view(pcd_in, rot_angle)
        input_tensor = self._voxelize(self._augment(pcd_cut, mirror, crop, crop_distance))

        # Ground truth: the complete rotated cloud, with identical augmentations
        pcd_gt = load_pcd(os.path.join(self.gt_dir, self.gt_files[idx]))
        pcd_full, _ = generate_rotated_view(pcd_gt, rot_angle)
        target_tensor = self._voxelize(self._augment(pcd_full, mirror, crop, crop_distance))

        # Return both prepared tensors
        return input_tensor, target_tensor

## Loss function

In [5]:
# Construct helper arrays and tensors
def _to_device_5d(array):
    """Copy ``array`` into a float32 tensor of shape (1, 1, *array.shape) on ``device``."""
    return torch.tensor(array, dtype=torch.float32)[None, None].to(device)

# Constant base weight, one entry per voxel of a 64 x 128 x 64 volume
ones_array = np.full((64, 128, 64), 1, dtype=np.float32)
ones = _to_device_5d(ones_array)
# Constant-valued 3x3x3 and 5x5x5 convolution kernels
block_3x3_array = np.full((3, 3, 3), 28, dtype=np.float32)
block_3x3t = _to_device_5d(block_3x3_array)
block_5x5_array = np.full((5, 5, 5), 126, dtype=np.float32)
block_5x5t = _to_device_5d(block_5x5_array)

# ==================================================================================================
#  Custom Loss
# ==================================================================================================
def custom_loss(P, T):
    """Binary cross-entropy with extra weight on and around occupied target voxels.

    The per-voxel weight is ``1 + 20*T + 20*near_3 + 20*near_5``, where
    ``near_3`` / ``near_5`` are the target convolved with a constant 3x3x3 /
    5x5x5 kernel and clamped to [0, 1]. For a binary target they mark every
    voxel that has an occupied voxel in its 3x3x3 / 5x5x5 neighbourhood.

    The kernels are created on the target's device and dtype, so the function
    works for any volume size and device without relying on pre-built
    module-level tensors.

    Args:
        P: Prediction with values in [0, 1], shape (batch, 1, X, Y, Z).
        T: Target, same shape as ``P``.

    Returns:
        Scalar tensor: mean of the weighted per-voxel BCE.
    """
    # Per-voxel BCE; the reduction happens after weighting
    bce_loss = F.binary_cross_entropy(P, T, reduction="none")

    # Constant block filters applied to the target tensor
    kernel_3 = torch.full((1, 1, 3, 3, 3), 28.0, dtype=T.dtype, device=T.device)
    kernel_5 = torch.full((1, 1, 5, 5, 5), 126.0, dtype=T.dtype, device=T.device)
    near_3 = torch.clamp(F.conv3d(T, kernel_3, padding=1), 0, 1)
    near_5 = torch.clamp(F.conv3d(T, kernel_5, padding=2), 0, 1)

    # Custom weight: base weight 1, increased at and around the target points
    W = 1 + (20 * T) + (20 * near_3) + (20 * near_5)
    combined_loss = bce_loss * W
    return combined_loss.mean()

## Model

In [6]:
# ==================================================================================================
#  Voxel Auto Encoder
# ==================================================================================================
class VoxelAutoEncoder(nn.Module):
    """3D encoder-decoder with skip connections that maps a voxel grid to per-voxel probabilities.

    Four encoder stages (16, 32, 64, 128 channels) are each followed by 2x max
    pooling, then a 256-channel bottleneck, then four decoder stages that
    upsample with transposed convolutions and concatenate the matching encoder
    output. A 1x1x1 convolution and a sigmoid produce the single output channel.
    """

    def __init__(self):
        super().__init__()

        # Channel width after each encoder stage; index 0 is the input channel count
        widths = (1, 16, 32, 64, 128)

        # Encoder path: registers enc1 .. enc4
        for level in range(1, 5):
            setattr(self, f"enc{level}", self.double_conv(widths[level - 1], widths[level]))

        # Downsampling layer shared by all stages
        self.pool = nn.MaxPool3d(2)

        # Bottleneck
        self.bottleneck = self.double_conv(128, 256)

        # Decoder path: registers upconv4, dec4, ..., upconv1, dec1.
        # Each dec stage receives the upsampled features concatenated with the skip.
        for level in range(4, 0, -1):
            skip_channels = widths[level]
            setattr(
                self,
                f"upconv{level}",
                nn.ConvTranspose3d(2 * skip_channels, skip_channels, kernel_size=2, stride=2),
            )
            setattr(self, f"dec{level}", self.double_conv(2 * skip_channels, skip_channels))

        # Final 1x1x1 projection to a single output channel, followed by a sigmoid
        self.final = nn.Conv3d(16, 1, kernel_size=1)
        self.sigmoid = nn.Sigmoid()

    def double_conv(self, in_channels, out_channels):
        """Build two consecutive Conv3d(3x3x3, padding 1) -> BatchNorm3d -> ReLU stages."""
        layers = []
        for source_channels in (in_channels, out_channels):
            layers.append(nn.Conv3d(source_channels, out_channels, kernel_size=3, padding=1))
            layers.append(nn.BatchNorm3d(out_channels))
            layers.append(nn.ReLU(inplace=True))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Run the network on ``x`` and return the sigmoid-activated output."""
        # Encoder: keep every stage output for the skip connections
        skips = []
        features = x
        for level in range(1, 5):
            if level > 1:
                features = self.pool(features)
            features = getattr(self, f"enc{level}")(features)
            skips.append(features)

        # Bottleneck
        features = self.bottleneck(self.pool(features))

        # Decoder: upsample, concatenate the matching encoder output, refine
        for level in range(4, 0, -1):
            upsampled = getattr(self, f"upconv{level}")(features)
            features = getattr(self, f"dec{level}")(torch.cat((upsampled, skips.pop()), dim=1))

        # Final output layer
        return self.sigmoid(self.final(features))

## Train Model

### Parameters

In [7]:
# Dataset directories: `input_path` holds the network inputs, `output_path` the ground truth
# NOTE(review): absolute, machine-specific paths — adjust them for your environment
input_path = "D:/3d_dataset/norms_4000"
output_path = "D:/3d_dataset/norms_10000"

batch_size = 1
train_split = 0.8
num_epochs = 2

# Seed every RNG that is in play: torch drives the dataset split and loader
# shuffling, while the dataset augmentations draw from Python's `random`.
seed = 42
torch.manual_seed(seed)
random.seed(seed)
np.random.seed(seed)

# Output files: best model weights and the per-epoch loss histories
weights_pth = "weights.pth"
trains_npy = "train_losses.npy"
tests_npy = "test_losses.npy"

### Train

In [8]:
# ==================================================================================================
#  Train Model
# ==================================================================================================
def train_model():
    """Train a ``VoxelAutoEncoder`` on ``VoxelDataset`` and keep the best weights.

    Uses the notebook-level settings ``input_path``, ``output_path``,
    ``batch_size``, ``train_split``, ``num_epochs`` and ``device``. After every
    epoch the loss histories are written to ``trains_npy`` / ``tests_npy``,
    and whenever the test loss improves the weights are written to
    ``weights_pth``. The best weights are written once more when training ends.
    """
    # Prepare dataset, split, dataloaders, model and optimizer
    dataset = VoxelDataset(input_path, output_path)
    total_size = len(dataset)
    train_size = int(total_size * train_split)
    test_size = total_size - train_size
    train_dataset, test_dataset = random_split(dataset, [train_size, test_size])
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Evaluation only averages the loss, so there is no need to shuffle
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    model = VoxelAutoEncoder().to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Track the best weights as a state-dict snapshot instead of a full model copy
    best_state = copy.deepcopy(model.state_dict())
    best_epoch_loss = float('inf')

    # Lists to track train and test losses
    all_train_losses_epoch = []
    all_test_losses_epoch = []

    def report_losses():
        """Print both loss histories and save them as ".npy" files."""
        print("Training losses: ")
        print(all_train_losses_epoch)
        print("Test losses:")
        print(all_test_losses_epoch)
        np.save(trains_npy, all_train_losses_epoch)
        np.save(tests_npy, all_test_losses_epoch)

    # Train the model
    for epoch in tqdm(range(1, num_epochs + 1), desc="Training the network"):

        # Train epoch
        model.train()
        train_losses_epoch = []
        with tqdm(train_loader, unit=" batch") as tepoch:
            tepoch.set_description("Epoch {}".format(epoch))
            for inputs, targets in tepoch:
                inputs, targets = inputs.to(device), targets.to(device)
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = custom_loss(outputs, targets)
                loss.backward()
                optimizer.step()
                train_losses_epoch.append(loss.item())

        train_loss = float(np.mean(train_losses_epoch))
        all_train_losses_epoch.append(train_loss)
        # `epoch` is already 1-based, so it is printed as-is
        print(f"Epoch {epoch}/{num_epochs}, Train-Loss: {train_loss:.6f}")

        # Test epoch
        model.eval()
        test_losses_epoch = []
        with torch.no_grad():
            with tqdm(test_loader, unit=" batch") as tepoch:
                for inputs, targets in tepoch:
                    inputs, targets = inputs.to(device), targets.to(device)
                    outputs = model(inputs)
                    loss = custom_loss(outputs, targets)
                    test_losses_epoch.append(loss.item())

        test_loss = float(np.mean(test_losses_epoch))
        all_test_losses_epoch.append(test_loss)
        print(f"Epoch {epoch}/{num_epochs}, Test-Loss: {test_loss:.6f}")

        # Save best model
        if test_loss < best_epoch_loss:
            best_epoch_loss = test_loss
            best_state = copy.deepcopy(model.state_dict())

            torch.save(best_state, weights_pth)
            print("Model saved to: ", weights_pth)

        # Checkpoint the loss histories every epoch so an interrupted run keeps them
        report_losses()

    print("Training complete!")
    torch.save(best_state, weights_pth)
    print("Model saved to: ", weights_pth)
    report_losses()

# Run the training; this starts immediately when the cell is executed
train_model()

Epoch 1:  19%|█▉        | 783/4176 [03:29<15:07,  3.74 batch/s]
Training the network:   0%|          | 0/2 [03:29<?, ?it/s]


KeyboardInterrupt: 