In [None]:
import os
import pprint
import time
from pathlib import Path

import numpy as np

# Filesystem layout, resolved relative to the current user's home directory:
# the Open3D-ML checkout, the SmartLab dataset and the training log directory.
home_path = str(Path.home())
base_path = f"{home_path}/dev/Open3D-ML"
dateset_path = f"{home_path}/datasets/SmartLab"  # (sic) name is used when building kwargs
ckpt_base_path = f"{base_path}/mytests/logs"

# import custom open3d.ml
# OPEN3D_ML_ROOT is deliberately set *before* the import below. Presumably
# open3d.ml reads it at import time to load the checkout at base_path instead
# of the copy bundled with Open3D -- confirm against the Open3D-ML docs.
os.environ["OPEN3D_ML_ROOT"] = base_path
import open3d.ml as _ml3d

# Config files for datasets other than SmartLab (not selected in kwargs below).
randlanet_semantickitti_cfg = f"{base_path}/ml3d/configs/randlanet_semantickitti.yml"
randlanet_s3dis_cfg = f"{base_path}/ml3d/configs/randlanet_s3dis.yml"
# Config files for the SmartLab dataset, one per model.
randlanet_smartlab_cfg = f"{base_path}/ml3d/configs/randlanet_smartlab.yml"
kpconv_smartlab_cfg = f"{base_path}/ml3d/configs/kpconv_smartlab.yml"

# Checkpoints found under the training log directory, one per model.
kpconv_ckpt_path = f"{ckpt_base_path}/KPFCNN_SmartLab_tf/checkpoint/ckpt-11"
randlanet_ckpt_path = f"{ckpt_base_path}/RandLANet_SmartLab_tf/checkpoint/ckpt-6"

# Settings for this run: framework, device, data location, split to process,
# checkpoint to restore and the config file to load.
kwargs = dict(
    framework="tf",
    device="cuda",
    dataset_path=dateset_path,
    split="test",
    ckpt_path=randlanet_ckpt_path,
    cfg_file=randlanet_smartlab_cfg,
)

# Create a throwaway class named "args" whose class attributes are the
# settings above and instantiate it, so they can be read as args.<name>.
_ArgsType = type("args", (object,), kwargs)
args = _ArgsType()
pprint.pprint(kwargs)

# Import the Open3D-ML bindings for the selected framework.
# TensorFlow is imported (and its GPUs are queried) only on the non-torch
# path, so selecting "torch" no longer fails with a NameError on `tf`.
if args.framework == "torch":
    import open3d.ml.torch as ml3d

    gpus = []  # the TensorFlow device list does not apply to the torch path
else:
    import open3d.ml.tf as ml3d
    import tensorflow as tf

    gpus = tf.config.experimental.list_physical_devices("GPU")
    pprint.pprint(gpus)

# TF GPU settings: enable memory growth on every GPU, then restrict the
# devices TensorFlow may see according to args.device
# ("cpu", "cuda", or "<name>:<index>").
if args.framework == "tf":
    print("Setting up GPUs for TF")
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        if args.device == "cpu":
            tf.config.set_visible_devices([], "GPU")
        elif not gpus:
            # list_physical_devices returns an empty list (never None) when
            # no GPU is present; indexing it would raise IndexError.
            print("No GPU visible to TensorFlow; leaving device visibility unchanged")
        elif args.device == "cuda":
            tf.config.set_visible_devices(gpus[0], "GPU")
        else:
            idx = int(args.device.split(":")[1])
            tf.config.set_visible_devices(gpus[idx], "GPU")
    except RuntimeError as e:
        # Report the error and continue instead of aborting the notebook.
        print(e)

# merge args into config file
def merge_cfg_file(cfg, args, extra_dict):
    """Copy the non-None settings from ``args`` into ``cfg`` in place.

    Args:
        cfg: Config object exposing ``dataset``, ``pipeline`` and ``model``
            sections as attributes.
        args: Object with ``device``, ``split``, ``dataset_path`` and
            ``ckpt_path`` attributes; a value of ``None`` leaves the
            corresponding config entry untouched.
        extra_dict: Accepted but not used by this function.

    Returns:
        The tuple ``(cfg.dataset, cfg.pipeline, cfg.model)``.
    """
    # (attribute on args, config sections receiving it, key inside the section)
    overrides = (
        ("device", ("pipeline", "model"), "device"),
        ("split", ("pipeline",), "split"),
        ("dataset_path", ("dataset",), "dataset_path"),
        ("ckpt_path", ("model",), "ckpt_path"),
    )
    for arg_name, sections, key in overrides:
        value = getattr(args, arg_name)
        if value is None:
            continue
        for section in sections:
            setattr(getattr(cfg, section), key, value)

    return cfg.dataset, cfg.pipeline, cfg.model


# Load the config file selected in args, then overlay the notebook settings
# (device, split, dataset path, checkpoint path) onto it.
cfg = _ml3d.utils.Config.load_from_file(args.cfg_file)
cfg_dataset, cfg_pipeline, cfg_model = merge_cfg_file(cfg, args, None)

