In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
from glob import glob
import os
import json
from collections import defaultdict
import numpy as np
from typing import Dict, List
import re
import matplotlib.pyplot as plt

In [None]:
# Name of the sub-folder inside every sequence directory that holds the label
# files; create_data joins it onto each sequence path.
LABELS_POINT_CLOUDS_FOLDERNAME = "labels_point_clouds"
# Cuboid classes included in the statistics. The list is passed to create_data
# as the class filter and is used to initialise the per-class tables, so the
# commented-out entries are excluded from every table and plot.
CLASSES = [
    "lidar__cuboid__person",
    "lidar__cuboid__signal",
    "lidar__cuboid__catenary_pole",
    "lidar__cuboid__signal_pole",
    # "lidar__cuboid__train",
    "lidar__cuboid__road_vehicle",
    "lidar__cuboid__buffer_stop",
    "lidar__cuboid__animal",
    # "lidar__cuboid__switch",
    # "lidar__cuboid__bicycle",
    # "lidar__cuboid__crowd",
    # "lidar__cuboid__wagons",
    # "lidar__cuboid__signal_bridge",
]
# Plot colour per class, handed to matplotlib as `color`. Each value is three
# floats in [0, 1] (presumably RGB). Classes that are disabled in CLASSES keep
# their entry here.
CLASS_COLORS = {
    "lidar__cuboid__person": [0.91372549, 0.462745098, 0.976470588],
    "lidar__cuboid__bicycle": [0.694117647, 0.549019608, 1],
    "lidar__cuboid__signal": [0, 0.8, 0.964705882],
    "lidar__cuboid__catenary_pole": [0.337254902, 1, 0.71372549],
    "lidar__cuboid__buffer_stop": [0.352941176, 1, 0.494117647],
    "lidar__cuboid__train": [0.921568627, 0.811764706, 0.211764706],
    "lidar__cuboid__road_vehicle": [0.4, 0.419607843, 0.980392157],
    "lidar__cuboid__signal_pole": [0.725490196, 0.643137255, 0.329411765],
    "lidar__cuboid__animal": [0.780392157, 0.780392157, 0.780392157],
    "lidar__cuboid__switch": [0.850980392, 0.541176471, 0.525490196],
    "lidar__cuboid__crowd": [0.97647059, 0.43529412, 0.36470588],
    "lidar__cuboid__wagons": [0.98431373, 0.94901961, 0.75294118],
    "lidar__cuboid__signal_bridge": [0.42745098, 0.27058824, 0.29803922],
}
# Occlusion levels in the column order used by print_occlusion. They are used
# as dictionary keys there, so they must match the "occlusion" attribute values
# read from the label files character for character.
OCCLUSIONS = [
    "0-25 %",
    "25-50 %",
    "50-75 %",
    "75-99 %",
    "100 %",
]
# Sequence directory names that create_data skips when passed as `blacklist`.
BLACKLISTED_SEQUENCE_NAMES = ["4_station_pedestrian_bridge_4.4", "19_vegetation_curve_19.1"]

In [None]:
def natural_key(string_):
    """Build a sort key that orders embedded numbers numerically ("s2" before "s10").

    The string is split into alternating non-digit and digit runs; every digit
    run is converted to ``int`` so it compares by value instead of
    lexicographically.

    Args:
        string_: Text to derive the key from (e.g. a file or directory path).

    Returns:
        List of ``str`` and ``int`` chunks in their original order; empty
        chunks produced by the split are dropped.
    """
    # ``isdecimal`` accepts exactly the characters that ``\d`` matches.
    # ``isdigit`` is also true for characters such as superscript two, which
    # ``int`` cannot parse, so the previous check raised ValueError on them.
    return [int(chunk) if chunk.isdecimal() else chunk for chunk in re.split(r"(\d+)", string_) if chunk]


