In [None]:
#| eval: false
import torch
import numpy as np
import os, time
from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.nn import CrossEntropyLoss
from tqdm import tqdm
import torch.nn as nn
from torch.optim import Adam
from MVLidarImplementation import model
from pathlib import Path
import matplotlib.pyplot as plt

By default, Jupyter Lab starts in the nbs folder, so it is good practice to move to the repository root and store its path; building every other path from it avoids path-related issues later on.

In [None]:
#| eval: false
# Jupyter starts inside nbs/, so step up to the repository root and remember it.
# The guard keeps the cell idempotent: re-running it must not climb one more
# directory each time, so on a re-run we simply go back to the recorded root.
if "ROOT_PATH" not in globals():
    os.chdir("..")
    ROOT_PATH = os.getcwd()
else:
    os.chdir(ROOT_PATH)
ROOT_PATH

To work with the AIR libraries, it's necessary to clone the GitHub repositories, navigate to the directory, perform the installation and import, and then return to the root directory.

In [None]:
#| eval: false
libraries_path = ROOT_PATH+"/libraries"

if not os.path.exists(libraries_path):
    os.makedirs(libraries_path)

os.chdir(libraries_path)

#--depth 1 flag load only the last commit since the repositories are still under development.
!git clone --depth 1 https://github.com/AIR-UFG/Cloud2DImageConverter.git
!git clone --depth 1 https://github.com/AIR-UFG/SemanticKITTI_Tools.git

os.chdir(libraries_path+"/Cloud2DImageConverter")
!pip install -e '.[dev]'
from Cloud2DImageConverter import api

os.chdir(libraries_path+"/SemanticKITTI_Tools")
!pip install -e '.[dev]'
from SemanticKITTI_Tools import data

os.chdir(ROOT_PATH)

Set config variables for the training

In [None]:
#| eval: false
# Optimisation hyper-parameters
INIT_LR = 1e-4       # learning rate handed to the optimizer
NUM_EPOCHS = 40      # number of passes of the training loop
BATCH_SIZE = 4       # samples per batch in the data loaders

# Number of output classes given to the model
N_CLASSES = 7

# File names (TEST_PATHS is not referenced by the cells visible here)
MODEL_PATH = "mvlidar.pth"
PLOT_PATH = "plot.png"
TEST_PATHS = "test_paths.txt"

Determine the device to be used for training and evaluation

In [None]:
#| eval: false
# Pick the compute backend in order of preference: CUDA, then MPS, then CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
elif torch.backends.mps.is_available():
    DEVICE = "mps"
else:
    DEVICE = "cpu"
print(f"Using {DEVICE} device")
# memory is pinned during data loading only when running on CUDA
PIN_MEMORY = DEVICE == "cuda"

Get the projected dataset and merge the depth and reflectance images into a single 2-channel image

In [None]:
#| eval: false
# insert the path to your dataset
train_path = ROOT_PATH+"/train/"
test_path = ROOT_PATH+"/test/"

train_merged_path = ROOT_PATH+"/train-merged/"
test_merged_path = ROOT_PATH+"/test-merged/"

data_paths = [(train_path, train_merged_path), (test_path, test_merged_path)]

for data_path, merged_path in data_paths:
    os.makedirs(merged_path, exist_ok=True)
    api.merge_images(data_path, merged_path)

The MVLidar article uses only 7 classes for detection, so the class IDs need to be remapped as follows:

0 - unknown/outlier: every other class

1 - car: 1

2 - truck: 4

3 - person/pedestrians: 6

4 - cyclist: 7, 2

5 - road: 9

6 - sidewalk: 11

In [None]:
#| eval: false
train_segmentation_mask = f"{ROOT_PATH}/train_segmentation_mask/"
test_segmentation_mask = f"{ROOT_PATH}/test_segmentation_mask/"

# (source folder, output mask folder) per split, train first.
masks_paths = [
    (train_path, train_segmentation_mask),
    (test_path, test_segmentation_mask),
]

# original class id -> remapped class id (1..6).
# NOTE(review): ids missing from this table are expected to end up as 0
# (unknown/outlier); that is up to remap_segmentation_masks - confirm there.
remapping_rules = {
    1: 1,    # car
    4: 2,    # truck
    6: 3,    # person / pedestrian
    7: 4,    # cyclist
    2: 4,    # cyclist
    9: 5,    # road
    11: 6,   # sidewalk
}

