# In [2]:  (Jupyter notebook cell marker)
import os, time, csv
from typing import List, Dict

import cv2
import numpy as np
import torch
from ultralytics import YOLO
from transformers import AutoImageProcessor, AutoModelForDepthEstimation

# ---------------- CONFIG ----------------
# Input image; override with the CITYSCAPES_IMG environment variable.
IMG_PATH = os.getenv(
    "CITYSCAPES_IMG",
    "./leftImg8bit_trainvaltest/leftImg8bit/val/frankfurt/frankfurt_000000_003025_leftImg8bit.png",
)

# Output directory (OUT_DIR environment variable); created at import time.
OUT_DIR = os.getenv("OUT_DIR", "./outputs")
os.makedirs(OUT_DIR, exist_ok=True)

# Files written by the pipeline.
OUT_IMAGE = os.path.join(OUT_DIR, "annotated.png")
OUT_CSV = os.path.join(OUT_DIR, "detections_with_distance.csv")

# Detector checkpoint; override with the YOLO_WEIGHTS environment variable.
YOLO_WEIGHTS = os.getenv("YOLO_WEIGHTS", "yolov8n.pt")

# Detections whose class name is not in this set are discarded.
OBSTACLE_CLASS_NAMES = {
    "person",
    "bicycle",
    "car",
    "motorcycle",
    "bus",
    "truck",
    "train",
    "traffic light",
    "stop sign",
    "bench",
}

# ---------------- HELPERS ----------------
def load_image(path: str) -> np.ndarray:
    """
    Read an image from disk with OpenCV (IMREAD_COLOR).

    Raises FileNotFoundError when `path` is not an existing file and
    RuntimeError when OpenCV cannot decode it.
    """
    if os.path.isfile(path):
        image = cv2.imread(path, cv2.IMREAD_COLOR)
        if image is not None:
            return image
        # imread signals failure by returning None instead of raising.
        raise RuntimeError(f"Failed to read image: {path}")
    raise FileNotFoundError(f"Image not found: {path}")

def run_yolo(img_bgr: np.ndarray, weights: str = YOLO_WEIGHTS, conf: float = 0.25):
    """
    Run YOLOv8 inference on a BGR image and keep only obstacle classes.

    Args:
        img_bgr: image as loaded by OpenCV (BGR channel order).
        weights: checkpoint path/name passed to `YOLO(...)`.
        conf: confidence threshold passed to `predict`.

    Returns:
        A list of dicts with keys "cls_name" (str), "conf" (float) and
        "xyxy" (numpy array of 4 box coordinates), one per detection whose
        class name is in OBSTACLE_CLASS_NAMES.
    """
    model = YOLO(weights)

    # Ultralytics documents numpy-array sources as BGR (OpenCV convention) and
    # performs the BGR->RGB conversion itself, so the frame is passed as-is.
    # Converting to RGB first would feed the network channel-swapped input.
    results = model.predict(source=img_bgr, conf=conf, verbose=False)[0]

    boxes = results.boxes
    if boxes is None or len(boxes) == 0:
        return []

    # Class-id -> name mapping attached to the result object.
    names = results.names

    detections = []
    for b in boxes:
        cls_id = int(b.cls.item())
        cls_name = names.get(cls_id, str(cls_id))
        if cls_name not in OBSTACLE_CLASS_NAMES:
            continue
        detections.append({
            "cls_name": cls_name,
            "conf": float(b.conf.item()),
            # Each iterated box holds one row; squeeze drops the leading axis.
            "xyxy": b.xyxy.squeeze().cpu().numpy(),
        })
    return detections

# ---------------- TRAIN YOLO ----------------
def train_traffic_light(data_yaml: str, epochs: int = 50, batch: int = 16):
    """
    Train YOLOv8 on a custom traffic light dataset.

    Args:
        data_yaml: path to dataset YAML (with train/val images and classes).
        epochs: number of training epochs.
        batch: batch size.

    Returns:
        Path (str) of the best checkpoint produced by this training run.
    """
    print("[INFO] Training YOLOv8 for traffic lights...")
    model = YOLO("yolov8n.pt")  # start from pretrained
    model.train(data=data_yaml, epochs=epochs, batch=batch, imgsz=640, name="traffic_light")

    # Ultralytics increments the run directory (traffic_light2, ...) when the
    # name already exists, so a hard-coded path may point at an older run.
    # Ask the trainer where it actually wrote the best checkpoint.
    trainer = getattr(model, "trainer", None)
    best = getattr(trainer, "best", None)
    if best is not None:
        trained_weights = str(best)
    else:
        # Fallback: conventional location of a first run with this name.
        trained_weights = "runs/detect/traffic_light/weights/best.pt"

    print(f"[INFO] Training completed. Weights saved at: {trained_weights}")
    return trained_weights