def plot_class_distance_hist_and_coords(
    class_distances: Dict[str, List[int]],
    class_coords: Dict[str, List[int]],
    class_count: int,
    point_cloud_range: List[float] = [0, -100, -10, 400, 100, 10],
):
    """Draw one figure row per class: a distance histogram and a position scatter.

    The left panel is a 50-bin histogram of the class' distances over
    ``[0, point_cloud_range[3]]``. The right panel scatters the first two
    coordinates of every entry in ``class_coords[cls]``, with the axes limited
    to the range given by ``point_cloud_range``.

    Args:
        class_distances: Maps a class name to its list of distances. The
            iteration order decides the row order.
        class_coords: Maps a class name to a list of coordinate triples. It
            must contain every key of ``class_distances``. It is only read;
            the caller's lists are not replaced.
        class_count: Number of subplot rows to create. It must be at least the
            number of entries in ``class_distances``.
        point_cloud_range: Six values; indices 0/3 give the x limits and 1/4
            the y limits of the scatter panel. The default list is never
            modified.

    Raises:
        KeyError: If a class is missing from ``class_coords`` or ``CLASS_COLORS``.
        IndexError: If ``class_distances`` has more entries than ``class_count``.
    """
    # squeeze=False keeps ``axs`` two-dimensional (class_count x 2) regardless
    # of class_count, so each class simply owns one row.
    fig, axs = plt.subplots(class_count, 2, figsize=(10, 50), squeeze=False)
    for row, (cls, distances) in enumerate(class_distances.items()):
        hist_ax, scatter_ax = axs[row]
        # Drop the first 15 characters, i.e. the "lidar__cuboid__" prefix
        # carried by the names in CLASSES.
        title = cls[15:]
        # Work on a local array. Writing the array back into ``class_coords``
        # silently changed the caller's dictionary between notebook cells.
        coords = np.array(class_coords[cls])

        hist_ax.set_title(title)
        hist_ax.hist(
            np.array(distances),
            bins=50,
            range=[0, point_cloud_range[3]],
            label=cls,
            alpha=0.5,
            color=CLASS_COLORS[cls],
        )

        scatter_ax.set_title(title)
        scatter_ax.set_xlim([point_cloud_range[0], point_cloud_range[3]])
        scatter_ax.set_ylim([point_cloud_range[1], point_cloud_range[4]])
        if coords.shape[0] > 0:
            scatter_ax.scatter(
                coords[:, 0],
                coords[:, 1],
                s=0.2,
                label=cls,
                alpha=1,
                color=CLASS_COLORS[cls],
            )
        else:
            # Nothing to plot for this class; keep an empty artist so the
            # panel still carries the class label.
            scatter_ax.scatter([], [], s=0.2, label=cls, alpha=1)
    plt.show()

def plot_class_num_points_hist(
    class_num_points: Dict[str, List[int]],
    class_count: int,
):
    """Draw one 50-bin histogram of the per-box point counts for every class.

    Args:
        class_num_points: Maps a class name to its list of ``num_points``
            values. The iteration order decides the row order.
        class_count: Number of subplot rows to create. It must be at least the
            number of entries in ``class_num_points``.

    Raises:
        KeyError: If a class is missing from ``CLASS_COLORS``.
        IndexError: If ``class_num_points`` has more entries than ``class_count``.
    """
    # With a single row and column ``plt.subplots`` returns a bare Axes object
    # unless squeeze=False is given, so ``.flatten()`` failed for
    # class_count == 1.
    fig, axs = plt.subplots(class_count, 1, figsize=(10, 50), squeeze=False)
    axs = axs.flatten()
    for i, (cls, num_points) in enumerate(class_num_points.items()):
        ax = axs[i]
        # Drop the first 15 characters, i.e. the "lidar__cuboid__" prefix
        # carried by the names in CLASSES.
        ax.set_title(cls[15:])
        ax.hist(np.array(num_points), bins=50, label=cls, alpha=0.5, color=CLASS_COLORS[cls])
    plt.show()


def print_class_counts(data):
    """Print the number of boxes per class and each class' share of the total.

    Counts are summed over all sequences and listed in descending order. Every
    class in ``CLASSES`` is listed even when its count is zero. Classes that
    occur in ``data`` but not in ``CLASSES`` are listed as well instead of
    raising ``KeyError``.

    Args:
        data: Maps a sequence name to a dict whose ``"classes"`` entry maps a
            class name to its box count (as produced by ``create_data``).
    """
    class_dist = {cls: 0 for cls in CLASSES}
    for seq_stats in data.values():
        for cls, count in seq_stats["classes"].items():
            class_dist[cls] = class_dist.get(cls, 0) + count
    class_dist = dict(sorted(class_dist.items(), key=lambda item: item[1], reverse=True))

    print(f"{'class':<15}: {'count':<5} (% of total)")
    print("=" * 35)
    total = sum(class_dist.values())
    for cls, count in class_dist.items():
        # With no boxes at all the share is reported as 0 rather than dividing by zero.
        share = count / total * 100 if total else 0.0
        # cls[15:] drops the "lidar__cuboid__" prefix carried by the names in CLASSES.
        print(f"{cls[15:]:<15}: {count:<7} ({share:.2f}%)")
    print("=" * 35)
    print(f"{'Total':<15}: {total:<7} (100%)")


