# Test Existing Network

In [1]:
import math
import sys
import time

import matplotlib.pyplot as plt
import numpy as np
import open3d as o3d
from scipy import spatial

import torch
from torch.utils.tensorboard import SummaryWriter
from torchsummary import summary
from tqdm.auto import tqdm

from data_splitter import DataSplitter
from training_set import TrainingSetLidarSeg
from loss import *
# from model_fcn import ModelFCN
# from model_simple_for_testing import ModelSimpleForTesting
from model_unet import ModelUnet
from sphere import Sphere
from visualize import Visualize
from metrics import *
from average_meter import AverageMeter
    
%matplotlib inline
%load_ext autoreload
%autoreload 2

In [2]:
print(f"Initializing CUDA...")
torch.cuda.set_device(0)
torch.backends.cudnn.benchmark = True

print(f"Setting parameters...")
bandwidth = 100
learning_rate = 4.5e-3
n_epochs = 1
batch_size = 5
num_workers = 32
n_classes = 9

print(f"Initializing data structures...")
# net = ModelSimpleForTesting(bandwidth=bandwidth, n_classes=n_classes).cuda()
net = ModelUnet(bandwidth=bandwidth, n_classes=n_classes).cuda()

optimizer = torch.optim.SGD(net.parameters(), lr=learning_rate, momentum=0.9)

# criterion = L2Loss(alpha=0.5, margin=0.2)
# criterion = CrossEntropyLoss(n_classes=n_classes)
criterion = NegativeLogLikelihoodLoss(n_classes=n_classes)

writer = SummaryWriter()
stored_model = 'test_training_params.pkl'
net.load_state_dict(torch.load(stored_model))

print(f"All instances initialized.")

Initializing CUDA...
Setting parameters...
Initializing data structures...
All instances initialized.


In [3]:
# export_ds = '/mnt/data/datasets/nuscenes/processed'
export_ds = '/media/scratch/berlukas/nuscenes'
img_filename = f"{export_ds}/images.npy"
cloud_filename = f"{export_ds}/clouds1.npy"
sem_clouds_filename = f"{export_ds}/sem_classes_gt1.npy"
dec_clouds = f"{export_ds}/decoded.npy"
dec_gt = f"{export_ds}/decoded_gt.npy"

print(f"Loading from images from {img_filename}, clouds from {cloud_filename} and sem clouds from {sem_clouds_filename}")
# img_features = np.load(img_filename)
cloud_features = np.load(cloud_filename)
sem_cloud_features = np.load(sem_clouds_filename)
print(f"Shape of clouds is {cloud_features.shape} and sem clouds is {sem_cloud_features.shape}")

Loading from images from /media/scratch/berlukas/nuscenes/images.npy, clouds from /media/scratch/berlukas/nuscenes/clouds1.npy and sem clouds from /media/scratch/berlukas/nuscenes/sem_classes_gt1.npy
Shape of clouds is (11230, 2, 200, 200) and sem clouds is (11230, 200, 200)


In [5]:
n_process = 400
# img_features = img_features[0:n_process, :, :, :]
cloud_features = cloud_features[0:n_process, :, :, :]
sem_cloud_features = sem_cloud_features[0:n_process, :, :]
print(f"Shape of clouds is {cloud_features.shape} and sem clouds is {sem_cloud_features.shape}")

Shape of clouds is (400, 2, 200, 200) and sem clouds is (400, 200, 200)


In [6]:
# Initialize the data loaders
train_set = TrainingSetLidarSeg(bandwidth, cloud_features, sem_cloud_features)
print(f"Total size of the training set: {len(train_set)}")
split = DataSplitter(train_set, False, test_train_split=0.9, shuffle=True)

# Split the data into train, val and optionally test
train_loader, val_loader, test_loader = split.get_split(
    batch_size=batch_size, num_workers=num_workers)
train_size = split.get_train_size()
val_size = split.get_val_size()
test_size = split.get_test_size()


print("Training size: ", train_size)
print("Validation size: ", val_size)
if test_size == 0:
    print('Test size is 0. Configured for external tests')
else:
    print("Testing size: ", test_size)

Total size of the training set: 400
Training size:  324
Validation size:  36
Testing size:  40


In [7]:
def test_lidarseg(net, criterion, writer):
    all_decoded_clouds = [None] * test_size
    all_gt_clouds = [None] * test_size
    k = 0
    avg_pixel_acc = AverageMeter()
    avg_pixel_acc_per_class = AverageMeter()
    avg_jacc = AverageMeter()
    avg_dice = AverageMeter()
    n_iter = 0
    net.eval()
    with torch.no_grad():            
        for batch_idx, (cloud, lidarseg_gt) in enumerate(test_loader):
            cloud, lidarseg_gt = cloud.cuda().float(), lidarseg_gt.cuda().long()
            enc_dec_cloud = net(cloud)
            
            pred_segmentation = torch.argmax(enc_dec_cloud, dim=1)
            pixel_acc, pixel_acc_per_class, jacc, dice = eval_metrics(lidarseg_gt, pred_segmentation, num_classes = n_classes)
            avg_pixel_acc.update(pixel_acc)
            avg_pixel_acc_per_class.update(pixel_acc_per_class)
            avg_jacc.update(jacc)
            avg_dice.update(dice)
            
            writer.add_scalar('Test/PixelAccuracy', pixel_acc, n_iter)   
            writer.add_scalar('Test/PixelAccuracyPerClass', pixel_acc_per_class, n_iter)   
            writer.add_scalar('Test/JaccardIndex', jacc, n_iter)
            writer.add_scalar('Test/DiceCoefficient', dice, n_iter)  
            
            n_batch = enc_dec_cloud.shape[0]
            for i in range(0, n_batch):                
                all_decoded_clouds[k] = enc_dec_cloud.cpu().data.numpy()[i,:,:,:]
                all_gt_clouds[k] = lidarseg_gt.cpu().data.numpy()[i,:,:]
                k = k + 1     
            n_iter += 1
            
        writer.add_scalar('Test/AvgPixelAccuracy', avg_pixel_acc.avg, n_iter)   
        writer.add_scalar('Test/AvgPixelAccuracyPerClass', avg_pixel_acc_per_class.avg, n_iter)   
        writer.add_scalar('Test/AvgJaccardIndex', avg_jacc.avg, n_iter)
        writer.add_scalar('Test/AvgDiceCoefficient', avg_dice.avg, n_iter)  

    return all_decoded_clouds, all_gt_clouds

In [8]:
print("Starting testing...")

torch.cuda.empty_cache()
decoded_clouds, gt_clouds = test_lidarseg(net, criterion, writer)

np.save(dec_gt, gt_clouds)
np.save(dec_clouds, decoded_clouds)

writer.close()
print("Testing finished!")

Starting testing...
Testing finished!