# -------- Depth Anything v2 loader --------
def load_depth_anything(model_id: str = "depth-anything/Depth-Anything-V2-small-hf"):
    """
    Load the Depth Anything v2 image processor and depth model.

    The model is moved to CUDA when available (CPU otherwise) and switched
    to eval mode.

    Returns: (processor, model, device)
    """
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"

    processor = AutoImageProcessor.from_pretrained(model_id)

    model = AutoModelForDepthEstimation.from_pretrained(model_id)
    model = model.to(device)
    model.eval()

    return processor, model, device

def run_depth_anything(img_bgr: np.ndarray, processor, model, device: str) -> np.ndarray:
    """
    Compute a relative depth map with Depth Anything v2.

    Args:
        img_bgr: HxWx3 image in BGR channel order.
        processor: image processor returned by `load_depth_anything`.
        model: depth model returned by `load_depth_anything`.
        device: device string the inputs are moved to.

    Returns:
        A float32 array with the same height and width as `img_bgr`, min-max
        normalized to 0..1 (all zeros when the prediction is constant).
    """
    h, w = img_bgr.shape[:2]

    # BGR -> RGB by reversing the channel axis (made contiguous for the processor).
    img_rgb = np.ascontiguousarray(img_bgr[:, :, ::-1])

    inputs = processor(images=img_rgb, return_tensors="pt").to(device)

    with torch.no_grad():
        outputs = model(**inputs)
        # Single image: drop the batch axis, then add (N, C) axes for interpolate.
        depth = outputs.predicted_depth.squeeze()[None, None].float()

        d_min, d_max = float(depth.min()), float(depth.max())
        if d_max > d_min:
            depth = (depth - d_min) / (d_max - d_min + 1e-8)
            # `predicted_depth` comes out at the processor's working resolution,
            # not the input size. Resize it to (h, w) so image-space box
            # coordinates index the right pixels; bicubic can overshoot
            # slightly, hence the clamp back into 0..1.
            depth = torch.nn.functional.interpolate(
                depth, size=(h, w), mode="bicubic", align_corners=False
            ).clamp(0.0, 1.0)
            depth_norm = depth[0, 0].cpu().numpy().astype(np.float32)
        else:
            depth_norm = np.zeros((h, w), dtype=np.float32)

    # NOTE(review): Depth Anything's relative models are commonly described as
    # predicting inverse depth (larger value = nearer). Values are returned
    # as-is; confirm the orientation before interpreting them as distance.
    return depth_norm

# -------- Fusion helpers --------
def median_depth_in_box(depth_map: np.ndarray, xyxy: np.ndarray) -> float:
    """
    Return the median of `depth_map` inside the box `xyxy` = (x1, y1, x2, y2).

    Coordinates are truncated to integers and clipped to the map. The box is
    treated as the half-open region [x1, x2) x [y1, y2), so a box reaching
    the right/bottom edge includes the last column/row.

    Returns NaN when the clipped box is empty.
    """
    h, w = depth_map.shape[:2]
    x1, y1, x2, y2 = np.asarray(xyxy).astype(int).tolist()

    # Slice ends are exclusive, so the valid upper bound is w / h (not w-1 / h-1).
    x1, x2 = min(max(x1, 0), w), min(max(x2, 0), w)
    y1, y2 = min(max(y1, 0), h), min(max(y2, 0), h)

    if x2 <= x1 or y2 <= y1:
        return float("nan")
    return float(np.median(depth_map[y1:y2, x1:x2]))

def direction_from_box(xyxy: np.ndarray, img_w: int) -> str:
    """
    Classify a box as "Left", "Center" or "Right" by its horizontal center.

    The image width is split into thirds; a center lying exactly on a
    boundary counts as "Center".
    """
    left_edge, _, right_edge, _ = xyxy
    center_x = 0.5 * (left_edge + right_edge)

    if center_x < img_w / 3.0:
        return "Left"
    if center_x > 2.0 * img_w / 3.0:
        return "Right"
    return "Center"

