# RIU-Net for SemanticKITTI

## Setup
- If you are not using Google Drive, replace the paths with the corresponding paths on your machine.

In [1]:
import numpy as np
import time
import torch, os
import cv2
import matplotlib.pyplot as plt
from PIL import Image as PILImage
from IPython.display import Image, display
from google.colab.patches import cv2_imshow
from torch.utils.data import Dataset
from torch.nn import ReLU
from torch.nn import Sequential
from torch.nn import BatchNorm2d
from torch.nn import ConvTranspose2d
from torch.nn import Conv2d
from torch.nn import MaxPool2d
from torch.nn import Module
from torch.nn import ModuleList
from torchvision.transforms import CenterCrop
from torch.nn import functional as F
from tqdm import tqdm
from imutils import paths
from torch.optim import Adam
import torch.nn.functional as F
from torchvision import transforms as tfms
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split

In [None]:
# Colab only: mount Google Drive at /content/drive/ so files stored there can
# be reached through the local filesystem.
# force_remount=True presumably re-mounts even when a mount already exists
# — confirm against the google.colab documentation.
from google.colab import drive
drive.mount('/content/drive/', force_remount=True)

If you're using nbdev, by default Jupyter Lab initializes within the nbs folder, so it's a good practice to return to the root directory and save the path to avoid issues with paths.

In [2]:
# os.chdir("..")  # uncomment this line if you're using nbdev (Jupyter Lab starts inside the nbs folder)
# Remember the working directory as the project root; later cells build
# their paths from it.
ROOT_PATH = os.getcwd()
ROOT_PATH

'/home/lucas/AIR/RIU-Net_PCSS'

To work with the AIR libraries, it's necessary to clone the GitHub repositories, navigate to the directory, perform the installation and import, and then return to the root directory.

In [None]:
libraries_path = ROOT_PATH+"/libraries"

if not os.path.exists(libraries_path):
    os.makedirs(libraries_path)

os.chdir(libraries_path)

#--depth 1 flag load only the last commit since the repositories are still under development.
!git clone --depth 1 https://github.com/AIR-UFG/Cloud2DImageConverter.git
!git clone --depth 1 https://github.com/AIR-UFG/SemanticKITTI_Tools.git

os.chdir(libraries_path+"/Cloud2DImageConverter")
!pip install -e '.[dev]'
from Cloud2DImageConverter import api

os.chdir(libraries_path+"/SemanticKITTI_Tools")
!pip install -e '.[dev]'
from SemanticKITTI_Tools import data

os.chdir(ROOT_PATH)

In [None]:
# Merge the images of each split into a "merged_imgs" folder next to them.
for _split_dir in ("/img_train/", "/img_test/"):
    _origin = ROOT_PATH + _split_dir
    api.merge_images(origin_path=_origin, destiny_path=_origin + "merged_imgs")

In [4]:
# Per-split root folders; each one holds the merged images and the masks.
_train_root = ROOT_PATH + "/img_train/"
_test_root = ROOT_PATH + "/img_test/"

imgs_train_path, masks_train_path = (
    _train_root + "merged_imgs/",
    _train_root + "segmentation_mask/",
)
imgs_test_path, masks_test_path = (
    _test_root + "merged_imgs/",
    _test_root + "segmentation_mask/",
)

## Dataset validation and tests

In [None]:
# Pick one training sample; the image and its mask share the same file name.
img_number = "/000001.png"
img_path, mask_path = (
    folder + img_number for folder in (imgs_train_path, masks_train_path)
)

for _sample_path in (img_path, mask_path):
    print(_sample_path)

# Show the input image inline.
sample_image = Image(filename=img_path)
display(sample_image)

In [None]:
# Show the segmentation mask of the selected sample inline.
sample_mask = Image(filename=mask_path)
display(sample_mask)

In [None]:
# Open the sample image with PIL and report its size in pixels.
img = PILImage.open(img_path)
width = img.width
height = img.height
(width, height)

In [None]:
# List the distinct pixel values stored in the sample mask.
img2 = PILImage.open(mask_path)
_mask_values = np.array(img2)
np.unique(_mask_values)