# Create each output folder if needed, then write the remapped masks into it.
for data_path, mask_path in masks_paths:
    os.makedirs(mask_path, exist_ok=True)
    data.remap_segmentation_masks(data_path, mask_path, remapping_rules=remapping_rules)

Creates the train and test datasets

In [None]:
#| eval: false
# The only transform in the pipeline is the conversion to a tensor.
transform = transforms.Compose([transforms.ToTensor()])

# Build both splits the same way (train first, then test); only the folders differ.
train_dataset, test_dataset = (
    data.SemanticDataset(image_path=images_dir,
                         mask_path=masks_dir,
                         transform=transform)
    for images_dir, masks_dir in (
        (train_merged_path, train_segmentation_mask),
        (test_merged_path, test_segmentation_mask),
    )
)

Creates the train and test loaders

In [None]:
#| eval: false
# os.cpu_count() returns None when the CPU count cannot be determined; fall back
# to 0 workers (loading in the main process) instead of passing None to DataLoader.
NUM_WORKERS = os.cpu_count() or 0

# Training batches are reshuffled every epoch.
trainLoader = DataLoader(train_dataset, shuffle=True,
    batch_size=BATCH_SIZE, pin_memory=PIN_MEMORY,
    num_workers=NUM_WORKERS)

# Evaluation keeps the dataset order.
testLoader = DataLoader(test_dataset, shuffle=False,
    batch_size=BATCH_SIZE, pin_memory=PIN_MEMORY,
    num_workers=NUM_WORKERS)

The model is imported from the 01_model notebook, CrossEntropy is used as loss and Adam as the optimizer

In [None]:
#| eval: false
mvlidar = model.MVLidar(N_CLASSES).to(DEVICE)

# when the reduction parameter is set to none, no aggregation is applied and a
# separate loss value is returned for each element, which lets us mask it afterwards
lossFunc = CrossEntropyLoss(reduction='none')
opt = Adam(mvlidar.parameters(), lr=INIT_LR)

# Number of batches per epoch, used to average the accumulated loss.
# len(loader) also counts the final partial batch, whereas
# len(dataset) // BATCH_SIZE drops it (and is 0 for datasets smaller than a batch).
trainSteps = len(trainLoader)
testSteps = len(testLoader)

# Per-epoch loss history, filled by the training loop and used for plotting.
H = {"train_loss": [], "test_loss": []}

A binary mask is applied to the loss in order to ensure that the model only focuses on the labeled regions, and not on the black parts of the images

In [None]:
#| eval: false
def apply_loss_binary_mask(pred, y):
  """Return the scalar loss of `pred` against `y`, ignoring positions labelled 0.

  The module-level `lossFunc` (created with reduction='none') yields one loss
  value per element; values where `y == 0` are zeroed before averaging.
  Note that the mean is taken over all elements, including the zeroed ones.
  """
  labeled = (y != 0).int()          # 1 where a non-zero label exists, 0 elsewhere
  elementwise = lossFunc(pred, y)
  return (elementwise * labeled).mean()

## Training
Run the training loop

In [None]:
#| eval: false
print("[INFO] training the network...")
startTime = time.time()

for e in tqdm(range(NUM_EPOCHS)):

    mvlidar.train()

    # Accumulate plain Python floats: summing the loss tensors themselves would
    # keep autograd history alive across iterations and waste memory.
    totalTrainLoss = 0.0
    totalTestLoss = 0.0

    # ---- training pass ----
    for (x, y) in trainLoader:

        (x, y) = (x.to(DEVICE), y.to(DEVICE))

        pred = mvlidar(x)
        loss = apply_loss_binary_mask(pred, y)

        opt.zero_grad()
        loss.backward()
        opt.step()

        totalTrainLoss += loss.item()

    # ---- evaluation pass (no gradients, eval-mode layers) ----
    mvlidar.eval()
    with torch.no_grad():

        for (x, y) in testLoader:
            (x, y) = (x.to(DEVICE), y.to(DEVICE))

            pred = mvlidar(x)
            loss = apply_loss_binary_mask(pred, y)
            totalTestLoss += loss.item()

    # max(..., 1) avoids a ZeroDivisionError should a step count be 0.
    avgTrainLoss = totalTrainLoss / max(trainSteps, 1)
    avgTestLoss = totalTestLoss / max(testSteps, 1)

    # The history stores plain floats, which matplotlib plots directly.
    H["train_loss"].append(avgTrainLoss)
    H["test_loss"].append(avgTestLoss)
    print("[INFO] EPOCH: {}/{}".format(e + 1, NUM_EPOCHS))
    print("Train loss: {:.6f}, Test loss: {:.4f}".format(
        avgTrainLoss, avgTestLoss))

