# ModelNet classification with ORION

In [None]:
import torch
import wandb
import numpy as np
import os
# Ask CUDA to launch kernels synchronously so that a failing kernel is
# reported at the call that triggered it.
# NOTE(review): this is a debugging aid that presumably slows GPU training;
# confirm it should stay enabled for real runs.
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

## Preprocessing
#### Definition of the Voxel Grid Dataset

In [None]:
from torch.utils.data import Dataset, DataLoader
import h5py
class VoxelDataset(Dataset):
    """In-memory dataset of voxel grids with a class label and a pose label.

    Samples are read from one or more HDF5 files, each of which must contain
    the datasets ``data``, ``label`` and ``label_pose``. The files are
    concatenated along the first axis in the order in which they are listed.

    Args:
        file_txt_path: Path to a text file listing one HDF5 file path per
            line. Blank lines are ignored and surrounding whitespace
            (including a trailing carriage return) is removed.
        transform: Optional callable applied to each voxel grid when a sample
            is fetched.

    Raises:
        ValueError: If the list file does not name any HDF5 file.
    """

    def __init__(self, file_txt_path, transform=None):
        self.paths = self._read_paths(file_txt_path)
        self.data, self.label, self.label_pose = self._combine_files()
        self.transform = transform

    @staticmethod
    def _read_paths(file_txt_path):
        """Return the HDF5 paths listed in ``file_txt_path``, one per line."""
        with open(file_txt_path) as file:
            return [line.strip() for line in file if line.strip()]

    def _combine_files(self):
        """Load every listed HDF5 file and concatenate their contents.

        Returns:
            Tuple ``(data, label, label_pose)`` of numpy arrays; the two label
            arrays are converted to ``int64``.
        """
        if not self.paths:
            raise ValueError("no HDF5 files listed for this dataset")

        data_parts = []
        label_parts = []
        label_pose_parts = []

        for path in self.paths:
            # The context manager closes the file; ``[()]`` copies the whole
            # dataset into memory so nothing refers to the closed handle.
            with h5py.File(path, 'r') as file:
                data_parts.append(file['data'][()])
                label_parts.append(
                    np.asarray(file['label'][()], dtype=np.int64))
                label_pose_parts.append(
                    np.asarray(file['label_pose'][()], dtype=np.int64))

        # A single concatenation per array avoids re-copying the accumulated
        # data once for every additional file.
        return (
            np.concatenate(data_parts, axis=0),
            np.concatenate(label_parts, axis=0),
            np.concatenate(label_pose_parts, axis=0),
        )

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(data, label, label_pose)`` for sample ``idx``.

        The transform, when set, is applied to the voxel data only.
        """
        data = self.data[idx]
        label = self.label[idx]
        label_pose = self.label_pose[idx]
        if self.transform:
            data = self.transform(data)
        return data, label, label_pose

#### Transformations

In [None]:
from torchvision.transforms import Compose

class CropTransform(torch.nn.Module):
    """Crop a voxel grid to indices 2..33 on each of its last three axes.

    The first axis is left untouched, so the result has 32 entries along
    each cropped axis when the input is large enough.
    """

    def forward(self, data):
        # The same window is applied to axes 1, 2 and 3.
        window = slice(2, 34)
        return data[:, window, window, window]
        
class ToTensor(torch.nn.Module):
    """Convert a numpy array into a ``float32`` torch tensor."""

    def forward(self, data):
        # astype() produces a float32 copy; from_numpy() then wraps that copy.
        as_float32 = data.astype(np.float32)
        return torch.from_numpy(as_float32)

# Preprocessing applied to every sample: crop the grid first, then convert
# the cropped array to a float32 tensor.
transforms = Compose([CropTransform(), ToTensor()])

#### Dataset & Dataloaders

In [None]:
# Dataset variant ("10" or "40") and number of rotations per object.
dataset = "40"
rotations = "24"

# The three splits share one HDF5 root directory; only the split name differs.
hdf5_root = f'../content/datasets/ModelNet{dataset}_bin_from_mat//poseplan_MN{dataset}_{rotations}/hdf5'
train_dataset = VoxelDataset(f'{hdf5_root}/train/train.txt', transforms)
validation_dataset = VoxelDataset(f'{hdf5_root}/validation/validation.txt', transforms)
test_dataset = VoxelDataset(f'{hdf5_root}/test/test.txt', transforms)

# Training batches are shuffled. Validation and test batches are unshuffled
# and hold `rotations` consecutive samples each.
# NOTE(review): presumably the files store all rotations of an object
# contiguously, so one evaluation batch equals one object; confirm.
eval_batch_size = int(rotations)
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0)
validation_dataloader = DataLoader(validation_dataset, batch_size=eval_batch_size, shuffle=False, num_workers=0)
test_dataloader = DataLoader(test_dataset, batch_size=eval_batch_size, shuffle=False, num_workers=0)

## Definition of the neural networks

#### ORION_4_V1
uses 4 convolutional layers  
uses ReLU as activation function for the convolutional layers

In [None]:
from torch.nn import Module, Sequential, Conv3d, BatchNorm3d, ReLU, Dropout3d, MaxPool3d, Linear, LeakyReLU, Dropout

class ORION4_V1(Module):
    """ORION network with four 3D convolutional stages and ReLU activations.

    A shared convolutional trunk feeds one fully connected layer, followed by
    two linear heads: one for the object class and one for the pose label.
    The fully connected layer expects 256*4*4*4 flattened features, which is
    what the trunk produces for a single-channel 32x32x32 input.

    Args:
        num_classes: Number of outputs of the class head.
        num_pose_label: Number of outputs of the pose head.
    """

    def __init__(self, num_classes, num_pose_label):
        super().__init__()

        # (in_channels, out_channels, stride, dropout probability) per stage.
        stage_specs = [
            (1, 32, 2, 0.2),
            (32, 64, 1, 0.3),
            (64, 128, 1, 0.4),
            (128, 256, 1, 0.6),
        ]
        final_stage = len(stage_specs) - 1

        layers = []
        for index, (c_in, c_out, stride, drop_p) in enumerate(stage_specs):
            layers.append(Conv3d(in_channels=c_in, out_channels=c_out, kernel_size=3, stride=stride))
            layers.append(BatchNorm3d(num_features=c_out))
            layers.append(ReLU())
            if index == final_stage:
                # Only the last stage is max-pooled, just before its dropout.
                layers.append(MaxPool3d(kernel_size=2, stride=2))
            layers.append(Dropout3d(p=drop_p))
        self.network = Sequential(*layers)

        self.fc1 = Sequential(
            Linear(in_features=256*4*4*4, out_features=128),
            ReLU(),
            Dropout(p=0.4),
        )
        self.class_layer = Linear(128, num_classes)
        self.pose_layer = Linear(128, num_pose_label)

        # Replace the default initialisation of every Linear / Conv3d layer.
        self.apply(self._init_weights)

    def forward(self, x):
        """Return ``(class_output, pose_output)`` logits for the batch ``x``."""
        batch_size = x.shape[0]
        features = self.network(x)
        # Flatten everything except the batch axis.
        hidden = self.fc1(features.reshape((batch_size, -1)))
        return self.class_layer(hidden), self.pose_layer(hidden)

    def _init_weights(self, module):
        """Initialise Linear and Conv3d weights and zero their biases."""
        if isinstance(module, Linear):
            torch.nn.init.normal_(module.weight, std=0.01)
        elif isinstance(module, Conv3d):
            torch.nn.init.kaiming_normal_(module.weight, nonlinearity='relu')
        else:
            # All other module types keep their default initialisation.
            return
        if module.bias is not None:
            torch.nn.init.zeros_(module.bias)

#### ORION_4_V2 
uses 4 convolutional layers  
uses Leaky ReLU as activation function for the convolutional layers

In [None]:
class ORION4_V2(Module):
    """ORION network with four 3D convolutional stages and Leaky ReLU.

    A shared convolutional trunk feeds one fully connected layer, followed by
    two linear heads: one for the object class and one for the pose label.
    The fully connected layer expects 256*4*4*4 flattened features, which is
    what the trunk produces for a single-channel 32x32x32 input.

    Args:
        num_classes: Number of outputs of the class head.
        num_pose_label: Number of outputs of the pose head.
    """

    # Negative slope shared by every LeakyReLU and by the matching Kaiming
    # initialisation of the convolutions.
    NEGATIVE_SLOPE = 0.1

    def __init__(self, num_classes, num_pose_label):
        super().__init__()
        slope = self.NEGATIVE_SLOPE
        self.network = Sequential(

            # Definition of Conv1
            Conv3d(in_channels=1, out_channels=32, kernel_size=3, stride=2),
            BatchNorm3d(num_features=32),
            LeakyReLU(negative_slope=slope),
            Dropout3d(p=0.2),

            # Definition of Conv2
            Conv3d(in_channels=32, out_channels=64, kernel_size=3, stride=1),
            BatchNorm3d(num_features=64),
            LeakyReLU(negative_slope=slope),
            Dropout3d(p=0.3),

            # Definition of Conv3
            Conv3d(in_channels=64, out_channels=128, kernel_size=3, stride=1),
            BatchNorm3d(num_features=128),
            LeakyReLU(negative_slope=slope),
            Dropout3d(p=0.4),

            # Definition of Conv4 (the only stage followed by max pooling)
            Conv3d(in_channels=128, out_channels=256, kernel_size=3, stride=1),
            BatchNorm3d(num_features=256),
            LeakyReLU(negative_slope=slope),
            MaxPool3d(kernel_size=2, stride=2),
            Dropout3d(p=0.6),

        )
        # The fully connected layer keeps a plain ReLU.
        self.fc1 = Sequential(
            Linear(in_features=256*4*4*4, out_features=128),
            ReLU(),
            Dropout(p=0.4)
        )
        self.class_layer = Linear(128, num_classes)
        self.pose_layer = Linear(128, num_pose_label)

        # Replace the default initialisation of every Linear / Conv3d layer.
        self.apply(self._init_weights)

    def forward(self, x):
        """Return ``(class_output, pose_output)`` logits for the batch ``x``."""
        # Flatten everything except the batch axis before the dense layers.
        x = self.network(x).reshape((x.shape[0], -1))
        x = self.fc1(x)
        class_output = self.class_layer(x)
        pose_output = self.pose_layer(x)
        return class_output, pose_output

    def _init_weights(self, module):
        """Initialise Linear and Conv3d weights and zero their biases."""
        if isinstance(module, torch.nn.Linear):
            torch.nn.init.normal_(module.weight, std=0.01)
            if module.bias is not None:
                module.bias.data.zero_()
        if isinstance(module, torch.nn.Conv3d):
            # `a` must carry the LeakyReLU slope; with the default a=0 the
            # gain is the plain-ReLU one and ignores the slope in use.
            torch.nn.init.kaiming_normal_(
                module.weight, a=self.NEGATIVE_SLOPE, nonlinearity='leaky_relu')
            if module.bias is not None:
                module.bias.data.zero_()

## Training and Testing Functions
#### Function used to calculate the accuracy on validation and test sets

In [None]:
from tqdm import tqdm
def calculate_accuracies(network, dataloader):
    """Compute class and pose accuracy of ``network`` over ``dataloader``.

    One class prediction is made per batch: the class logits of all samples
    in the batch are summed and the argmax of the sum is compared with the
    first class label of the batch. This relies on every batch holding the
    rotations of a single object. Pose accuracy is computed per sample.

    Args:
        network: Module returning ``(class_logits, pose_logits)``.
        dataloader: Iterable of ``(data, class_labels, pose_labels)`` batches.

    Returns:
        Tuple ``(class_accuracy, pose_accuracy)`` of scalar tensors.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    network.to(device)
    network.eval()

    class_pred_arr = []
    class_true_arr = []
    pose_pred_arr = []
    pose_true_arr = []

    with torch.no_grad():
        for batch_data, batch_class_labels, batch_pose_labels in dataloader:

            # moving batch to device and formatting it; reshape(-1) keeps the
            # labels 1-D even for a batch of one, where squeeze() would
            # collapse them to a 0-d tensor that cannot be indexed or
            # concatenated
            batch_data = batch_data.to(device)
            batch_class_labels = batch_class_labels.reshape(-1).to(device)
            batch_pose_labels = batch_pose_labels.reshape(-1).to(device)

            # forward pass
            class_logits, pose_logits = network(batch_data)

            # per-sample pose prediction; softmax is monotonic, so taking the
            # argmax of the raw logits selects the same label
            pose_pred_arr.append(torch.argmax(pose_logits, dim=1))
            pose_true_arr.append(batch_pose_labels)

            # single class prediction for the whole batch
            class_pred_arr.append(torch.argmax(torch.sum(class_logits, dim=0)))

            # by construction each batch contains the rotations of one
            # object, so the first class label is the true class
            class_true_arr.append(batch_class_labels[0])

        # one entry per batch for classes, one entry per sample for poses
        class_pred_arr = torch.stack(class_pred_arr)
        class_true_arr = torch.stack(class_true_arr)
        pose_pred_arr = torch.cat(pose_pred_arr, dim=0)
        pose_true_arr = torch.cat(pose_true_arr, dim=0)

        # calculating accuracies
        class_accuracy = torch.sum(class_pred_arr == class_true_arr)/len(class_true_arr)
        pose_accuracy = torch.sum(pose_pred_arr == pose_true_arr)/len(pose_true_arr)

    return class_accuracy, pose_accuracy