def annotate_and_save(img_bgr: np.ndarray, detections: List[Dict], depth_map: np.ndarray,
                      out_image_path: str, out_csv_path: str):
    """
    Draw detections with their relative depth on a copy of the image and
    write both the annotated image and a CSV summary.

    Args:
        img_bgr: source image (not modified; a copy is annotated).
        detections: dicts with "cls_name", "conf" and "xyxy" keys.
        depth_map: map sampled with `median_depth_in_box` for each box.
        out_image_path: destination of the annotated image.
        out_csv_path: destination of the CSV file.

    Rows are ordered by ascending median depth value; boxes whose median is
    NaN (empty after clipping) are listed last.

    Raises:
        RuntimeError: if the annotated image cannot be written.
    """
    # Create the parent directory of both outputs. dirname is "" for a bare
    # filename, and os.makedirs("") raises, so skip that case.
    for path in (out_image_path, out_csv_path):
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)

    w = img_bgr.shape[1]
    vis = img_bgr.copy()

    # Compute each median once and reuse it for sorting, drawing and the CSV.
    scored = [(median_depth_in_box(depth_map, det["xyxy"]), det) for det in detections]
    # NaN does not order consistently under "<"; push NaN entries to the end.
    scored.sort(key=lambda pair: (bool(np.isnan(pair[0])),
                                  0.0 if np.isnan(pair[0]) else pair[0]))

    color = (0, 255, 0)
    font = cv2.FONT_HERSHEY_SIMPLEX

    with open(out_csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["class", "confidence", "distance_rel", "direction", "x1", "y1", "x2", "y2"])

        for dist_rel, det in scored:
            cls_name = det["cls_name"]
            conf = det["conf"]
            xyxy = det["xyxy"]
            direction = direction_from_box(xyxy, w)

            # Plain Python ints for both OpenCV drawing calls and the CSV.
            x1, y1, x2, y2 = np.asarray(xyxy).astype(int).tolist()
            cv2.rectangle(vis, (x1, y1), (x2, y2), color, 2)

            label = f"{cls_name} {conf:.2f} | rel={dist_rel:.2f} | {direction}"
            (tw, th), _ = cv2.getTextSize(label, font, 0.5, 1)
            # Put the label above the box; when there is no room at the top of
            # the image, drop it just inside the box so the text stays visible.
            label_bottom = y1 if y1 - th - 6 >= 0 else y1 + th + 6
            cv2.rectangle(vis, (x1, label_bottom - th - 6), (x1 + tw + 4, label_bottom), color, -1)
            cv2.putText(vis, label, (x1 + 2, label_bottom - 4), font,
                        0.5, (0, 0, 0), 1, cv2.LINE_AA)

            writer.writerow([cls_name, f"{conf:.4f}", f"{dist_rel:.4f}",
                             direction, x1, y1, x2, y2])

    # imwrite reports failure through its return value rather than raising.
    if not cv2.imwrite(out_image_path, vis):
        raise RuntimeError(f"Failed to write image: {out_image_path}")

# ---------------- MAIN ----------------
def main():
    """Run detection and depth estimation on IMG_PATH, save the fused outputs, and print timing."""
    started = time.time()

    print(f"[INFO] Loading image: {IMG_PATH}")
    frame = load_image(IMG_PATH)

    print("[INFO] Running YOLOv8...")
    obstacles = run_yolo(frame, weights=YOLO_WEIGHTS, conf=0.25)
    print(f"[INFO] {len(obstacles)} obstacle detections kept")

    print("[INFO] Loading Depth Anything v2...")
    processor, depth_model, device = load_depth_anything()

    print("[INFO] Predicting depth...")
    relative_depth = run_depth_anything(frame, processor, depth_model, device)

    print("[INFO] Fusing detection + depth and saving outputs...")
    annotate_and_save(frame, obstacles, relative_depth, OUT_IMAGE, OUT_CSV)

    elapsed = time.time() - started
    for saved_path in (OUT_IMAGE, OUT_CSV):
        print(f"[DONE] Saved: {saved_path}")
    print(f"[TIME] {elapsed:.2f} s")


# Run the pipeline only when executed as a script / notebook cell.
if __name__ == "__main__":
    main()


# ---- Captured cell output ----
# [INFO] Loading image: ./leftImg8bit_trainvaltest/leftImg8bit/val/frankfurt/frankfurt_000000_003025_leftImg8bit.png
# [INFO] Running YOLOv8...
# [INFO] 10 obstacle detections kept
# [INFO] Loading Depth Anything v2...
#
#
# Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
#
#
# [INFO] Predicting depth...
# [INFO] Fusing detection + depth and saving outputs...
# [DONE] Saved: ./outputs\annotated.png
# [DONE] Saved: ./outputs\detections_with_distance.csv
# [TIME] 10.33 s