In [None]:
# Check that every training image has a mask with the same file name, and
# that every mask has an image.
files1 = sorted(os.listdir(imgs_train_path), reverse=True)
files2 = sorted(os.listdir(masks_train_path), reverse=True)

# Compare by name rather than by position: a positional zip() stops at the
# shorter listing and reports every file after the first gap as a mismatch.
image_names = set(files1)
mask_names = set(files2)

for name in sorted(image_names - mask_names):
    print(f'Image {name} does not have a corresponding mask')
for name in sorted(mask_names - image_names):
    print(f'Mask {name} does not have a corresponding image')

# Number of correctly paired files.
count = len(image_names & mask_names)
if image_names == mask_names:
    print(f'Everything good to go, with {count} images')

## Configs
- Tensor shape output at `SegmentationDataset` must be (B, C, H, W):
  - B: BATCH_SIZE
  - C: Image channels (2 for image_tensor and 1 or none for mask_tensor)
  - H: Image height (64 for both)
  - W: Image width (1024 for both)
- Make sure `mask_tensor` values are not normalized in order to apply the binary mask later on

In [5]:
# Presumably the share of data held out for validation; it is not referenced
# by the code visible in this notebook.
VAL_SPLIT = 0.25

# Device used for training and evaluation: CUDA when available, then MPS,
# otherwise the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
elif torch.backends.mps.is_available():
    DEVICE = "mps"
else:
    DEVICE = "cpu"

# Pinned host memory is only requested when the loaders feed a CUDA device.
PIN_MEMORY = DEVICE == "cuda"

print(f"Using {DEVICE} device")

Using cuda device


In [26]:
# HYPERPARAMETERS
INIT_LR = 1e-3      # learning rate handed to the Adam optimizer
NUM_EPOCHS = 10     # passes over the training loader
BATCH_SIZE = 8      # samples per batch for both data loaders

# Input size; not referenced by the code visible in this notebook.
INPUT_IMAGE_HEIGHT, INPUT_IMAGE_WIDTH = 64, 1024
# Not referenced by the code visible in this notebook.
THRESHOLD = 0.5

# Output files: serialized model, loss plot, list of test image paths.
MODEL_PATH = "unet_tgs_salt.pth"
PLOT_PATH = "plot.png"
TEST_PATHS = "test_paths.txt"

## Model


In [7]:
class Block(Module):
    """Two 3x3 convolution stages, each followed by batch norm and ReLU.

    Args:
        inChannels: channel count of the incoming feature map.
        outChannels: channel count produced by both convolution stages.

    The 3x3 kernels use stride 1 and padding 1, so the spatial size of the
    input is preserved.
    """

    def __init__(self, inChannels, outChannels):
        super().__init__()
        # Stages are built in order (conv1, then conv2) so parameter
        # initialisation consumes the RNG in the same sequence as before.
        self.conv1 = self._conv_bn_relu(inChannels, outChannels)
        self.conv2 = self._conv_bn_relu(outChannels, outChannels)

    @staticmethod
    def _conv_bn_relu(in_ch, out_ch):
        """Build one Conv2d -> BatchNorm2d -> ReLU stage."""
        # NOTE(review): in PyTorch `momentum` weights the *new* batch
        # statistic, so 0.99 makes the running stats track the latest batch
        # almost entirely — confirm this is intended (a Keras-style 0.99
        # corresponds to 0.01 here).
        return Sequential(
            Conv2d(in_ch, out_ch, kernel_size=3, stride=1, padding=1),
            BatchNorm2d(out_ch, eps=1e-05, momentum=0.99),
            ReLU(),
        )

    def forward(self, x):
        """Apply both stages in sequence and return the result."""
        return self.conv2(self.conv1(x))

In [8]:
class Encoder(Module):
    """Contracting path: a chain of `Block`s, each followed by 2x2 max-pooling.

    Args:
        channels: channel sizes; consecutive pairs give the input and output
            channels of each block.
    """

    def __init__(self, channels=(2, 64, 128, 256, 512, 1024)):
        super().__init__()
        self.encBlocks = ModuleList(
            [Block(c_in, c_out) for c_in, c_out in zip(channels[:-1], channels[1:])]
        )
        self.pool = MaxPool2d(2)

    def forward(self, x):
        """Return the output of every block, in order from first to last.

        The outputs are collected before pooling; the pooled tensor is what
        the next block receives.
        """
        features = []
        current = x
        for stage in self.encBlocks:
            current = stage(current)
            features.append(current)
            current = self.pool(current)
        return features