#### Function used to calculate the loss on validation and test sets

In [None]:
from torch.nn import CrossEntropyLoss

def calculate_loss(network, dataloader):
    """Compute the cross-entropy losses of ``network`` over ``dataloader``.

    The outputs of all batches are gathered first, so each loss is the mean
    over every sample rather than a mean of per-batch means.

    Args:
        network: Module returning ``(class_logits, pose_logits)``.
        dataloader: Iterable of ``(data, class_labels, pose_labels)`` batches.

    Returns:
        Tuple ``(loss, class_loss, pose_loss)`` of scalar tensors, where
        ``loss`` is the average of the class and pose losses.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    network.to(device)

    # defining loss functions
    loss_fn_class = CrossEntropyLoss()
    loss_fn_pose = CrossEntropyLoss()

    # putting network in evaluation mode
    network.eval()

    class_pred = []
    class_true = []
    pose_pred = []
    pose_true = []

    # no gradients are needed for evaluation
    with torch.no_grad():
        for batch_data, batch_class_labels, batch_pose_labels in dataloader:

            # moving batch to device and formatting it; reshape(-1) keeps the
            # labels 1-D even for a batch of one, where squeeze() would
            # produce a 0-d tensor that torch.cat rejects
            batch_data = batch_data.to(device)
            batch_class_labels = batch_class_labels.reshape(-1).to(device)
            batch_pose_labels = batch_pose_labels.reshape(-1).to(device)

            # forward pass
            out_class_pred, out_pose_pred = network(batch_data)

            class_pred.append(out_class_pred)
            class_true.append(batch_class_labels)
            pose_pred.append(out_pose_pred)
            pose_true.append(batch_pose_labels)

        # concatenating to obtain a single tensor per quantity
        class_pred = torch.cat(class_pred, dim=0)
        class_true = torch.cat(class_true, dim=0)
        pose_pred = torch.cat(pose_pred, dim=0)
        pose_true = torch.cat(pose_true, dim=0)

        # calculating losses
        class_loss = loss_fn_class(class_pred, class_true)
        pose_loss = loss_fn_pose(pose_pred, pose_true)
        loss = (class_loss + pose_loss)/2

    return loss, class_loss, pose_loss

#### Training Phase
This function will do a single epoch of the training phase

In [None]:
def training_epoch(dataloader, network, loss_fn_class, loss_fn_pose, optimizer):
    """Train ``network`` for one pass over ``dataloader``.

    For every batch the class and pose losses are averaged, logged to wandb
    and used for one optimizer step. The network is not moved here: it must
    already live on the device that the batches are sent to.

    Args:
        dataloader: Iterable of ``(data, class_labels, pose_labels)`` batches.
        network: Module returning ``(class_logits, pose_logits)``.
        loss_fn_class: Loss applied to the class logits and class labels.
        loss_fn_pose: Loss applied to the pose logits and pose labels.
        optimizer: Optimizer that updates the parameters of ``network``.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # putting network in training mode
    network.train()
    iterator = tqdm(dataloader)

    for batch_data, batch_class_labels, batch_pose_labels in iterator:

        # moving batch to device and formatting it; reshape(-1) keeps the
        # labels 1-D even when the last shuffled batch holds a single sample,
        # where squeeze() would produce a 0-d target that no longer matches
        # the (1, C) logits
        batch_data = batch_data.to(device)
        batch_class_labels = batch_class_labels.reshape(-1).to(device)
        batch_pose_labels = batch_pose_labels.reshape(-1).to(device)

        # forward pass
        class_pred, pose_pred = network(batch_data)

        # calculating losses
        class_loss = loss_fn_class(class_pred, batch_class_labels)
        pose_loss = loss_fn_pose(pose_pred, batch_pose_labels)
        train_loss = (class_loss + pose_loss)/2

        # logging training losses
        wandb.log({"train_loss": train_loss, "pose_train_loss": pose_loss, "class_train_loss": class_loss })

        # backward pass
        optimizer.zero_grad()
        train_loss.backward()
        optimizer.step()

        # viewing batch results
        iterator.set_description(f"Train loss: {train_loss.detach().cpu().numpy()} class loss: {class_loss.detach().cpu().numpy()}  pose loss: {pose_loss.detach().cpu().numpy()}")

