# HGP clusterer sur SemanticKITTI (local, sans post-traitement)

Ce notebook :

- installe les dépendances système et Python nécessaires,
- récupère **hgp_clusterer** depuis votre GitHub,
- lance **HypergraphPercol** localement sur SemanticKITTI,
- applique les mêmes *pré-traitements* que vos notebooks (`none`, `bev_xy`, `bev_xyzi`, `polar`),
- **sans aucun post-traitement** (pas de KNN, pas de merges, pas de box splitting),
- écrit les fichiers `.label` panoptiques au format SemanticKITTI,
- propose une visualisation 3D simple d'un scan.

In [None]:
import sys

# Colab: mount Google Drive when available.
# Colab is detected by checking whether the `google.colab` package is already
# loaded in this interpreter; outside Colab the mount step is skipped.
if "google.colab" in sys.modules:
    from google.colab import drive
    # The dataset and output paths used later live under /content/drive.
    drive.mount('/content/drive', force_remount=True)
print("OK setup.")


In [None]:
%%bash
# @title 0) Dépendances système (CGAL, TBB, etc.)
# The %%bash cell magic must be the very first line of the cell, so the Colab
# title annotation is placed right below it (same layout as the 08.zip
# extraction cell further down).
# Fail fast: abort on any error, unset variable or failing pipeline stage.
set -euo pipefail
apt-get update -qq
# Toolchain plus the native libraries installed for the CGAL Delaunay builds.
apt-get install -y -qq build-essential cmake libcgal-dev libtbb-dev libtbbmalloc2 \
  libgmp-dev libmpfr-dev libeigen3-dev

In [None]:
# @title 0-bis) Dépendances Python
# Upgrade the packaging/build tooling first, then install the packages the
# later cells import (numpy, scipy, plotly, tqdm, ...).
!pip -q install --upgrade pip setuptools wheel Cython cmake jedi
!pip -q install numpy scipy scikit-learn plotly tqdm joblib

In [None]:
%%bash
# Clone (or fast-forward) the HGP-clusterer and cyminiball repositories into
# the working directory (HGP_WORKDIR, defaulting to /content).
set -euo pipefail
WORKDIR="${HGP_WORKDIR:-/content}"
mkdir -p "${WORKDIR}"
cd "${WORKDIR}"
# An existing checkout is only fast-forwarded; a diverged checkout makes the cell fail.
if [ -d HGP-clusterer ]; then
    git -C HGP-clusterer pull --ff-only
else
    git clone https://github.com/Ludwig-H/HGP-clusterer.git
fi
if [ -d cyminiball ]; then
    git -C cyminiball pull --ff-only
else
    git clone https://github.com/Ludwig-H/cyminiball.git
fi


In [None]:
%%bash
# Build a cyminiball wheel from the local checkout, then install it from the
# local wheel directory only (--no-index: nothing is fetched from PyPI).
set -euo pipefail
WORKDIR="${HGP_WORKDIR:-/content}"
mkdir -p "${WORKDIR}/wheels"
cd "${WORKDIR}/cyminiball"
python3 -m pip wheel --no-build-isolation --no-deps --wheel-dir="${WORKDIR}/wheels" .
python3 -m pip install --force-reinstall --no-deps --no-index --find-links="${WORKDIR}/wheels" cyminiball
# "--no-deps" is essential: without it numpy gets downloaded as version 2.3.4, which causes compatibility problems...


In [None]:
%%bash
# Run the repository's CGAL setup script from the HGP-clusterer checkout.
set -euo pipefail
WORKDIR="${HGP_WORKDIR:-/content}"
cd "${WORKDIR}/HGP-clusterer"
python3 scripts/setup_cgal.py


In [None]:
%%bash
# Configure, build (Release) and install each CGALDelaunay CMake project.
set -euo pipefail
WORKDIR="${HGP_WORKDIR:-/content}"
cd "${WORKDIR}/HGP-clusterer/CGALDelaunay"

# One CMake project per (plain | weighted) x (2D | 3D | ND) variant.
projects=(
    EdgesCGALDelaunay2D
    EdgesCGALDelaunay3D
    EdgesCGALDelaunayND
    EdgesCGALWeightedDelaunay2D
    EdgesCGALWeightedDelaunay3D
    EdgesCGALWeightedDelaunayND
)

for project in "${projects[@]}"; do
    # Out-of-source build in <project>/build; artifacts are installed under the repo root.
    cmake -S "${project}" -B "${project}/build" -DCMAKE_BUILD_TYPE=Release
    cmake --build "${project}/build" --config Release
    cmake --install "${project}/build" --prefix "${WORKDIR}/HGP-clusterer"
done


In [None]:
%%bash
# Install hgp_clusterer from the local checkout without touching its
# dependencies (--no-deps), verbosely so build output is visible.
set -euo pipefail
WORKDIR="${HGP_WORKDIR:-/content}"
cd "${WORKDIR}/HGP-clusterer"
python3 -m pip install -v --no-deps .


In [None]:
import os

# Point CGALDELAUNAY_ROOT at the CGALDelaunay folder of the checkout before
# importing the package; the clustering cell later reads this variable and
# passes it as `cgal_root`.
workdir = os.environ.get("HGP_WORKDIR", "/content")
repo_root = os.path.join(workdir, "HGP-clusterer")
os.environ["CGALDELAUNAY_ROOT"] = os.path.join(repo_root, "CGALDelaunay")

from hgp_clusterer import HypergraphPercol

In [None]:
# @title 1) Récupération et installation de hgp_clusterer (GitHub)
# Pure-Python variant of the clone/build steps performed by the bash cells above.
import os, subprocess, sys
from pathlib import Path

# === Key parameter: Git URL of the hgp_clusterer repository ===
# Override it through the HGP_REPO_URL environment variable if needed.
REPO_URL = os.environ.get("HGP_REPO_URL", "https://github.com/Ludwig-H/HGP-clusterer.git")

# Working directory holding both checkouts and the local wheel folder.
WORKDIR = Path(os.environ.get("HGP_WORKDIR", "/content")).resolve()
WORKDIR.mkdir(parents=True, exist_ok=True)