def print_occlusion(data):
    """Print a class x occlusion-level table of box counts with percentages.

    Each class row shows, per occlusion level in ``OCCLUSIONS``, the number of
    boxes and that number as a percentage of the class' boxes. The final row
    shows the totals per occlusion level as a percentage of all boxes.

    Args:
        data: Maps a sequence name to a dict whose ``"occlusions"`` entry maps
            an occlusion level to a ``{class name: count}`` dict (as produced
            by ``create_data``).

    Raises:
        KeyError: If ``data`` contains an occlusion level that is not in
            ``OCCLUSIONS`` or a class that is not in ``CLASSES``.
    """
    class_dist = {occl: {cls: 0 for cls in CLASSES} for occl in OCCLUSIONS}
    for seq_stats in data.values():
        for occl, class_details in seq_stats["occlusions"].items():
            for cls, count in class_details.items():
                class_dist[occl][cls] += count

    def _cell(count, total):
        # The ratio has to be scaled by 100 before the "%" sign is appended;
        # the previous output printed the raw fraction (0.25 instead of 25).
        # One decimal keeps a worst-case cell inside the 15-character column.
        share = count / total * 100 if total else 0.0
        return f"{count} ({share:.1f}%)"

    print("Occlusion stats")
    title = f"{'Class':<20} " + "".join([f"{x:<15}" for x in OCCLUSIONS])
    occlusion_totals = {occl: sum(class_dist[occl].values()) for occl in OCCLUSIONS}
    grand_total = sum(occlusion_totals.values())
    print(title)
    print("=" * len(title))
    for cls in CLASSES:
        class_total = sum(class_dist[occl][cls] for occl in OCCLUSIONS)
        cells = [_cell(class_dist[occl][cls], class_total) for occl in OCCLUSIONS]
        # cls[15:] drops the "lidar__cuboid__" prefix carried by the names in CLASSES.
        print(f"{cls[15:]:<20} " + "".join([f"{x:<15}" for x in cells]))
    # Totals per occlusion level.
    print("=" * len(title))
    cells = [_cell(occlusion_totals[occl], grand_total) for occl in OCCLUSIONS]
    print(f"{'Total':<20} " + "".join([f"{x:<15}" for x in cells]))


def print_distance_stats(class_distances: Dict[str, List[int]]):
    """Print count, min, mean, max and standard deviation of the distances per class.

    The final "Total" row is computed over the pooled values of all classes.
    Previously it reported the mean of the per-class means and the standard
    deviation of the per-class standard deviations. Classes without any value
    are listed with a count of 0 and "n/a" statistics instead of raising.

    Args:
        class_distances: Maps a class name to its list of distance values.
    """
    def _row(label, values):
        # One formatted table row for `values` (a 1-D float array).
        if values.size == 0:
            # min/mean/max/std are undefined for an empty sample.
            return f" {label:<20} count: {0:<10} min: {'n/a':<10} mean: {'n/a':<10} max: {'n/a':<10} std: {'n/a':<10}"
        return (
            f" {label:<20} count: {values.size:<10} min: {values.min():<10.3f} mean: {values.mean():<10.3f}"
            f" max: {values.max():<10.3f} std: {values.std():<10.2f}"
        )

    # fmt: off
    print("Distance stats")
    print("=" * 100)
    per_class = {cls: np.asarray(values, dtype=float).ravel() for cls, values in class_distances.items()}
    for cls, values in per_class.items():
        # cls[15:] drops the "lidar__cuboid__" prefix carried by the names in CLASSES.
        print(_row(cls[15:], values))
    print("=" * 100)
    pooled = np.concatenate(list(per_class.values())) if per_class else np.empty(0)
    print(_row("Total", pooled))
    # fmt: on