In [9]:
class Decoder(Module):
    """Expanding path: transposed-conv upsampling followed by a `Block`.

    Args:
        channels: channel sizes; consecutive pairs give the input and output
            channels of each upsampling step and of each decoder block.
    """

    def __init__(self, channels=(1024, 512, 256, 128, 64)):
        super().__init__()
        self.channels = channels
        stage_io = list(zip(channels[:-1], channels[1:]))

        # All upsampling layers are created before the decoder blocks, which
        # keeps the parameter-initialisation order unchanged.
        self.upconvs = ModuleList(
            [ConvTranspose2d(c_in, c_out, 2, 2) for c_in, c_out in stage_io]
        )
        self.dec_blocks = ModuleList(
            [Block(c_in, c_out) for c_in, c_out in stage_io]
        )

    def forward(self, x, encFeatures):
        """Decode `x`, merging in one encoder feature map per stage.

        Args:
            x: tensor fed to the first upsampling layer.
            encFeatures: sequence of encoder feature maps, indexed by stage;
                entry `i` is concatenated at decoder stage `i`.

        Returns:
            The output of the last decoder block.
        """
        for stage, (upsample, refine) in enumerate(zip(self.upconvs, self.dec_blocks)):
            x = upsample(x)
            # Crop the skip features to the upsampled size, then concatenate
            # along the channel dimension before refining.
            skip = self.crop(encFeatures[stage], x)
            x = refine(torch.cat([x, skip], dim=1))
        return x

    def crop(self, encFeatures, x):
        """Center-crop `encFeatures` to the height and width of `x`."""
        _, _, height, width = x.shape
        return CenterCrop([height, width])(encFeatures)

In [10]:
class UNet(Module):
    """Encoder-decoder network with a 1x1 convolution head.

    Args:
        encChannels: channel sizes passed to `Encoder`.
        decChannels: channel sizes passed to `Decoder`; the last entry is the
            number of input channels of the head.
        nbClasses: number of output channels of the head.
    """

    def __init__(self, encChannels=(2, 64, 128, 256, 512, 1024), decChannels=(1024, 512, 256, 128, 64), nbClasses=20):
        super().__init__()

        self.encoder = Encoder(encChannels)
        self.decoder = Decoder(decChannels)

        # Head: 1x1 convolution giving one output channel per class.
        self.head = Conv2d(decChannels[-1], nbClasses, 1)

    def forward(self, x):
        """Return the head output computed from the decoded features."""
        encFeatures = self.encoder(x)

        # The last encoder output is the decoder input; the remaining
        # outputs, from last to first, are the skip connections.
        deepest = encFeatures[-1]
        skips = encFeatures[-2::-1]
        decFeatures = self.decoder(deepest, skips)

        # Segmentation map.
        return self.head(decFeatures)

## Training

- Image and mask paths for training
- Dataset, DataLoader, Model, Opt and Loss instances
- A binary mask is needed so that the loss computed on empty pixels (label 0) is discarded and does not bias the model.


In [11]:
# Store the test image paths used on eval/test, one path per line.
print("[INFO] saving testing image paths...")

# imgs_test_path is a directory string: joining it directly would write one
# character per line, so list the files it contains instead.
test_image_paths = sorted(
    os.path.join(imgs_test_path, name)
    for name in os.listdir(imgs_test_path)
    if os.path.isfile(os.path.join(imgs_test_path, name))
)

# The context manager closes the file even if the write fails.
with open(TEST_PATHS, "w") as f:
    f.write("\n".join(test_image_paths))

print("[INFO] testing image paths saved")

[INFO] saving testing image paths...
[INFO] testing image paths saved


In [12]:
# Preprocessing applied to every image: tensor conversion, then per-channel
# normalisation with the given means and standard deviations (two channels).
transforms = tfms.Compose([
    tfms.ToTensor(),
    tfms.Normalize([0.2, 0.17], [0.16, 0.2]),
])

