# T1 SEMANTIC SEGMENTATION

Import and prepare point clouds for semantic segmentation and do the inference.
To run these scripts, create a Python 3.10 environment and install geomapi (numpy, open3d, ifcopenshell, trimesh, ...) and PyTorch.

## SET UP CORRECT FILE PATHS

Please set and check the file paths so that they match your data folder structure.

In [None]:
import os

# Repository root, taken as the parent of the current working directory.
root_folder = os.path.abspath('..')
print(root_folder)
# Data folder of this task; the later cells build their paths from it.
data_folder_name = os.path.join(root_folder, 'test_data', 't1_data_fbk')
print(data_folder_name)

## LIBRARIES

In [None]:
#IMPORT PACKAGES
from pathlib import Path
import sys
# Put the repository root, its "scripts" folder and the thirdparty/pointcept
# folder at the front of the module search path so the imports below (and the
# pointcept imports in the inference cell) resolve against them first.
sys.path.insert(0, root_folder)
sys.path.insert(0, os.path.join(root_folder, "scripts"))
sys.path.insert(0, os.path.join(root_folder, 'thirdparty', 'pointcept'))
print(sys.path)
import numpy as np
import laspy
from geomapi.utils import geometryutils as gmu
import torch

In [None]:
%load_ext autoreload

In [None]:
%autoreload 2

## INPUT DATA CONVERSION

Preprocessing of the input data: convert the las/laz files to PyTorch (.pth) files so that they can be processed by Pointcept.

In [None]:
def handle_process(file_name, output_folder):
    """Convert one LAS/LAZ file into a ``.pth`` dictionary saved with torch.

    Files whose extension is not ``.las`` or ``.laz`` (compared
    case-insensitively) are skipped. For a point cloud, normals are
    estimated and a dictionary with the keys ``coord``, ``color``,
    ``normal`` and ``scene_id`` is written to
    ``<output_folder>/<name>.pth``, where ``<name>`` is the input file name
    without its extension.

    Args:
        file_name: Path of the input file.
        output_folder: Directory that receives the ``.pth`` file; it is not
            created here.

    Returns:
        None.
    """
    print(file_name)

    # scene_id keeps the extension; name is the stem used for the output file.
    scene_id = os.path.basename(file_name)
    name, ext = os.path.splitext(scene_id)

    # Compare case-insensitively so that e.g. "SCAN.LAS" is not silently skipped.
    if ext.lower() not in (".las", ".laz"):
        return

    las = laspy.read(file_name)
    print(list(las.point_format.dimension_names))

    # Normals are estimated on the point cloud that geomapi builds from the
    # LAS data, then oriented with the method's default direction.
    pcd = gmu.las_to_pcd(las)
    pcd.estimate_normals()
    pcd.orient_normals_to_align_with_direction()

    coords = np.stack([las.x, las.y, las.z], axis=1)
    # Channels are divided by 256 and truncated to uint8
    # (assumes 16-bit colour values in the file - TODO confirm).
    colors = np.stack(
        [las.red / 256, las.green / 256, las.blue / 256], axis=1
    ).astype(np.uint8)
    normals = np.asarray(pcd.normals)

    save_dict = dict(coord=coords, color=colors, normal=normals, scene_id=scene_id)
    torch.save(save_dict, os.path.join(output_folder, f"{name}.pth"))

# Folder with the point clouds to segment, and the folder that receives the
# converted .pth files.
inference_las_folder = os.path.join(data_folder_name, 'input', 'inference')
inference_output_folder = os.path.join(data_folder_name, 'inference')

os.makedirs(inference_output_folder, exist_ok=True)

# Every directory entry is handed to handle_process, which skips anything
# that is not a LAS/LAZ file.
for file_name in os.listdir(inference_las_folder):
    handle_process(os.path.join(inference_las_folder, file_name), inference_output_folder)

## INFERENCE

Inference using Pointcept (Point Transformer V3)

In [None]:
from pointcept.engines.defaults import (
    default_argument_parser,
    default_config_parser,
    default_setup,
)
from pointcept.engines.test import TESTERS
from pointcept.engines.launch import launch
from pointcept.datasets import build_dataset, collate_fn
import pointcept.utils.comm as comm
import torch
import os
from pointcept.engines.defaults import create_ddp_model
from collections import OrderedDict
import time
import numpy as np
from pointcept.utils.misc import make_dirs
import torch.nn.functional as F
from pointcept.models import build_model