def print_num_points_stats(class_num_points: Dict[str, List[int]]):
    """Print count, min, mean, max and standard deviation of the point counts per class.

    The final "Total" row is computed over the pooled values of all classes.
    Previously it reported the mean of the per-class means and the standard
    deviation of the per-class standard deviations. Classes without any value
    are listed with a count of 0 and "n/a" statistics instead of raising.

    Args:
        class_num_points: Maps a class name to its list of ``num_points`` values.
    """
    def _row(label, values):
        # One formatted table row for `values` (a 1-D float array).
        if values.size == 0:
            # min/mean/max/std are undefined for an empty sample.
            return f" {label:<20} count: {0:<10} min: {'n/a':<10} mean: {'n/a':<10} max: {'n/a':<10} std: {'n/a':<10}"
        return (
            f" {label:<20} count: {values.size:<10} min: {values.min():<10.3f} mean: {values.mean():<10.3f}"
            f" max: {values.max():<10.3f} std: {values.std():<10.2f}"
        )

    # fmt: off
    print("Num points stats")
    print("=" * 100)
    per_class = {cls: np.asarray(values, dtype=float).ravel() for cls, values in class_num_points.items()}
    for cls, values in per_class.items():
        # cls[15:] drops the "lidar__cuboid__" prefix carried by the names in CLASSES.
        print(_row(cls[15:], values))
    print("=" * 100)
    pooled = np.concatenate(list(per_class.values())) if per_class else np.empty(0)
    print(_row("Total", pooled))
    # fmt: on

def print_coords_stats(class_coords: Dict[str, List[int]]):
    """Print the minimum and maximum of the first three coordinates per class.

    The columns are labelled x, y and z and correspond to columns 0, 1 and 2
    of the stacked coordinates. The final "Total" row covers the coordinates
    of all classes together. Classes without any coordinates are listed with
    "n/a" entries; previously an empty class raised ``IndexError`` and made
    the concatenation for the total row fail.

    Args:
        class_coords: Maps a class name to a list of coordinate triples.
    """
    def _as_points(coords):
        # Stack the triples into an (n, 3) array; an empty list becomes (0, 3).
        points = np.asarray(coords, dtype=float)
        return points if points.size else np.empty((0, 3))

    def _row(label, points):
        # One formatted table row with min/max of columns 0, 1 and 2.
        if points.shape[0] == 0:
            return f" {label:<20} " + " ".join(f"{'n/a':<10}" for _ in range(6))
        lo, hi = points.min(axis=0), points.max(axis=0)
        return f" {label:<20} {lo[0]:<10.2f} {hi[0]:<10.2f} {lo[1]:<10.2f} {hi[1]:<10.2f} {lo[2]:<10.2f} {hi[2]:<10.2f}"

    # fmt: off
    print(f" {'class':<20} {'x-min':<10} {'x-max':<10} {'y-min':<10} {'y-max':<10} {'z-min':<10} {'z-max':<10}")
    print("=" * 82)
    per_class = {cls: _as_points(coords) for cls, coords in class_coords.items()}
    for cls, points in per_class.items():
        # cls[15:] drops the "lidar__cuboid__" prefix carried by the names in CLASSES.
        print(_row(cls[15:], points))
    print("=" * 82)
    non_empty = [points for points in per_class.values() if points.shape[0] > 0]
    print(_row("Total", np.concatenate(non_empty) if non_empty else np.empty((0, 3))))
    # fmt: on

def print_frame_class_stats(data, classes):
    """Print, per sequence, the frame count and the number of boxes of each class.

    A final "Total" row sums the frame counts and the per-class box counts
    over all sequences.

    Args:
        data: Maps a sequence name to a dict with ``"no_frames"`` (int) and
            ``"classes"`` (class name -> box count), as produced by
            ``create_data``.
        classes: Class names to show, one column each, in this order.
    """
    title = f"{'seq_name':<40} {'no_frames':<10}"
    for cls in classes:
        # cls[15:] drops the "lidar__cuboid__" prefix carried by the names in CLASSES.
        title += f" {cls[15:]:<15}"
    print(title)
    print("=" * len(title))

    totals = {cls: 0 for cls in classes}
    total_frames = 0
    for seq_name, seq_stats in data.items():
        counts = seq_stats["classes"]
        total_frames += seq_stats["no_frames"]
        # Names longer than the 40-character column are truncated.
        row = f"{seq_name[:40]:<40} {seq_stats['no_frames']:<10}"
        for cls in classes:
            # ``.get`` instead of indexing: ``counts`` is a defaultdict when it
            # comes from create_data, and indexing would insert zero entries
            # into ``data`` (changing what later cells see). It also makes
            # plain dicts with missing classes work.
            count = counts.get(cls, 0)
            totals[cls] += count
            row += f" {count:<15}"
        print(row)

    print("=" * len(title))
    total_info = f"{'Total':<40} {total_frames:<10}"
    for cls in classes:
        total_info += f" {totals[cls]:<15}"
    print(total_info)