# Datasets
trainDS = data.SemanticDataset(
    image_path=imgs_train_path, mask_path=masks_train_path, transform=transforms
)
testDS = data.SemanticDataset(
    image_path=imgs_test_path, mask_path=masks_test_path, transform=transforms
)

print(f"[INFO] found {len(trainDS)} examples in the training set...")
print(f"[INFO] found {len(testDS)} examples in the test set...")

# DataLoaders: identical settings, except that only the training set is shuffled.
_loader_options = dict(
    batch_size=BATCH_SIZE,
    pin_memory=PIN_MEMORY,
    num_workers=os.cpu_count(),
)
trainLoader = DataLoader(trainDS, shuffle=True, **_loader_options)
testLoader = DataLoader(testDS, shuffle=False, **_loader_options)

[INFO] found 4 examples in the training set...
[INFO] found 4 examples in the test set...


In [13]:
# Checks Dataloader shape
train_features, train_labels = next(iter(trainLoader))
test_features, test_labels = next(iter(testLoader))

print(f"Train feature batch shape: {train_features.size()}")
print(f"Train labels batch shape: {train_labels.size()}")
print(f"Test feature batch shape: {test_features.size()}")
print(f"Test labels batch shape: {test_labels.size()}")

Train feature batch shape: torch.Size([4, 2, 64, 1024])
Train labels batch shape: torch.Size([4, 64, 1024])
Test feature batch shape: torch.Size([4, 2, 64, 1024])
Test labels batch shape: torch.Size([4, 64, 1024])


In [24]:
# Model on the selected device.
unet = UNet().to(DEVICE)

# reduction='none' returns the loss of every pixel, so it can be masked later.
lossFunc = CrossEntropyLoss(reduction='none')

# SGD optimizer can also be used instead of Adam
opt = Adam(unet.parameters(), lr=INIT_LR)

# Batches per epoch. len(loader) includes the final partial batch, whereas
# len(dataset) // BATCH_SIZE drops it and is 0 when the dataset is smaller
# than one batch, which breaks the per-epoch average.
trainSteps = len(trainLoader)
testSteps = len(testLoader)

# Training history
H = {"train_loss": [], "test_loss": []}

In [None]:
# TRAINING
print("[INFO] training the network...")
startTime = time.time()

for e in tqdm(range(NUM_EPOCHS)):
    unet.train()

    # Running sums are plain Python floats (via .item()), so no autograd
    # graph is kept alive across iterations.
    totalTrainLoss = 0.0
    totalTestLoss = 0.0
    # Batches are counted while iterating so the average is always taken
    # over what was actually processed, partial last batch included.
    trainBatches = 0
    testBatches = 0

    for (x, y) in trainLoader:
        (x, y) = (x.to(DEVICE), y.to(DEVICE))

        # 1 for labelled pixels, 0 for empty pixels (label 0).
        bin_mask_train = (y != 0).int()
        pred = unet(x)
        loss_train = lossFunc(pred, y)

        # NOTE(review): the mean runs over all pixels, masked ones included,
        # so the loss scale follows the share of labelled pixels in the
        # batch. Dividing by bin_mask_train.sum() would give a true masked
        # mean; left as is to keep the loss scale unchanged.
        filtered_loss_train = (loss_train * bin_mask_train).mean()

        opt.zero_grad()
        filtered_loss_train.backward()
        opt.step()

        totalTrainLoss += filtered_loss_train.item()
        trainBatches += 1

    # Evaluation on the test loader, without gradient tracking.
    unet.eval()
    with torch.no_grad():
        for (x, y) in testLoader:
            (x, y) = (x.to(DEVICE), y.to(DEVICE))

            bin_mask_test = (y != 0).int()
            pred = unet(x)
            loss_test = lossFunc(pred, y)
            filtered_loss_test = (loss_test * bin_mask_test).mean()

            totalTestLoss += filtered_loss_test.item()
            testBatches += 1

    # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
    avgTrainLoss = totalTrainLoss / max(trainBatches, 1)
    avgTestLoss = totalTestLoss / max(testBatches, 1)

    # Training history
    H["train_loss"].append(avgTrainLoss)
    H["test_loss"].append(avgTestLoss)

    # Model training validation info
    print("[INFO] EPOCH: {}/{}".format(e + 1, NUM_EPOCHS))
    print("Train loss: {:.10f}, Test loss: {:.4f}".format(avgTrainLoss, avgTestLoss))