repo_dir = WORKDIR / "HGP-clusterer"
cymini_dir = WORKDIR / "cyminiball"

def _run(cmd, **kw):
    """Echo a command, then execute it, raising if it exits with a non-zero status.

    Parameters
    ----------
    cmd : command passed unchanged to ``subprocess.run`` (the callers in this
        cell pass argument lists).
    **kw : extra keyword arguments forwarded to ``subprocess.run``.

    Raises
    ------
    subprocess.CalledProcessError
        When the command returns a non-zero exit code (``check=True``).
    """
    # Shell-trace style echo so the notebook output shows what is executed.
    print("+", cmd)
    subprocess.run(cmd, **kw, check=True)

# Clone / update hgp_clusterer (existing checkouts are only fast-forwarded)
if repo_dir.exists():
    _run(["git", "-C", str(repo_dir), "pull", "--ff-only"])
else:
    _run(["git", "clone", REPO_URL, str(repo_dir)])

# cyminiball (optional but recommended for some backends)
if cymini_dir.exists():
    _run(["git", "-C", str(cymini_dir), "pull", "--ff-only"])
else:
    _run(["git", "clone", "https://github.com/Ludwig-H/cyminiball.git", str(cymini_dir)])

# Build the cyminiball wheel locally and install it from that folder only.
wheels = WORKDIR / "wheels"
wheels.mkdir(parents=True, exist_ok=True)
_run([sys.executable, "-m", "pip", "wheel", "--no-build-isolation", "--no-deps", "--wheel-dir", str(wheels), str(cymini_dir)])
_run([sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps", "--no-index", f"--find-links={wheels}", "cyminiball"])

# Install hgp_clusterer itself (lets its setup handle the extensions).
# NOTE(review): unlike the bash install cell, this editable install does not
# pass --no-deps, so pip may upgrade numpy here — confirm this is intended.
_run([sys.executable, "-m", "pip", "install", "-e", str(repo_dir)])

# Environment variable locating the CGALDelaunay executables, if any
cgal_root = repo_dir / "CGALDelaunay"
os.environ["CGALDELAUNAY_ROOT"] = str(cgal_root)
print("CGALDELAUNAY_ROOT =", os.environ["CGALDELAUNAY_ROOT"])
print("Dépôt hgp_clusterer prêt:", repo_dir)

In [None]:
import os
import sys

workdir = os.environ.get("HGP_WORKDIR", "/content")
repo_root = os.path.join(workdir, "HGP-clusterer")

# Put the checkout first on sys.path so `import hgp_clusterer` resolves to it
# in this interpreter.
# NOTE(review): sys.path is per-process; whether child processes also find the
# package depends on how they are started — confirm if that matters here.
if str(repo_root) not in sys.path:
    sys.path.insert(0, str(repo_root))

# Read later by the clustering cell and passed as `cgal_root`.
os.environ["CGALDELAUNAY_ROOT"] = os.path.join(repo_root, "CGALDelaunay")

from hgp_clusterer import HypergraphPercol

In [None]:
# @title 2) Imports + configuration centrale

import os, json
import numpy as np
from pathlib import Path
from typing import Optional

# from hgp_clusterer import HypergraphPercol



# === Folders ===
# NOTE: a later cell rebinds DATA_ROOT to the locally extracted copy under /tmp.
DATA_ROOT = "/content/drive/MyDrive/Datasets/semantic_kitti"  #@param {type:"string"}
# Semantics used for clustering: "oracle" (ground truth), "waffleiron" (predictions), "custom" (your own predictions)
SEMANTICS_SOURCE = "oracle"  #@param ["oracle", "waffleiron", "custom"]
# For "waffleiron" or "custom": root containing sequences/*/predictions/*.{label,labels,npy,npz}
PRED_ROOT = "/content/drive/MyDrive/Datasets/semantic_kitti/WaffleIron"  #@param {type:"string"}

# === Sequences to process ===
SEQUENCES = ["08"]  #@param {type:"raw"}

# === HGP parameters ===
# NOTE(review): in the clustering cell only K, min_cluster_size, method,
# splitting, label_all_points and expZ are read; min_samples, weight_face,
# return_multi_clusters, complex_chosen and HGP_VERBOSE are not forwarded there.
K = 2  #@param {type:"integer"}
min_cluster_size = 2  #@param {type:"integer"}
min_samples = K + 1    #@param {type:"raw"}
method = "DBSCAN"         #@param ["eom","leaf", "DBSCAN"]
splitting = True      #@param {type:"boolean"}
weight_face = "lambda" #@param ["lambda","uniform","unique"]
label_all_points = False  #@param {type:"boolean"}
return_multi_clusters = False  #@param {type:"boolean"}
complex_chosen = "orderk_delaunay"  #@param ["orderk_delaunay","delaunay","weighted_delaunay"]
expZ = 1  #@param {type:"integer"}
HGP_VERBOSE = False  #@param {type:"boolean"}

# === Pre-processing (no post-processing) ===
PREPROC = "none"  #@param ["none","bev_xy","bev_xyzi","polar"]

# === Output: SemanticKITTI folder layout ===
# NOTE(review): RUN_NAME does not encode `splitting`, so runs differing only in
# that flag write to the same folder.
RUN_NAME = f"HGP-K{K}_min{min_cluster_size}_expZ{expZ}_pre{PREPROC}_method{method}"
OUT_ROOT = f"/content/drive/MyDrive/Datasets/semantic_kitti/experiments_semkitti/{RUN_NAME}"
Path(OUT_ROOT).mkdir(parents=True, exist_ok=True)
print("Sortie:", OUT_ROOT)

# === Raw SemanticKITTI thing/stuff class ids ===
THING_RAW_IDS = [10,11,15,18,20,30,31,32]  # car, bicycle, motorcycle, truck, other-vehicle, person, bicyclist, motorcyclist
STUFF_RAW_IDS = [40,44,48,49,50,51,70,71,72,80,81]

print("Config chargée.")

In [None]:
# @title 3) Utilitaires I/O (KITTI) + encodage panoptique
import numpy as np
from pathlib import Path

def kitti_scan_paths(seq_dir: Path):
    """Locate the scan folders of one sequence and list its scan stems.

    Returns ``(velodyne_dir, labels_dir, stems)`` where ``stems`` are the
    sorted file names (without extension) of every ``*.bin`` file found in
    ``velodyne``. Only the ``velodyne`` folder is required to exist; the
    ``labels`` path is returned without being checked.

    Raises
    ------
    AssertionError
        If ``<seq_dir>/velodyne`` is not a directory.
    """
    vel_dir, label_dir = seq_dir / "velodyne", seq_dir / "labels"
    assert vel_dir.is_dir(), f"Manque velodyne sous {seq_dir}"
    stems = [scan.stem for scan in vel_dir.glob("*.bin")]
    stems.sort()
    return vel_dir, label_dir, stems

def read_points_bin(bin_path: Path) -> np.ndarray:
    """Read a velodyne ``.bin`` file as float32 values shaped ``(N, 4)``.

    The four columns are treated elsewhere in this notebook as x, y, z and
    remission. A file whose value count is not a multiple of 4 makes the
    reshape raise ``ValueError``.
    """
    flat = np.fromfile(str(bin_path), dtype=np.float32)
    return np.reshape(flat, (-1, 4))

def read_label_file(label_path: Path) -> np.ndarray:
    """Read a ``.label`` file as a flat array of packed uint32 labels.

    Each value packs the instance id in the upper 16 bits and the semantic id
    in the lower 16 bits.
    """
    packed = np.fromfile(str(label_path), dtype=np.uint32)
    return packed

def pack_panoptic(semantic: np.ndarray, instance: np.ndarray) -> np.ndarray:
    """Pack per-point semantic and instance ids into uint32 panoptic labels.

    The instance id goes to the upper 16 bits, the semantic id to the lower
    16 bits. Both inputs must have the same shape (checked with ``assert``).
    """
    assert semantic.shape == instance.shape
    upper = np.left_shift(instance.astype(np.uint32), 16)
    lower = semantic.astype(np.uint32)
    return np.bitwise_or(upper, lower)

def unpack_semantic(label32: np.ndarray) -> np.ndarray:
    """Return the semantic id (lower 16 bits) of packed labels as uint16."""
    low_bits = np.bitwise_and(label32, 0xFFFF)
    return low_bits.astype(np.uint16)

def _find_pred_file(pred_dir: Path, stem: str) -> Optional[Path]:
    """Return the first existing prediction file for ``stem``, or ``None``.

    Extensions are probed in this priority order: ``.label``, ``.labels``,
    ``.npy``, ``.npz``.
    """
    candidates = (
        pred_dir / f"{stem}{ext}"
        for ext in (".label", ".labels", ".npy", ".npz")
    )
    return next((path for path in candidates if path.is_file()), None)

def _load_semantics_generic(pred_path: Path) -> np.ndarray:
    """Load per-point semantic ids from a prediction file as a flat uint16 array.

    Supported formats, selected by file extension:

    - ``.label`` / ``.labels``: packed uint32 values; only the lower 16 bits
      (semantic id) are kept.
    - ``.npy``: any array, flattened.
    - ``.npz``: the first array stored in the archive, flattened.

    Raises
    ------
    ValueError
        If the extension is not supported, or if a ``.npz`` archive contains
        no array.
    """
    suffix = pred_path.suffix
    if suffix in (".label", ".labels"):
        u32 = np.fromfile(str(pred_path), dtype=np.uint32)
        return (u32 & 0xFFFF).astype(np.uint16)
    if suffix == ".npy":
        return np.asarray(np.load(str(pred_path))).reshape(-1).astype(np.uint16)
    if suffix == ".npz":
        # np.load returns a lazy NpzFile that keeps the archive open; the
        # context manager releases the file handle once the array is copied.
        with np.load(str(pred_path)) as data:
            if not data.files:
                raise ValueError(f"Archive .npz vide: {pred_path}")
            return np.asarray(data[data.files[0]]).reshape(-1).astype(np.uint16)
    raise ValueError(f"Extension non supportée: {pred_path}")

def load_semantics_for_scan(seq_root: Path, stem: str) -> np.ndarray:
    """Return the per-point semantic ids of one scan, per ``SEMANTICS_SOURCE``.

    With ``SEMANTICS_SOURCE == "oracle"`` the ids come from the ground-truth
    ``labels/<stem>.label`` file of the sequence. For any other value they are
    read from ``PRED_ROOT/sequences/<sequence name>/predictions/<stem>.*``.

    Raises
    ------
    AssertionError
        If no prediction file exists for the scan (non-oracle sources).
    """
    if SEMANTICS_SOURCE != "oracle":
        pred_dir = Path(PRED_ROOT) / "sequences" / seq_root.name / "predictions"
        pred_file = _find_pred_file(pred_dir, stem)
        assert pred_file is not None, f"Prediction manquante: {pred_dir}/{stem}.*"
        return _load_semantics_generic(pred_file)

    # Oracle: keep only the semantic half of the packed ground-truth labels.
    label_dir = kitti_scan_paths(seq_root)[1]
    return unpack_semantic(read_label_file(label_dir / f"{stem}.label"))

print("I/O utils chargés.")

In [None]:
# @title 4) Pré-traitements (features) + assignation d'instances
import numpy as np

def compute_features(points_xyzi: np.ndarray, mode: str = "none") -> np.ndarray:
    """Build the feature matrix fed to the HGP clustering.

    Parameters
    ----------
    points_xyzi : (N, 4) array with columns [x, y, z, intensity].
    mode : one of
        - ``"none"``: x, y, z (a slice of the input),
        - ``"bev_xy"``: x, y (top view),
        - ``"bev_xyzi"``: x, y, intensity,
        - ``"polar"``: r = sqrt(x^2 + y^2), theta = arctan2(y, x), z.

    Raises
    ------
    ValueError
        If ``mode`` is not one of the values above.
    """
    x, y, z, i = (points_xyzi[:, col] for col in range(4))
    if mode == "polar":
        radius = np.sqrt(x**2 + y**2)
        angle = np.arctan2(y, x)
        columns = [radius, angle, z]
    elif mode == "bev_xyzi":
        columns = [x, y, i]
    elif mode == "bev_xy":
        columns = [x, y]
    elif mode == "none":
        return points_xyzi[:, :3]
    else:
        raise ValueError(f"Mode PREPROC inconnu: {mode}")
    return np.stack(columns, axis=1)

def assign_instances_from_clusters(cluster_ids: np.ndarray, class_ids: np.ndarray, thing_ids: list) -> np.ndarray:
    """Turn per-class cluster labels into per-scan instance ids.

    Parameters
    ----------
    cluster_ids : clustering labels per point (> 0 = cluster, <= 0 = noise).
    class_ids : raw semantic class per point for clustered points, 0 otherwise.
    thing_ids : list of raw 'thing' class ids; the position of a class in this
        list defines its id offset, ``1000 * (position + 1)``.

    Returns
    -------
    np.ndarray of int32, same length as the inputs. Clustered points get
    ``offset(class) + rank`` where ``rank`` (starting at 1) is the position of
    their cluster label among the sorted positive labels of that class.
    Noise points, points of class 0 and points whose class is not listed in
    ``thing_ids`` keep instance id 0.

    Raises
    ------
    ValueError
        If the two label arrays do not have the same length.
    """
    cluster_ids = np.asarray(cluster_ids).reshape(-1)
    class_ids = np.asarray(class_ids).reshape(-1)
    if cluster_ids.shape != class_ids.shape:
        raise ValueError(
            f"cluster_ids ({cluster_ids.shape[0]}) and class_ids "
            f"({class_ids.shape[0]}) must have the same length"
        )

    inst = np.zeros(cluster_ids.shape, dtype=np.int32)
    base = {int(cid): 1000 * (rank + 1) for rank, cid in enumerate(thing_ids)}

    for cid in np.unique(class_ids):
        offset = base.get(int(cid))
        if cid == 0 or offset is None:
            # Unclustered points and non-thing classes carry no instance.
            continue
        mask = class_ids == cid
        clusters = cluster_ids[mask]
        positive = clusters > 0
        if not positive.any():
            continue
        # Rank each positive label among the sorted distinct labels; noise
        # stays at 0 instead of being merged into one pseudo-instance at the
        # bare class offset.
        _, rank = np.unique(clusters[positive], return_inverse=True)
        local = np.zeros(clusters.shape, dtype=np.int32)
        local[positive] = offset + rank.reshape(-1) + 1
        inst[mask] = local
    return inst

print("Pré-traitements + assignation prêts.")

In [None]:
%%bash
#@title 0) Extraire 08.zip depuis Drive vers /tmp (exécution bash)
# Requires Drive to be mounted beforehand:
# from google.colab import drive; drive.mount('/content/drive')
set -euo pipefail

ZIP="/content/drive/MyDrive/Datasets/semantic_kitti/sequences/08.zip"
DEST="/tmp/semkitti_local/sequences"

# Stop with an explicit message when the archive is missing.
if [ ! -f "$ZIP" ]; then
  echo "Introuvable: $ZIP"
  echo "Vérifie le chemin (ex: /content/drive/MyDrive/sequences/08.zip) et que Drive est monté."
  exit 1
fi

mkdir -p "$DEST"
echo "Extraction de $ZIP -> $DEST"
# -o: overwrite existing files without prompting; -q: quiet.
unzip -q -o "$ZIP" -d "$DEST"
echo "Contenu extrait:"
# The listing expects the archive to unpack into a top-level "08" folder.
find "$DEST/08" -maxdepth 2 -type d -print


In [None]:
#@title 0b) Utiliser la séquence locale extraite
from pathlib import Path

# Work on /tmp/semkitti_local/sequences/08, extracted by the bash cell above.
# This rebinds the DATA_ROOT chosen in the configuration cell.
DATA_ROOT = "/tmp/semkitti_local"

# Optional: quick sanity check that both expected sub-folders are present.
assert (Path(DATA_ROOT) / "sequences" / "08" / "velodyne").exists(), "velodyne introuvable sous /tmp/semkitti_local/sequences/08"
assert (Path(DATA_ROOT) / "sequences" / "08" / "labels").exists(), "labels introuvable sous /tmp/semkitti_local/sequences/08"

print("DATA_ROOT =", DATA_ROOT, "| SEQUENCES =", SEQUENCES)


In [None]:
# Reinstall hgp_clusterer from the checkout, then import it.
# NOTE(review): the path is hard-coded to /content and ignores HGP_WORKDIR, and
# no --no-deps is passed, unlike the bash install cell — confirm this is intended.
!pip install /content/HGP-clusterer

import hgp_clusterer
from hgp_clusterer import HypergraphPercol


In [None]:
from concurrent.futures import ThreadPoolExecutor
import subprocess, shlex, os, shutil, zipfile, uuid

# ---------- utilitaires I/O ----------

def _rsync_dir(src: str, dst: str):
    """Copy the contents of ``src`` into ``dst``, creating ``dst`` if needed.

    ``rsync -a`` is tried first. If rsync is not installed or exits with an
    error, the top-level entries of ``src`` are copied with ``shutil``
    instead (directories recursively, merging into existing ones).

    Parameters
    ----------
    src : source directory path.
    dst : destination directory path.
    """
    Path(dst).mkdir(parents=True, exist_ok=True)
    # Arguments are passed as a list, without a shell, so paths containing
    # quotes, spaces or "$" are taken literally. The trailing "/" on the
    # source makes rsync copy the directory *contents*; "--" stops option
    # parsing for paths that start with "-".
    cmd = [
        "rsync", "-a", "--info=progress2", "--",
        f'{src.rstrip("/")}/', f'{dst.rstrip("/")}/',
    ]
    try:
        subprocess.run(cmd, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        # FileNotFoundError: no rsync binary on PATH.
        for p in Path(src).glob("*"):
            q = Path(dst) / p.name
            if p.is_dir():
                shutil.copytree(p, q, dirs_exist_ok=True)
            else:
                shutil.copy2(p, q)

def ensure_local_sequence(seq_id: str, src_root: Path) -> Path:
    """Return a sequence folder that is fast to read, staging it under /tmp if needed.

    Resolution order:

    1. If ``/tmp/semkitti_local/sequences/<seq_id>`` already has both
       ``labels`` and ``velodyne`` folders, that local copy is returned.
    2. If ``src_root`` does not look remote (no ``gs://`` prefix, no
       ``/content/drive`` or ``/mnt/`` in the path), it is returned unchanged.
    3. Otherwise ``labels`` and ``velodyne`` are copied to the local folder
       (``gsutil rsync`` for ``gs://`` sources, ``_rsync_dir`` otherwise) and
       the local folder is returned.

    NOTE(review): the local folders are created before the copy starts, so an
    interrupted copy is treated as complete by step 1 on the next call.
    """
    src_str = str(src_root)
    local_root = Path("/tmp/semkitti_local") / "sequences" / seq_id
    labels_dir, velodyne_dir = local_root / "labels", local_root / "velodyne"
    if labels_dir.exists() and velodyne_dir.exists():
        return local_root

    from_gcs = src_str.startswith("gs://")
    if not (from_gcs or "/content/drive" in src_str or "/mnt/" in src_str):
        return src_root

    print(f"[{seq_id}] Staging local -> {local_root}")
    for target_dir in (labels_dir, velodyne_dir):
        target_dir.mkdir(parents=True, exist_ok=True)

    for sub in ["labels", "velodyne"]:
        target = (local_root / sub).as_posix()
        if from_gcs:
            cmd = f'gsutil -m rsync -r "{src_str}/{sub}" "{target}"'
            subprocess.run(["bash", "-lc", cmd], check=True)
        else:
            _rsync_dir(f"{src_str}/{sub}", target)

    return local_root

# ---------- sémantique / instance helpers ----------

def fast_load_semantics_for_scan(seq_root: Path, stem: str) -> np.ndarray:
    """Read ``labels/<stem>.label`` and return its semantic ids as int32.

    The file holds packed uint32 panoptic labels; the semantic id is the
    lower 16 bits of each value.
    """
    label_file = seq_root / "labels" / f"{stem}.label"
    packed = np.fromfile(label_file.as_posix(), dtype=np.uint32)
    return np.bitwise_and(packed, 0xFFFF).astype(np.int32)

In [None]:
from tqdm.auto import tqdm
from hgp_clusterer import HypergraphPercol

import numpy as np
from scipy.spatial import ConvexHull


# ------------------------ MAPPINGS & CONSTANTS ------------------------------
# Reference box dimensions per 'thing' class, keyed by the class id produced by
# `mapper` below (1..8). The clustering cell multiplies the first two (2-D
# features) or all three (3-D features) values to get an expected
# area/volume, and uses half of the first value as the "DBSCAN" method parameter.
BBOX_WEB = {1: [3., 1.8, 1.5], # car: https://www.motor1.com/news/707996/vehicles-larger-than-ever-usa-europe
            2: [1.75, 0.61, 1.3], # bicycle: https://thebestbikelock.com/wp-content/uploads/2020/01/one-bike-average-size.gif
            3: [2., 0.95, 1.3], # motorcycle: https://carparkjourney.wordpress.com/2013/07/16/what-is-the-average-size-of-a-motorbike/
            4: [8, 3, 1.5], # truck
            5: [8, 3, 1.5], # other-vehicle
            6: [0.94, 0.94, 1.5], # person: RLSP arm span height: https://pubmed.ncbi.nlm.nih.gov/25063245/  average height in germany https://en.wikipedia.org/wiki/Average_human_height_by_country 179. We get 179*1.06/2
            7: [1.75, 0.61, 1.5], # bicyclist: bicycle
            8: [2.1, 0.95, 1.5], # motorcyclist: motorcycle
            }

# Compact class index (0..19) -> raw SemanticKITTI class id.
LEARNING_MAP_INVERSE = {0:0,1:10,2:11,3:15,4:18,5:20,6:30,7:31,8:32,9:40,10:44,11:48,12:49,13:50,14:51,15:70,16:71,17:72,18:80,19:81}
# Raw SemanticKITTI class id -> compact class index (several raw ids share one index).
mapper = {0:0,1:0,10:1,11:2,13:5,15:3,16:5,18:4,20:5,30:6,31:7,32:8,40:9,44:10,48:11,49:12,50:13,51:14,52:0,60:9,70:15,71:16,72:17,80:18,81:19,99:0,252:1,253:7,254:6,255:8,256:5,257:5,258:4,259:5}

# # --------------------------- OUTILS GEOMETRIQUES -----------------------------

def convex_hull_measure(points: np.ndarray) -> float:
    """Compute the area (2D) or volume (3D) of the convex hull of a point set.

    Parameters
    ----------
    points : array-like, shape (n_samples, 2) or (n_samples, 3)
        2-D or 3-D point cloud, one point per row.

    Returns
    -------
    float
        Area (2D) or volume (3D) of the convex hull. ``0.0`` when there are
        too few points to span a hull (``n_samples <= dim``).

    Raises
    ------
    ValueError
        If ``points`` is not a 2-D array or its dimension is not 2 or 3.
    """
    pts = np.asarray(points, dtype=float)

    if pts.ndim != 2:
        raise ValueError(
            f"`points` doit être un tableau 2D (n_samples, dim), pas {pts.ndim}D."
        )

    n_samples, dim = pts.shape

    if dim not in (2, 3):
        raise ValueError(
            f"La dimension doit être 2 ou 3 (reçu dim={dim})."
        )

    # Fewer than dim+1 points cannot enclose any area/volume. This is reported
    # as an empty hull (always as a float) rather than as an error, so tiny
    # clusters can be scored like any other.
    if n_samples <= dim:
        return 0.0

    # `hull.volume` is the area for a 2-D input and the volume for a 3-D one.
    # QJ ("joggle") lets Qhull cope with near-degenerate inputs.
    hull = ConvexHull(pts, qhull_options="QJ")
    return float(hull.volume)

def loss_volume(points, volume_attendu):
    """Return the absolute gap between the hull measure of ``points`` and ``volume_attendu``.

    The hull measure is the value returned by ``convex_hull_measure``.
    """
    ecart = convex_hull_measure(points) - volume_attendu
    return np.abs(ecart)


def run_hgp_on_sequence(seq_id: str) -> bool:
    """Cluster every scan of one sequence with HGP and write panoptic ``.label`` files.

    For each scan, the points of every 'thing' class (``THING_RAW_IDS``) are
    clustered independently with ``HypergraphPercol`` on the features selected
    by ``PREPROC``. Cluster labels are converted to instance ids and packed
    with the semantic ids read from the sequence's ``labels`` folder. Results
    go to ``OUT_ROOT/sequences/<seq_id>/predictions/<stem>.label``.

    Reads the notebook-level settings ``DATA_ROOT``, ``OUT_ROOT``, ``PREPROC``,
    ``K``, ``min_cluster_size``, ``method``, ``splitting``,
    ``label_all_points`` and ``expZ``.

    Parameters
    ----------
    seq_id : sequence folder name, e.g. ``"08"``.

    Returns
    -------
    bool
        ``True`` once every scan has been written.

    Raises
    ------
    ValueError
        If a scan and its label file disagree on the number of points, or if
        ``splitting`` is enabled with features that are neither 2-D nor 3-D.
    """
    seq_root = Path(DATA_ROOT) / "sequences" / seq_id
    out_pred_dir = Path(OUT_ROOT) / "sequences" / seq_id / "predictions"
    out_pred_dir.mkdir(parents=True, exist_ok=True)

    vel_dir, _label_dir, stems = kitti_scan_paths(seq_root)

    # 1) Load the semantic ids of every scan up front (threaded file reads).
    def _load_one(stem):
        return stem, fast_load_semantics_for_scan(seq_root, stem)

    sem_by_stem = {}
    max_workers = os.cpu_count()
    print(f"[{seq_id}] Chargement sémantique (max_workers={max_workers})")
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        for stem, sem in tqdm(ex.map(_load_one, stems),
                              total=len(stems),
                              desc=f"[{seq_id}] Chargement sémantique",
                              ncols=0, leave=False):
            sem_by_stem[stem] = sem

    cgal_root = os.environ.get("CGALDELAUNAY_ROOT", None)

    # 2) Each scan is clustered independently, one 'thing' class at a time.
    for stem in tqdm(stems, desc=f"[{seq_id}] Clustering HGP"):
        pts = read_points_bin(vel_dir / f"{stem}.bin")  # (N,4)
        feats = compute_features(pts, PREPROC)          # (N,d)
        sem_raw = sem_by_stem[stem]

        N = feats.shape[0]
        if sem_raw.shape[0] != N:
            raise ValueError(
                f"[{seq_id}/{stem}] {N} points but {sem_raw.shape[0]} labels"
            )
        cluster_ids = np.zeros(N, dtype=np.int32)
        class_ids   = np.zeros(N, dtype=np.int32)

        for cid in THING_RAW_IDS:
            mask = (sem_raw == cid)
            if int(mask.sum()) < min_cluster_size:
                continue

            X = feats[mask]

            # Reference box of the class. It is looked up unconditionally
            # because the "DBSCAN" method parameter below needs it even when
            # `splitting` is disabled.
            bbox = BBOX_WEB[mapper[cid]]

            if splitting:
                d = X.shape[1]
                if d == 2:
                    volume_attendu = bbox[0]*bbox[1]
                elif d == 3:
                    volume_attendu = bbox[0]*bbox[1]*bbox[2]
                else:
                    raise ValueError(f"Dimension non supportée: {d}")
                # Consumed by the HypergraphPercol call in this same
                # iteration, so the closure sees this class's expected value.
                loss = lambda points: loss_volume(points, volume_attendu)
            else:
                loss = None

            # "DBSCAN" is replaced by a numeric value: half of the first box
            # dimension of the class. Any other method is passed through.
            hgp_method_param = bbox[0]/2 if method == "DBSCAN" else method

            # NOTE(review): min_samples, weight_face, return_multi_clusters,
            # complex_chosen and HGP_VERBOSE from the configuration cell are
            # not forwarded; verbosity is hard-coded to False.
            labels = HypergraphPercol(
                M=X,
                K=K,
                min_cluster_size=min_cluster_size,
                method=hgp_method_param,
                splitting=loss,
                label_all_points=label_all_points,
                expZ=expZ,
                cgal_root=cgal_root,
                verbeux=False,
            )+1
            # After the +1 shift, anything still negative is clamped to 0,
            # which downstream code treats as "no cluster".
            labels = np.asarray(labels).reshape(-1).astype(np.int32)
            labels[labels < 0] = 0

            cluster_ids[mask] = labels
            class_ids[mask]   = cid

        inst = assign_instances_from_clusters(cluster_ids, class_ids, THING_RAW_IDS)

        panoptic = pack_panoptic(sem_raw.astype(np.uint16), inst.astype(np.uint16))
        (out_pred_dir / f"{stem}.label").write_bytes(panoptic.astype(np.uint32).tobytes())

    return True

# Process every configured sequence in turn; stop at the first one that
# reports a failure.
for s in tqdm(SEQUENCES, desc="Séquences"):
    ok = run_hgp_on_sequence(s)
    if not ok:
        raise RuntimeError(f"Échec sur séquence {s}")

print("Terminé. Prédictions écrites sous:", OUT_ROOT)


In [None]:
#@title 7) Visualisation 3D (Plotly): oracle vs HGP
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from pathlib import Path

# SemanticKITTI groups (raw class ids) used to split points into things/stuff
THINGS_IDS = {10, 11, 15, 18, 20, 30, 31, 32}
STUFF_IDS  = {40, 44, 48, 49, 50, 51, 70, 71, 72, 80, 81}

# === SemanticKITTI 'thing' classes ===
# Per the panoptic extension: car, truck, other-vehicle, motorcycle, bicycle, person, bicyclist, motorcyclist
THING_CLASS_IDS = [10, 18, 20, 15, 11, 30, 31, 32]

CLASS_ID_TO_NAME = {
    10: "car",
    11: "bicycle",
    15: "motorcycle",
    18: "truck",
    20: "other-vehicle",
    30: "person",
    31: "bicyclist",
    32: "motorcyclist",
}

# Common "stuff" ids (useful for visualization; not exhaustive).
# NOTE(review): this list differs from STUFF_IDS above (52 and 60 are included,
# 81 is absent), and the class names in the trailing comment do not line up
# one-to-one with the ids.
STUFF_CLASS_IDS = [40, 44, 48, 49, 50, 51, 52, 60, 70, 71, 72, 80]  # road, parking, sidewalk, other-ground, building, fence, vegetation, trunk, terrain, pole, traffic-sign, etc.
# Per-class fallback colours, used when a class has no DARK_STUFF entry.
CLASS_COLOR = {
    10: '#F59664', 11: '#FFDE59', 15: '#E04F5F', 18: '#7F7F7F',
    20: '#CFCFCF', 30: '#FF7F7F', 31: '#FFBF80', 32: '#FF8080',
    40: '#808080', 44:'#B0B0B0', 48:'#C0C0C0', 49:'#A0A0A0',
    50:'#8F8FBD', 51:'#7F7F7F', 52:'#5FA15F', 60:'#7B5E57',
    70:'#9C9C5C', 71:'#FFD700', 72:'#A0A0FF', 80:'#B0E0E6'
}
# Class names for THINGS (identical re-definition of the mapping above)
CLASS_ID_TO_NAME = {
    10: "car",
    11: "bicycle",
    15: "motorcycle",
    18: "truck",
    20: "other-vehicle",
    30: "person",
    31: "bicyclist",
    32: "motorcyclist",
}

# Warm palette for THINGS, dark palette for STUFF
WARM = [
    "#d73027", "#e6550d", "#e34a33", "#f46d43", "#fdae61",
    "#f16913", "#fb6a4a", "#dd1c77", "#e31a1c", "#ff9f1c",
]
DARK_STUFF = {
    40: "#222222",  # road
    44: "#2a2a2a",  # parking
    48: "#333333",  # sidewalk
    49: "#1f1f1f",  # other-ground
    50: "#2f3b4b",  # building
    51: "#2c2c3c",  # fence
    70: "#284b2f",  # vegetation
    71: "#3d2b1f",  # trunk
    72: "#2e3d1f",  # terrain
    80: "#3a3a3a",  # pole
    81: "#4c3a1e",  # traffic-sign
}
# Marker opacities: stuff is drawn faint, things nearly opaque.
STUFF_OPACITY = 0.28
THING_OPACITY = 0.90
def unpack_semantic(label32: np.ndarray) -> np.ndarray:
    """Return the semantic id (lower 16 bits) of packed labels as uint16."""
    semantic_bits = np.bitwise_and(label32, 0xFFFF)
    return semantic_bits.astype(np.uint16)

def unpack_instance(label32: np.ndarray) -> np.ndarray:
    """Return the instance id (upper 16 bits) of packed labels as uint16."""
    instance_bits = np.right_shift(label32, 16)
    return instance_bits.astype(np.uint16)

def load_panoptic_pred(seq_id: str, stem: str) -> np.ndarray:
    """Load the packed panoptic prediction written for one scan.

    The file is looked up at
    ``OUT_ROOT/sequences/<seq_id>/predictions/<stem>.label``.

    Raises
    ------
    FileNotFoundError
        If that file does not exist; the message is the expected path.
    """
    pred_path = Path(OUT_ROOT) / "sequences" / seq_id / "predictions" / f"{stem}.label"
    if not pred_path.is_file():
        raise FileNotFoundError(str(pred_path))
    return read_label_file(pred_path)


def make_colors_for_instances(inst_ids: np.ndarray) -> np.ndarray:
    """Map every instance id to a colour string, one entry per input element.

    Id 0 is grey (``'#808080'``). Any other id gets an entry of the ``WARM``
    palette, picked by a random generator seeded with ``id + 12345`` so the
    same id always receives the same colour.
    """
    lut = {0: '#808080'}
    for iu in (int(u) for u in np.unique(inst_ids)):
        if iu != 0:
            # Seeding with the id makes the pick reproducible across calls.
            pick = np.random.default_rng(iu + 12345).integers(0, len(WARM))
            lut[iu] = WARM[int(pick)]
    return np.array([lut[int(v)] for v in inst_ids])


def _warm_color_by_class() -> dict:
    """Assign a ``WARM`` palette colour to each class of ``THINGS_IDS``.

    Classes are taken in ascending id order and the palette is cycled if it
    is shorter than the class list.
    """
    palette_size = len(WARM)
    return {
        sid: WARM[rank % palette_size]
        for rank, sid in enumerate(sorted(THINGS_IDS))
    }


def _dark_color_for_class(sid: int) -> str:
    """Return the dark display colour of a class.

    Lookup order: ``DARK_STUFF``, then ``CLASS_COLOR``, then the dark grey
    ``'#2b2b2b'``.
    """
    key = int(sid)
    if key in DARK_STUFF:
        return DARK_STUFF[key]
    return CLASS_COLOR.get(key, '#2b2b2b')


def _class_name(sid: int) -> str:
    """Return the readable name of a class id.

    Ids listed in ``CLASS_ID_TO_NAME`` return their name; any other id is
    rendered as ``"stuff(<id>)"``.
    """
    key = int(sid)
    try:
        return CLASS_ID_TO_NAME[key]
    except KeyError:
        return f"stuff({key})"


def _scatter3d(x, y, z, color, name, opacity, customdata=None, hovertemplate=None):
    """Build a legend-less 3-D marker trace (marker size 2).

    ``color`` and ``opacity`` style the markers; ``customdata`` and
    ``hovertemplate`` are forwarded unchanged to ``go.Scatter3d``.
    """
    marker_style = dict(size=2, color=color, opacity=opacity)
    trace = go.Scatter3d(
        x=x, y=y, z=z,
        mode="markers",
        marker=marker_style,
        name=name,
        showlegend=False,
        customdata=customdata,
        hovertemplate=hovertemplate,
    )
    return trace


def visualize_triplet(seq_id="08", index=0, decimate=10):
    """Build a side-by-side 3-D view of one scan: ground truth vs. HGP panoptic output.

    Left scene ("Oracle"): ground-truth labels, stuff in dark colours and
    things in one warm colour per class. Right scene ("HGP Panoptic"): the
    prediction loaded with ``load_panoptic_pred``, stuff in dark colours and
    one trace per predicted thing instance (instance id > 0). Hovering a
    point shows its class name, class id and instance id.

    Parameters
    ----------
    seq_id : sequence folder name under ``DATA_ROOT/sequences``.
    index : position of the scan in the sorted scan list of the sequence.
    decimate : keep one point out of ``decimate`` for display (values below 1
        are treated as 1).

    Returns
    -------
    plotly Figure with two 3-D scenes.
    """
    seq_root = Path(DATA_ROOT) / "sequences" / seq_id
    vel_dir, label_dir, stems = kitti_scan_paths(seq_root)
    stem = stems[index]
    pts = read_points_bin(vel_dir / f"{stem}.bin")

    # Ground truth: packed uint32 labels [inst << 16 | sem].
    gt32 = read_label_file(label_dir / f"{stem}.label")
    gt_sem = unpack_semantic(gt32)
    # The instance id is taken straight from the 32-bit value.
    gt_inst = (gt32.astype(np.uint32) >> 16).astype(np.int32)

    # HGP panoptic prediction.
    pan32 = load_panoptic_pred(seq_id, stem)
    pan_sem = unpack_semantic(pan32)
    pan_inst = unpack_instance(pan32)

    # Decimation for display.
    step = max(1, int(decimate))
    idx = np.arange(0, len(pts), step)
    pts_d = pts[idx]
    gt_sem_d = gt_sem[idx]
    gt_inst_d = gt_inst[idx]
    pan_sem_d = pan_sem[idx]
    pan_inst_d = pan_inst[idx]

    warm_by_class = _warm_color_by_class()
    stuff_ids = list(STUFF_IDS)
    thing_ids = list(THINGS_IDS)

    # Two scenes: Oracle on the left, HGP on the right.
    fig = make_subplots(rows=1, cols=2, specs=[[{"type":"scene"}, {"type":"scene"}]],
                        subplot_titles=("Oracle", "HGP Panoptic"))

    oracle_ht = "Oracle · %{customdata[0]}<br>class_id=%{customdata[1]}<br>inst_id=%{customdata[2]}<extra></extra>"
    hgp_ht = "HGP · %{customdata[0]}<br>class_id=%{customdata[1]}<br>inst_id=%{customdata[2]}<extra></extra>"

    def _hover_table(sem, inst):
        # One row per point: class name, class id, instance id.
        return np.stack([
            np.vectorize(_class_name)(sem),
            sem.astype(int),
            inst.astype(int),
        ], axis=1)

    def _add_layer(mask, colors, name, opacity, sem, inst, template, col):
        # Add the points selected by `mask` as one trace of scene `col`.
        fig.add_trace(
            _scatter3d(pts_d[mask, 0], pts_d[mask, 1], pts_d[mask, 2], colors,
                       name, opacity,
                       customdata=_hover_table(sem[mask], inst[mask]),
                       hovertemplate=template),
            row=1, col=col,
        )

    # -------------------
    # 1) ORACLE: dark stuff + warm things
    m_stuff = np.isin(gt_sem_d, stuff_ids)
    if np.any(m_stuff):
        cols = np.array([_dark_color_for_class(s) for s in gt_sem_d[m_stuff]])
        _add_layer(m_stuff, cols, "Oracle-stuff", STUFF_OPACITY,
                   gt_sem_d, gt_inst_d, oracle_ht, 1)

    m_things = np.isin(gt_sem_d, thing_ids)
    if np.any(m_things):
        cols = np.array([warm_by_class.get(int(s), '#ff9f1c') for s in gt_sem_d[m_things]])
        _add_layer(m_things, cols, "Oracle-things", THING_OPACITY,
                   gt_sem_d, gt_inst_d, oracle_ht, 1)

    # -------------------
    # 2) HGP PANOPTIC: dark stuff background + one warm trace per thing instance
    m_stuff = np.isin(pan_sem_d, stuff_ids)
    if np.any(m_stuff):
        cols = np.array([_dark_color_for_class(s) for s in pan_sem_d[m_stuff]])
        _add_layer(m_stuff, cols, "HGP-stuff", STUFF_OPACITY,
                   pan_sem_d, pan_inst_d, hgp_ht, 2)

    m_things = np.isin(pan_sem_d, thing_ids)
    inst_colors_all = make_colors_for_instances(pan_inst_d)
    for iid in np.unique(pan_inst_d[m_things]):
        if int(iid) <= 0:
            continue
        m_i = (pan_inst_d == iid) & m_things
        if not np.any(m_i):
            continue
        # Every point of the instance shares the colour of its id.
        col_i = inst_colors_all[m_i][0]
        _add_layer(m_i, np.full(m_i.sum(), col_i), f"inst {int(iid)}", THING_OPACITY,
                   pan_sem_d, pan_inst_d, hgp_ht, 2)

    # Clean axes: hidden, data-proportional aspect, Z pointing up.
    for c in [1, 2]:
        fig.update_scenes(
            dict(
                xaxis=dict(visible=False, title='x'),
                yaxis=dict(visible=False, title='y'),
                zaxis=dict(visible=False, title='z'),
                aspectmode='data',
                camera=dict(up=dict(x=0, y=0, z=1)),
            ),
            row=1, col=c,
        )

    # 1-based position of the scan in the sequence (also valid for negative indices).
    position = index % len(stems) + 1
    fig.update_layout(height=600, width=1100, title_text=f"Sequence {seq_id} · Scan {stem} ({position}/{len(stems)}, decimate={step})")
    return fig


print("Visualisation prête. Appelez: visualize_triplet(seq_id, index, decimate)")
# Example usage: scan at position 201 of sequence 08, without decimation.
fig = visualize_triplet("08", index=201, decimate=1)
fig.show()