#### Training Loop
This function will execute the entire training loop

In [None]:

def training_loop(network, optimizer, train_dl, val_dl, test_dl, epochs, early_stopping):
    """Train ``network`` with per-epoch validation, checkpointing and early stopping.

    After each training epoch the validation losses and accuracies are
    computed, logged to wandb and printed. Whenever the validation loss
    improves, the weights are saved to ``model.pt``. Training stops once
    ``early_stopping`` consecutive epochs bring no improvement, or after
    ``epochs`` epochs.

    Args:
        network: Model to train.
        optimizer: Optimizer bound to the parameters of ``network``.
        train_dl: Dataloader for training.
        val_dl: Dataloader for validation.
        test_dl: Accepted for interface compatibility; not used here.
        epochs: Maximum number of epochs.
        early_stopping: Number of non-improving epochs tolerated before
            stopping.
    """

    def as_numpy(tensor):
        # Detached host copy, used only for printing.
        return tensor.detach().cpu().numpy()

    # One cross-entropy criterion per output head.
    class_criterion = CrossEntropyLoss()
    pose_criterion = CrossEntropyLoss()

    network.to("cuda" if torch.cuda.is_available() else "cpu")

    lowest_val_loss = np.inf
    stale_epochs = 0  # epochs since the last improvement

    for epoch in range(epochs):
        print(f"\nEpoch: {epoch}")

        # TRAINING
        print("TRAINING PHASE")
        training_epoch(train_dl, network, class_criterion, pose_criterion, optimizer)

        # VALIDATION
        print("VALIDATION PHASE")
        val_loss, val_class_loss, val_pose_loss = calculate_loss(network, val_dl)
        val_class_acc, val_pose_acc = calculate_accuracies(network, val_dl)

        validation_metrics = {
            "epoch": epoch,
            "validation_loss": val_loss,
            "pose_validation_loss": val_pose_loss,
            "class_validation_loss": val_class_loss,
            "class_validation_accuracy": val_class_acc,
            "pose_validation_accuracy": val_pose_acc,
            "class_validation_error": 1-val_class_acc,
            "pose_validation_error": 1-val_pose_acc,
        }
        wandb.log(validation_metrics)

        print(
            f"Validation loss: {as_numpy(val_loss)} \n"
            f" class loss: {as_numpy(val_class_loss)} \n"
            f" pose loss: {as_numpy(val_pose_loss)} \n"
            f" class accuracy: {as_numpy(val_class_acc)} \n"
            f" pose accuracy: {as_numpy(val_pose_acc)}"
        )

        # Keep only the checkpoint with the lowest validation loss so far.
        if val_loss < lowest_val_loss:
            print("Saved Model")
            torch.save(network.state_dict(), "model.pt")
            lowest_val_loss = val_loss
            stale_epochs = 0
        else:
            stale_epochs += 1

        # Early stopping clause.
        if stale_epochs >= early_stopping:
            break