def print_frame_stats(data):
    """Print per-sequence frame, class and box counts plus boxes per frame.

    Columns: sequence name, number of frames, number of classes with at least
    one box, total number of boxes and the average number of boxes per frame.
    A final "Total" row aggregates frames and boxes over all sequences.

    Args:
        data: Maps a sequence name to a dict with ``"no_frames"`` (int) and
            ``"classes"`` (class name -> box count), as produced by
            ``create_data``.
    """
    print(f"{'seq_name':<40} {'no_frames':<10} {'no_classes':<10} {'no_bbox':<10} {'no_bbox/frame':<10}")
    print("=" * 90)

    total_frames = 0
    total_boxes = 0
    for seq_name, seq_stats in data.items():
        counts = seq_stats["classes"]
        frames = seq_stats["no_frames"]
        boxes = sum(counts.values())
        # Count only classes that actually occur, so zero-valued entries in
        # the mapping do not inflate the number.
        present_classes = sum(1 for count in counts.values() if count > 0)
        # A sequence without label files has 0 frames; report 0 instead of
        # dividing by zero.
        boxes_per_frame = boxes / frames if frames else 0.0
        print(f"{seq_name:<40} {frames:<10} {present_classes:<10} {boxes:<10} {boxes_per_frame:<10.1f}")
        total_frames += frames
        total_boxes += boxes

    boxes_per_frame = total_boxes / total_frames if total_frames else 0.0
    print("=" * 90)
    print(f"{'Total':<40} {total_frames:<10} {'':<10} {total_boxes:<10} {boxes_per_frame:<10.1f}")

In [None]:
def create_data(data_root_path:str, point_cloud_range:List[float] = [], classes:List[str] = [], blacklist:List[str]=[]) -> Dict[str, dict]:
    """Collect per-sequence cuboid statistics from the label files under ``data_root_path``.

    Every sub-directory of ``data_root_path`` is treated as one sequence. All
    files in its ``LABELS_POINT_CLOUDS_FOLDERNAME`` sub-folder are parsed as
    JSON, and for every object in ``["openlabel"]["frames"][...]["objects"]``
    the first cuboid is evaluated.

    Args:
        data_root_path: Directory that contains one sub-directory per sequence.
        point_cloud_range: Optional six values
            ``[x_min, y_min, z_min, x_max, y_max, z_max]``. When non-empty,
            only cuboids whose first three ``val`` entries lie strictly inside
            this range are counted. The default list is never modified.
        classes: Optional class names to keep; empty keeps every class.
        blacklist: Sequence directory names to skip.

    Returns:
        Mapping ``sequence name -> dict`` with the keys
        ``"classes"`` (class -> box count), ``"no_frames"`` (int),
        ``"occlusions"`` (occlusion value -> class -> count),
        ``"difficulties"`` (difficulty value -> class -> count),
        ``"num_points"`` (class -> list of values),
        ``"distances"`` (class -> list of values) and
        ``"coords"`` (class -> list of float32 arrays of length 3).
        Sequences without label files are included with zero frames.

    Raises:
        KeyError: If a label file lacks one of the expected keys.
        json.JSONDecodeError: If a file in the label folder is not valid JSON.
    """
    def _in_range(loc) -> bool:
        # True when no range is given or ``loc`` lies strictly inside it.
        if len(point_cloud_range) == 0:
            return True
        return bool(
            point_cloud_range[0] < loc[0] < point_cloud_range[3]
            and point_cloud_range[1] < loc[1] < point_cloud_range[4]
            and point_cloud_range[2] < loc[2] < point_cloud_range[5]
        )

    data = defaultdict(dict)

    seq_paths = sorted(glob(os.path.join(data_root_path, "*")), key=natural_key)
    for seq_path in seq_paths:
        if not os.path.isdir(seq_path):
            print(f"{seq_path} is not a directory. Skipping...")
            continue

        seq_name = os.path.basename(seq_path)
        if seq_name in blacklist:
            continue

        # Accumulators for this sequence, created only once it is known that
        # the sequence is actually processed.
        seq_classes = defaultdict(int)
        seq_occlusions = defaultdict(dict)
        seq_difficulties = defaultdict(dict)
        seq_no_frames = 0
        seq_num_points = defaultdict(list)
        seq_distances = defaultdict(list)
        seq_coords = defaultdict(list)

        labels_path = os.path.join(seq_path, LABELS_POINT_CLOUDS_FOLDERNAME)

        # Sorted so the order of the collected lists does not depend on the
        # file-system listing order.
        for label_path in sorted(glob(os.path.join(labels_path, "*")), key=natural_key):
            # glob only returns existing entries, so the useful check is that
            # the entry is a file (opening a sub-directory would fail).
            if not os.path.isfile(label_path):
                print(f"{label_path} is not a file. Skipping...")
                continue

            with open(label_path, "r") as f:
                frames = json.load(f)["openlabel"]["frames"]

            for frame in frames.values():
                seq_no_frames += 1
                # A frame without an "objects" entry still counts as a frame;
                # it just contributes no boxes.
                for obj in frame.get("objects", {}).values():
                    cuboid = obj["object_data"]["cuboid"][0]
                    class_name = cuboid["name"]

                    if classes and class_name not in classes:
                        continue

                    loc = np.asarray(cuboid["val"][:3], dtype=np.float32)
                    if not _in_range(loc):
                        continue

                    seq_coords[class_name].append(loc)
                    seq_classes[class_name] += 1

                    for attr in cuboid["attributes"]["text"]:
                        if attr["name"] == "occlusion":
                            per_class = seq_occlusions[attr["val"]]
                            per_class[class_name] = per_class.get(class_name, 0) + 1
                        elif attr["name"] == "difficulty":
                            per_class = seq_difficulties[attr["val"]]
                            per_class[class_name] = per_class.get(class_name, 0) + 1

                    for attr in cuboid["attributes"]["num"]:
                        if attr["name"] == "num_points":
                            seq_num_points[class_name].append(attr["val"])
                        elif attr["name"] == "distance":
                            seq_distances[class_name].append(attr["val"])

        data[seq_name]["classes"] = seq_classes
        data[seq_name]["no_frames"] = seq_no_frames
        data[seq_name]["occlusions"] = seq_occlusions
        data[seq_name]["difficulties"] = seq_difficulties
        data[seq_name]["num_points"] = seq_num_points
        data[seq_name]["distances"] = seq_distances
        data[seq_name]["coords"] = seq_coords

    return data