def collate_fn(batch):
    """Return ``batch`` unchanged (identity collate function).

    NOTE: this definition replaces the ``collate_fn`` name imported from
    ``pointcept.datasets`` above; it is the one handed to the DataLoader in
    ``do_inference``.
    """
    return batch

def build_inference_model(cfg):
    """Build the model described by ``cfg.model`` and load ``cfg.weight``.

    Args:
        cfg: Configuration object. This function reads ``cfg.weight``
            (checkpoint path), ``cfg.model`` and
            ``cfg.find_unused_parameters``.

    Returns:
        The object returned by ``create_ddp_model`` with the checkpoint's
        ``state_dict`` loaded using strict key matching.

    Raises:
        RuntimeError: If ``cfg.weight`` is not an existing file.
    """
    # Fail fast: validate the checkpoint path before paying for model
    # construction and the transfer to the GPU.
    if not os.path.isfile(cfg.weight):
        raise RuntimeError("=> No checkpoint found at '{}'".format(cfg.weight))

    model = build_model(cfg.model)
    model = create_ddp_model(
        model.cuda(),
        broadcast_buffers=False,
        find_unused_parameters=cfg.find_unused_parameters,
    )

    # Deserialize on the CPU so loading does not depend on the device the
    # checkpoint was saved from; load_state_dict copies the values into the
    # model's own parameters afterwards.
    checkpoint = torch.load(cfg.weight, map_location="cpu")

    # Align the "module." key prefix with the current setup: stripped when
    # running with a single process, added when running with several.
    prefix = "module."
    world_size = comm.get_world_size()  # loop-invariant, query once
    weight = OrderedDict()
    for key, value in checkpoint["state_dict"].items():
        has_prefix = key.startswith(prefix)
        if has_prefix and world_size == 1:
            key = key[len(prefix):]  # module.xxx.xxx -> xxx.xxx
        elif not has_prefix and world_size > 1:
            key = prefix + key  # xxx.xxx -> module.xxx.xxx
        weight[key] = value
    model.load_state_dict(weight, strict=True)
    return model

def do_inference(cfg):
    """Run inference on the ``cfg.data.test`` dataset and save the labels.

    For each sample, the softmax scores of all its fragments are summed into
    one score table, and the index of the highest score per row is written to
    ``<cfg.save_path>/result/<name>_pred.npy``.

    Args:
        cfg: Configuration object; it is passed through ``default_setup``
            before use.
    """
    cfg = default_setup(cfg)
    test_dataset = build_dataset(cfg.data.test)

    # A distributed sampler is only needed when several processes take part.
    test_sampler = (
        torch.utils.data.distributed.DistributedSampler(test_dataset)
        if comm.get_world_size() > 1
        else None
    )
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=cfg.batch_size_test_per_gpu,
        shuffle=False,
        num_workers=cfg.batch_size_test_per_gpu,
        pin_memory=True,
        sampler=test_sampler,
        collate_fn=collate_fn,
    )

    model = build_inference_model(cfg)
    model.eval()

    save_path = os.path.join(cfg.save_path, "result")
    make_dirs(save_path)

    for batch in test_loader:
        sample = batch[0]  # the batch size is currently assumed to be 1
        fragments = sample.pop("fragment_list")
        segment = sample.pop("segment")
        sample_name = sample.pop("name")
        pred_save_path = os.path.join(save_path, "{}_pred.npy".format(sample_name))

        # Accumulated scores: segment.size rows, one column per class.
        pred = torch.zeros((segment.size, cfg.data.num_classes)).cuda()
        for input_dict in fragments:
            # Move every tensor of the fragment to the GPU.
            for key, value in input_dict.items():
                if isinstance(value, torch.Tensor):
                    input_dict[key] = value.cuda(non_blocking=True)
            idx_part = input_dict["index"]
            with torch.no_grad():
                seg_logits = model(input_dict)["seg_logits"]  # (n, k)
                probs = F.softmax(seg_logits, -1)
                if cfg.empty_cache:
                    torch.cuda.empty_cache()
                # "offset" holds the end position of each chunk in the fragment.
                start = 0
                for end in input_dict["offset"]:
                    pred[idx_part[start:end], :] += probs[start:end]
                    start = end
        _, labels = pred.max(1)
        np.save(pred_save_path, labels.data.cpu().numpy())

    print("DONE.")