#### Initializing Data Logging

In [None]:

# Start a new wandb run for this script: the project the run is logged under,
# plus the hyperparameters / run metadata to record with it.
wandb.init(
    project="Modelnet40_multiple",
    config={"dataset": "ModelNet40", "epochs": 200},
)

# Plot every validation metric against the "epoch" counter.
wandb.define_metric("epoch")
for metric_name in (
    "validation_loss",
    "class_validation_loss",
    "pose_validation_loss",
    "class_validation_accuracy",
    "pose_validation_accuracy",
    "class_validation_error",
    "pose_validation_error",
):
    wandb.define_metric(metric_name, step_metric="epoch")

In [None]:
from torch.optim import SGD, Adam

# selector for the desired experiment (see `experiments` below)
run_selector = 3

# used to make comparable results between runs
# removed searching for best result
#torch.manual_seed(0)

# Obtaining initialization parameters.
# Pose labels start at zero, so the number of poses is the largest label + 1.
# Taking the maximum over the stored labels does not depend on the test set
# being ordered and avoids running the transform on a sample just to read its
# label; int() makes sure a plain Python int is used as the layer size.
num_classes = int(dataset)
num_poses = int(np.max(test_dataset.label_pose)) + 1

# Selecting stopping criterion
max_epochs = 200
early_stopping = 10