endTime = time.time()
print("[INFO] total time taken to train the model: {:.2f}s".format(
    endTime - startTime))

Plot the training and test loss curves, save the figure, and save the trained model

In [None]:
#| eval: false
plt.style.use("ggplot")
fig, ax = plt.subplots()
# One curve per history entry; the dictionary key doubles as the legend label.
for curve in ("train_loss", "test_loss"):
    ax.plot(H[curve], label=curve)
ax.set_title("Training Loss on Dataset")
ax.set_xlabel("Epoch #")
ax.set_ylabel("Loss")
ax.legend(loc="lower left")
fig.savefig(PLOT_PATH)

# write the whole model object to disk
torch.save(mvlidar, MODEL_PATH)

## Predictions
Load the saved model and make the predictions using argmax

In [None]:
#| eval: false
def make_predictions(model, imagePath, masksPath, transform):
    """Predict the segmentation of one image and display it next to its ground truth.

    Parameters
    ----------
    model : the network to evaluate; it is switched to eval mode and called on
        a batch containing the single image.
    imagePath : str
        Path of the input image file.
    masksPath : str
        Directory containing the ground-truth mask, stored under the same file
        name as the image.
    transform : unused; kept so existing callers keep working.

    Returns
    -------
    None. The colorized ground-truth mask and the colorized prediction are
    shown with `display`, in that order.
    """
    model.eval()

    with torch.no_grad():

        image = Image.open(imagePath)
        image = np.array(image).astype(np.float32)
        # NOTE(review): the raw pixel values are fed to the model here, while the
        # datasets are built with transforms.ToTensor() - confirm both paths
        # produce inputs on the same scale.

        # The ground-truth mask shares the image's file name inside masksPath.
        filename = os.path.basename(imagePath)
        groundTruthPath = os.path.join(masksPath, filename)

        gtMask = Image.open(groundTruthPath)
        gtMask = np.array(gtMask)

        # Move the channel axis to the front (assumes a 3-D height x width x
        # channels array - TODO confirm), add a batch dimension, create a
        # PyTorch tensor and move it to the current device.
        image = np.transpose(image, (2, 0, 1))
        image = np.expand_dims(image, 0)
        image = torch.from_numpy(image).to(DEVICE)

        # Prediction: drop the batch dimension, then take the highest-scoring
        # class index along the leading (class) axis.
        predMask = model(image).squeeze()
        argmax = torch.argmax(predMask, dim=0)

        # Cloud2DImageConverter api to convert index to the corresponding color
        prediction = api.color_matrix(np.array(argmax.cpu()))
        prediction = Image.fromarray(prediction)

        gtMask = api.color_matrix(gtMask)
        gtMask = Image.fromarray(gtMask)

        # Visualization
        display(gtMask)
        display(prediction)

Retrieve the test images for prediction

In [None]:
#| eval: false
# os.listdir returns entries in arbitrary order; sorting makes the 10 images
# picked below the same on every run.
imagePaths = sorted(os.listdir(test_merged_path))
imagePaths = imagePaths[:10]
path = os.path.join(test_merged_path, imagePaths[0])
image = Image.open(path)
image_array = np.array(image)

# The checkpoint holds the whole pickled model object (see torch.save above), so
# full unpickling has to be enabled explicitly: recent PyTorch versions default
# to weights_only=True. Unpickling can execute arbitrary code - only load
# checkpoints you trust. map_location lets a checkpoint saved on another device
# be loaded on the current one.
mvlidar = torch.load(MODEL_PATH, map_location=DEVICE, weights_only=False).to(DEVICE)

# Display ground truth and prediction for each selected image.
for i in imagePaths:
    path = os.path.join(test_merged_path, i)
    make_predictions(mvlidar, path, test_segmentation_mask, transform)