# The config file, the trained weights and the output root are all located in
# the task's data folder.
config_path = os.path.join(data_folder_name, 'config.py')
save_path = data_folder_name
weights = os.path.join(data_folder_name, 'model', 'model_best.pth')

# The dictionary is passed as options to default_config_parser
# (presumably overriding the matching entries of config.py - TODO confirm).
cfg = default_config_parser(str(config_path), {'save_path': str(save_path), 'weight': str(weights), 'data_root': str(save_path)})

do_inference(cfg)


## RESULTS

Convert the inference results back to LAZ and remap the predicted classes to the correct ids.

In [None]:
import json

# The class definitions are stored one level above the task's data folder.
classes_file = os.path.join(data_folder_name, '..', '_classes.json')

# Read the JSON file. The encoding is given explicitly so the result does not
# depend on the platform's default locale encoding.
with open(classes_file, 'r', encoding='utf-8') as file:
    json_data = json.load(file)

classes_list = json_data['classes']

print(classes_list)

# Lookup table from each entry's "temp_id" to its "id", both as integers.
remapped_classes_ids = {
    int(class_entry["temp_id"]): int(class_entry["id"])
    for class_entry in classes_list
}

print(remapped_classes_ids)

In [None]:
import numpy as np

def process_result(las_file_path, predictions_folder, output_folder):
    """Write a LAZ copy of a point cloud with its remapped predicted classes.

    The prediction is expected at ``<predictions_folder>/<stem>_pred.npy``,
    where ``<stem>`` is the input file name without extension. Inputs that
    are not ``.las``/``.laz`` files, or that have no prediction file, are
    skipped. Otherwise every label is translated through the module-level
    ``remapped_classes_ids`` dictionary, stored in the ``classes`` extra
    dimension (added as int32 when missing) and the result is written to
    ``<output_folder>/<stem>_pred.laz``.

    Args:
        las_file_path: Path of the original LAS/LAZ file.
        predictions_folder: Folder containing the ``*_pred.npy`` files.
        output_folder: Folder that receives the ``*_pred.laz`` file.

    Returns:
        None.

    Raises:
        KeyError: If a predicted label has no entry in
            ``remapped_classes_ids``.
    """
    file_name, ext = os.path.splitext(os.path.basename(las_file_path))

    # Same extension filter as the conversion step, so stray files in the
    # input folder are ignored instead of crashing the LAS reader.
    if ext.lower() not in (".las", ".laz"):
        return

    # Check for the prediction before reading the point cloud, so files
    # without a result are skipped without loading them.
    prediction_file_path = os.path.join(predictions_folder, f"{file_name}_pred.npy")
    if not os.path.exists(prediction_file_path):
        return

    las = laspy.read(las_file_path)

    dimensions = list(las.header.point_format.dimension_names)
    if "classes" not in dimensions:
        las.add_extra_dim(laspy.ExtraBytesParams(
            name="classes",
            type=np.int32
        ))

    labels = np.load(prediction_file_path)

    # Translate each distinct label once and expand back with the inverse
    # index, instead of one dictionary lookup per point.
    unique_labels, inverse = np.unique(labels, return_inverse=True)
    mapped_ids = np.array(
        [remapped_classes_ids[int(label)] for label in unique_labels],
        dtype=np.int32,
    )

    # int32 matches the declared type of the extra dimension; a uint8 cast
    # would silently wrap class ids above 255.
    las["classes"] = mapped_ids[inverse.reshape(-1)]

    las.write(os.path.join(output_folder, f"{file_name}_pred.laz"))


# Original point clouds, the folder holding the *_pred.npy files, and the
# folder for the exported LAZ files (predictions and exports share "result").
inference_las_folder = os.path.join(data_folder_name, 'input', 'inference')
predictions_folder = os.path.join(data_folder_name, 'result')
output_folder =  os.path.join(data_folder_name, 'result')

# Every entry of the input folder is passed to process_result.
for file_name in os.listdir(inference_las_folder):
    process_result(os.path.join(inference_las_folder, file_name), predictions_folder, output_folder)