# run_selector -> (model class, optimizer factory taking the model parameters)
experiments = {
    0: (ORION4_V1, lambda params: SGD(params, lr=1e-3, momentum=0.90, nesterov=True)),
    1: (ORION4_V1, lambda params: Adam(params, lr=1e-5)),
    2: (ORION4_V2, lambda params: SGD(params, lr=1e-3, momentum=0.90, nesterov=True)),
    3: (ORION4_V2, lambda params: Adam(params, lr=1e-5)),
}

if run_selector in experiments:
    model_class, make_optimizer = experiments[run_selector]
    model = model_class(num_classes, num_poses)
    opt = make_optimizer(model.parameters())
    training_loop(model, opt, train_dataloader, validation_dataloader, test_dataloader, max_epochs, early_stopping)
else:
    print("Invalid Selection")

# closing logging at the end of the run
wandb.finish()

## Testing Phase

In [None]:
# Restore the best checkpoint written during training. map_location="cpu"
# lets a checkpoint saved from GPU tensors load on a machine without CUDA;
# load_state_dict then copies the values into the model's own parameters.
model.load_state_dict(torch.load("model.pt", map_location="cpu"))

# Accuracies of the restored model on the validation split.
val_class_accuracy, val_pose_accuracy = calculate_accuracies(model, validation_dataloader)

print("Validation Class Accuracy : "+str(val_class_accuracy))
print("Validation Pose Accuracy : "+str(val_pose_accuracy))

# Accuracies of the restored model on the held-out test split.
test_class_accuracy, test_pose_accuracy = calculate_accuracies(model, test_dataloader)

print("Test Class Accuracy : "+str(test_class_accuracy))
print("Test Pose Accuracy : "+str(test_pose_accuracy))