In [None]:
# Dataset location and the coordinate range handed to create_data, which keeps
# only boxes whose position lies strictly inside it.
data_path = "../data/osdar23_original/"
point_cloud_range = [-6.0, -128.0, -3.0, 250.0, 128.0, 13.0]
data = create_data(data_path, point_cloud_range, CLASSES, blacklist=BLACKLISTED_SEQUENCE_NAMES)

# Per-class pools over all sequences. Every class in CLASSES gets an entry,
# which stays an empty list when the class never occurs.
class_num_points_dist = {cls: [] for cls in CLASSES}
class_distance_dist = {cls: [] for cls in CLASSES}
class_coords = {cls: [] for cls in CLASSES}

# One pass over the sequences fills all three pools; sequence order is kept.
for seq_stats in data.values():
    for cls, values in seq_stats["num_points"].items():
        class_num_points_dist[cls].extend(values)
    for cls, values in seq_stats["distances"].items():
        class_distance_dist[cls].extend(values)
    for cls, values in seq_stats["coords"].items():
        class_coords[cls].extend(values)

In [None]:
# Per sequence: frame count, class count, box count and boxes per frame.
print_frame_stats(data)

In [None]:
# Per sequence: frame count and number of boxes for each class in CLASSES.
print_frame_class_stats(data, CLASSES)

In [None]:
# Per class: minimum and maximum of the collected x, y and z coordinates.
print_coords_stats(class_coords)

In [None]:
# Box count per class and its share of all boxes, sorted by count.
print_class_counts(data)

In [None]:
# Per class: count, min, mean, max and std of the "distance" attribute.
print_distance_stats(class_distance_dist)

In [None]:
# Per class: count, min, mean, max and std of the "num_points" attribute.
print_num_points_stats(class_num_points_dist)

In [None]:
# Box counts per class, broken down by the occlusion levels in OCCLUSIONS.
print_occlusion(data)

In [None]:
# One row per class: distance histogram next to a scatter of the box positions.
plot_class_distance_hist_and_coords(class_distance_dist, class_coords, len(CLASSES), point_cloud_range)

In [None]:
# One row per class: histogram of the "num_points" attribute.
plot_class_num_points_hist(class_num_points_dist, len(CLASSES))