# cfg.dataset["dataset_path"] = args.dataset_path

# Look up the pipeline, model and dataset classes by the names given in the
# config; pipeline and model are additionally looked up per framework.
Pipeline = _ml3d.utils.get_module("pipeline", cfg.pipeline.name, args.framework)
Model = _ml3d.utils.get_module("model", cfg.model.name, args.framework)
Dataset = _ml3d.utils.get_module("dataset", cfg.dataset.name)

# Instantiate each class from its config section; the pipeline is built last
# because it takes the model and dataset instances as arguments.
dataset = Dataset(**cfg_dataset)
model = Model(**cfg_model)
pipeline = Pipeline(model, dataset, **cfg_pipeline)

# Colour for each SmartLab class name. Colours are written as 8-bit RGB
# triples and scaled to floats by dividing every channel by 255.
smartlab_label_colors = {
    class_name: [channel / 255.0 for channel in rgb]
    for class_name, rgb in (
        ("Unlabeled", (231, 87, 36)),
        ("Floor", (188, 169, 26)),
        ("Wall", (100, 244, 245)),
        ("Robot", (150, 30, 140)),
        ("Human", (0, 248, 26)),
        ("AGV", (18, 35, 243)),
    )
}

# pprint.pprint(cfg_dataset)
# pprint.pprint(cfg_model)
# pprint.pprint(cfg_pipeline)

# Attach text metadata to the pipeline: two placeholder entries followed by a
# pretty-printed dump of each config section.
pipeline.cfg_tb = {
    "readme": "readme",
    "cmd_line": "cmd_line",
    **{
        section_name: pprint.pformat(section_cfg, indent=2)
        for section_name, section_cfg in (
            ("dataset", cfg_dataset),
            ("model", cfg_model),
            ("pipeline", cfg_pipeline),
        )
    },
}
# pprint.pprint(pipeline.cfg_tb)

In [None]:
# Train unless the "test" split was selected, in which case run the test
# pass with the tqdm progress bar disabled.
if args.split != "test":
    pipeline.run_train()
else:
    pipeline.run_test(tqdm_disable=True)

In [None]:
# Visualize the training split of the dataset with the SmartLab colours.
vis = ml3d.vis.Visualizer()
lut = ml3d.vis.LabelLUT()

pprint.pprint(smartlab_label_colors)
# Label values are assigned by position in smartlab_label_colors (0, 1, 2, ...).
for label_value, (label_name, color) in enumerate(smartlab_label_colors.items()):
    lut.add_label(label_name, label_value, color)

# NOTE(review): lut is passed as the third positional argument. Upstream
# Open3D-ML appears to take `indices` in that position -- confirm that the
# checkout at base_path accepts a LabelLUT here.
# To show only a subset, pass e.g. indices=range(100).
vis.visualize_dataset(dataset, "train", lut, width=2100, height=1600)

In [None]:
# Measure accuracy and inference time on the test split, and collect the
# points, ground-truth labels and predictions for the visual comparison.
pipeline.load_ckpt(ckpt_path=args.ckpt_path)
test_split = dataset.get_split("test")

vis_points = []  # one dict per sample: name, points, labels, pred
times = []  # wall-clock seconds spent in run_inference, per sample
acc = []  # fraction of points predicted correctly, per sample

for idx in range(len(test_split)):
    attr = test_split.get_attr(idx)
    data = test_split.get_data(idx)
    print(attr)

    # Time only the inference call, not data loading or bookkeeping.
    st = time.perf_counter()
    results = pipeline.run_inference(data, tqdm_disable=True)
    et = time.perf_counter()

    pred = results["predict_labels"].astype(np.int32)
    label = data["label"]

    correct = (pred == label).sum()
    accuracy = correct / len(label)

    print(f"Correct: {correct} out of {len(label)}")
    print(f"Accuracy: {accuracy}")

    vis_points.append(
        {
            "name": attr["name"],
            "points": data["point"],
            "labels": label,
            "pred": pred,
        }
    )
    times.append(et - st)
    acc.append(accuracy)

print("\n")
print(f"Average time {np.mean(times):0.4f} seconds")
# Unweighted mean of the per-sample accuracies: every sample counts equally,
# regardless of how many points it contains.
overall_acc = np.mean(acc)
print(f"Overall Accuracy: {overall_acc}")

In [None]:
# Show every evaluated sample with its ground-truth and predicted labels.
v = ml3d.vis.Visualizer()
lut = ml3d.vis.LabelLUT()

label_names = dataset.get_label_to_names()
pprint.pprint(label_names)

# Pair colours with dataset labels by position: the n-th colour in
# smartlab_label_colors goes to the n-th (value, name) entry of label_names.
# zip stops at the shorter of the two.
colors = smartlab_label_colors.values()
for color, (label_value, label_name) in zip(colors, label_names.items()):
    lut.add_label(label_name, label_value, color)

# Use the same lookup table for the "labels" and "pred" attributes.
for attr_name in ("labels", "pred"):
    v.set_lut(attr_name, lut)
v.visualize(vis_points, lut, width=2600, height=2000)