# Total training time
endTime = time.time()
print("[INFO] total time taken to train the model: {:.2f}s".format(endTime - startTime))

In [None]:
# Training loss plot

# Plot the per-epoch train and test loss and write the figure to PLOT_PATH.
plt.style.use("ggplot")
plt.figure()
for _series in ("train_loss", "test_loss"):
    plt.plot(H[_series], label=_series)
plt.title("Training Loss on Dataset")
plt.xlabel("Epoch #")
plt.ylabel("Loss")
plt.legend(loc="lower left")
plt.savefig(PLOT_PATH)

# Serialize the whole model object to disk.
torch.save(unet, MODEL_PATH)

## Prediction
- Clone Cloud2DImageConverter repository for visualization.
- Argmax is used for prediction.
- Top image is ground truth.
- Bottom image is the model prediction.

In [None]:
def make_predictions(model, imagePath, transform=None):
    """Run the model on one image and display ground truth and prediction.

    The ground-truth mask is read from ``masks_test_path`` using the file
    name of ``imagePath``. Both the mask and the argmax of the model output
    are colorized with ``api.color_matrix`` and displayed, ground truth first.

    Args:
        model: network called as ``model(batch)`` with a single-image batch.
        imagePath: path to the input image.
        transform: optional callable applied to the opened PIL image in place
            of the raw float conversion. Assumed to return a (C, H, W)
            tensor — TODO confirm against the training dataset.

    NOTE(review): the training data goes through ``transforms``
    (ToTensor + Normalize) while the default path here feeds raw pixel
    values; pass the training transform to get matching preprocessing.
    """
    model.eval()

    # Ground truth path: same file name, inside the test mask folder.
    filename = os.path.basename(imagePath)
    groundTruthPath = os.path.join(masks_test_path, filename)

    with torch.no_grad():
        # Context managers close the image files once the pixels are read.
        with PILImage.open(imagePath) as pilImage:
            if transform is None:
                # Channel axis first, then wrap as a pytorch tensor.
                image = np.array(pilImage).astype(np.float32)
                image = np.transpose(image, (2, 0, 1))
                image = torch.from_numpy(image)
            else:
                image = transform(pilImage)

        with PILImage.open(groundTruthPath) as pilMask:
            gtMask = np.array(pilMask)

        # Add the batch dimension and move the tensor to the current device.
        image = image.unsqueeze(0).to(DEVICE)

        # Prediction: drop only the batch dimension, shape must be (20,64,1024).
        predMask = model(image).squeeze(0)
        argmax = torch.argmax(predMask, dim=0)

        # Cloud2DImageConverter api to convert index to the corresponding color
        prediction = api.color_matrix(argmax.cpu().numpy())
        prediction = PILImage.fromarray(prediction)

        gtMask = api.color_matrix(gtMask)
        gtMask = PILImage.fromarray(gtMask)

        # Visualization: ground truth on top, prediction below.
        display(gtMask)
        display(prediction)


In [None]:
# Sample selection

print("[INFO] loading up test image paths...")
# Read the list from TEST_PATHS, the file the training section wrote,
# instead of a hard-coded /content/ location.
with open(TEST_PATHS) as paths_file:
    imagePaths = [line for line in paths_file.read().strip().split("\n") if line]

# Draw up to 10 distinct samples; replace=False avoids showing the same
# image twice, and the guard skips sampling when the list is empty.
if imagePaths:
    imagePaths = np.random.choice(
        imagePaths, size=min(10, len(imagePaths)), replace=False
    )

print("[INFO] load up model...")
# MODEL_PATH is where the training section saved the model; map_location
# lets a model saved on another device load on the current one.
# NOTE(review): recent PyTorch releases default torch.load to
# weights_only=True, which rejects a fully pickled model — pass
# weights_only=False if your version requires it.
unet = torch.load(MODEL_PATH, map_location=DEVICE).to(DEVICE)

for path in imagePaths:
    make_predictions(unet, path)