In [1]:
# 1. Imports and basic paths

from pathlib import Path
import json
from collections import Counter

import numpy as np
import pandas as pd
from tqdm.auto import tqdm

from ultralytics import YOLO
from PIL import Image

# Paths for your repo
DATA_DIR = Path("data")
ANN_DIR = DATA_DIR / "annotations"
IMG_DIR = DATA_DIR / "images"

VAL_JSON = ANN_DIR / "instances_val_gps_split_with_signs.json"
YOLO_MODEL_PATH = Path("bestv12.pt")

print("Val json exists:", VAL_JSON.exists())
print("YOLO model exists:", YOLO_MODEL_PATH.exists())

Val json exists: True
YOLO model exists: True


In [2]:
# 2. Load COCO val and build image + category dicts

def load_coco(path: Path):
    with open(path, "r") as f:
        return json.load(f)

coco_val = load_coco(VAL_JSON)

print("Val images:", len(coco_val["images"]))
print("Val annotations:", len(coco_val["annotations"]))
print("Num categories:", len(coco_val["categories"]))

# image_id -> image_info
images_by_id = {img["id"]: img for img in coco_val["images"]}

# category_id -> name
cat_id_to_name = {c["id"]: c["name"] for c in coco_val["categories"]}
list(cat_id_to_name.items())

Val images: 2098
Val annotations: 21261
Num categories: 49


[(1, 'Police Officer'),
 (2, 'Police Vehicle'),
 (3, 'Cone'),
 (4, 'Fence'),
 (5, 'Drum'),
 (6, 'Barricade'),
 (7, 'Barrier'),
 (8, 'Work Vehicle'),
 (9, 'Vertical Panel'),
 (10, 'Tubular Marker'),
 (11, 'Arrow Board'),
 (12, 'Bike Lane'),
 (13, 'Work Equipment'),
 (14, 'Worker'),
 (15, 'Other Roadwork Objects'),
 (16, 'Temporary Traffic Control Message Board'),
 (17, 'Temporary Traffic Control Sign'),
 (19, 'Temporary Traffic Control Sign: left arrow'),
 (20, 'Temporary Traffic Control Sign: right arrow'),
 (21, 'Temporary Traffic Control Sign: up arrow'),
 (22, 'Temporary Traffic Control Sign: left chevron'),
 (23, 'Temporary Traffic Control Sign: right lane ends sign'),
 (24, 'Temporary Traffic Control Sign: two lane shift arrows'),
 (25, 'Temporary Traffic Control Sign: right chevron'),
 (26, 'Temporary Traffic Control Sign: lane shift arrow'),
 (27, 'Temporary Traffic Control Sign: up diagonal right arrow'),
 (28, 'Temporary Traffic Control Sign: left lane ends sign'),
 (29, 'Temp

In [3]:
# 3. Map scene_level_tags.travel_alteration to a simple scene label

def get_scene_label(img_dict):
    """
    Use scene_level_tags.travel_alteration to get a coarse scene label.
    lane_shift or workzone_ahead or None.
    """
    tags = img_dict.get("scene_level_tags", {})
    travel_alteration = tags.get("travel_alteration", [])

    # sometimes list, sometimes string
    if isinstance(travel_alteration, list):
        travel_alteration = travel_alteration[0] if travel_alteration else None

    if travel_alteration is None:
        return None

    txt = str(travel_alteration).lower()

    if "lane shift" in txt:
        return "lane_shift"

    # anything else non empty is considered generic workzone ahead
    return "workzone_ahead"

# quick check of distribution
scene_labels = [get_scene_label(img) for img in coco_val["images"]]
from collections import Counter
print("Scene label counts:", Counter([s for s in scene_labels if s is not None]))

Scene label counts: Counter({'workzone_ahead': 2041, 'lane_shift': 56})


In [4]:
# 4. Define semantic groups for categories

CHANNELIZATION = {
    "Cone",
    "Drum",
    "Barricade",
    "Barrier",
    "Vertical Panel",
    "Tubular Marker",
    "Fence",
}

WORKERS = {
    "Worker",
    "Police Officer",
}

VEHICLES = {
    "Work Vehicle",
    "Police Vehicle",
}

MESSAGE_BOARD = {
    "Temporary Traffic Control Message Board",
    "Arrow Board",
}

TTC_SIGNS = {
    "Temporary Traffic Control Sign",
    "Temporary Traffic Control Sign: left arrow",
    "Temporary Traffic Control Sign: right arrow",
    "Temporary Traffic Control Sign: up arrow",
    "Temporary Traffic Control Sign: left chevron",
    "Temporary Traffic Control Sign: right lane ends sign",
    "Temporary Traffic Control Sign: two lane shift arrows",
    "Temporary Traffic Control Sign: right chevron",
    "Temporary Traffic Control Sign: lane shift arrow",
    "Temporary Traffic Control Sign: up diagonal right arrow",
    "Temporary Traffic Control Sign: left lane ends sign",
    "Temporary Traffic Control Sign: bent left arrow",
    "Temporary Traffic Control Sign: flagger",
    "Temporary Traffic Control Sign: bent right arrow",
    "Temporary Traffic Control Sign: no left turn",
    "Temporary Traffic Control Sign: pedestrian: right arrow",
    "Temporary Traffic Control Sign: pedestrian: left arrow",
    "Temporary Traffic Control Sign: up diagonal left arrow",
    "Temporary Traffic Control Sign: pedestrian",
    "Temporary Traffic Control Sign: no right turn",
    "Temporary Traffic Control Sign: bi-directional arrow",
    "Temporary Traffic Control Sign: two upward diagonal arrows",
    "Temporary Traffic Control Sign: curved right arrow",
    "Temporary Traffic Control Sign: down diagonal left arrow",
    "Temporary Traffic Control Sign: do not enter sign",
    "Temporary Traffic Control Sign: worker",
    "Temporary Traffic Control Sign: bicycle",
    "Temporary Traffic Control Sign: two downward diagonal arrows",
    "Temporary Traffic Control Sign: curved left arrow",
    "Temporary Traffic Control Sign: curved left arrow, curved right arrow",
    "Temporary Traffic Control Sign: work vehicle",
    "Temporary Traffic Control Sign: traffic signal",
    "Temporary Traffic Control Sign: up arrow. stop sign",
}

OTHER_ROADWORK = {
    "Work Equipment",
    "Other Roadwork Objects",
}

name_to_group = {}
for n in CHANNELIZATION:
    name_to_group[n] = "channelization"
for n in WORKERS:
    name_to_group[n] = "workers"
for n in VEHICLES:
    name_to_group[n] = "vehicles"
for n in MESSAGE_BOARD:
    name_to_group[n] = "message_board"
for n in TTC_SIGNS:
    name_to_group[n] = "ttc_signs"
for n in OTHER_ROADWORK:
    name_to_group[n] = "other_roadwork"

unmapped = [n for n in cat_id_to_name.values() if n not in name_to_group]
print("Unmapped category names:", unmapped)

Unmapped category names: ['Bike Lane']


In [5]:
# 5. Load YOLOv12 model

yolo_model = YOLO(str(YOLO_MODEL_PATH))
yolo_model.to("cuda")  # or "cpu" if needed
yolo_model.eval()

YOLO(
  (model): DetectionModel(
    (model): Sequential(
      (0): Conv(
        (conv): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (1): Conv(
        (conv): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(64, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (2): C2f(
        (cv1): Conv(
          (conv): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(64, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
          (act): SiLU(inplace=True)
        )
        (cv2): Conv(
          (conv): Conv2d(96, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(64, eps=0.001, momentum=0.03, affine=True, track_running_s

In [6]:
# 6. Helpers to bin objects in left/mid/right and near/mid/far

def compute_position_bins(x_center_norm):
    """
    x_center_norm in [0, 1].
    Returns 'left', 'mid', or 'right'.
    """
    if x_center_norm < 1 / 3:
        return "left"
    elif x_center_norm < 2 / 3:
        return "mid"
    else:
        return "right"


def area_bin(area_norm):
    """
    area_norm is box_area / image_area.
    Rough depth bin: far, mid, near.
    """
    if area_norm < 0.02:
        return "far"
    elif area_norm < 0.08:
        return "mid"
    else:
        return "near"

In [7]:
# 7. Run YOLO on each val image and build feature rows

features = []

for img_info in tqdm(coco_val["images"], desc="Processing val images"):
    img_id = img_info["id"]
    file_name = img_info["file_name"]
    img_path = IMG_DIR / file_name

    if not img_path.exists():
        # if something is missing on disk, skip
        continue

    # run YOLO on this image
    results = yolo_model.predict(
        source=str(img_path),
        imgsz=960,     # can change later if you want
        conf=0.25,
        iou=0.7,
        verbose=False,
        device=0,
    )
    r = results[0]

    w = img_info["width"]
    h = img_info["height"]
    img_area = float(w * h)

    if r.boxes is None or len(r.boxes) == 0:
        # no detections, just zero features
        row = {
            "image_id": img_id,
            "file_name": file_name,
            "total_objs": 0,
        }
        for g in ["channelization", "workers", "vehicles", "ttc_signs", "message_board", "other_roadwork"]:
            row[f"count_{g}"] = 0
            row[f"frac_{g}"] = 0.0
        for side in ["left", "mid", "right"]:
            row[f"workers_{side}"] = 0
            row[f"channelization_{side}"] = 0
        for dist in ["near", "mid", "far"]:
            row[f"workers_{dist}"] = 0
            row[f"channelization_{dist}"] = 0
        row["scene_label"] = get_scene_label(img_info)
        features.append(row)
        continue

    # there are detections
    total_objs = 0
    group_counts = Counter()
    workers_side = Counter()
    channel_side = Counter()
    workers_dist = Counter()
    channel_dist = Counter()

    for box in r.boxes:
        cls_id = int(box.cls.item())
        # YOLO classes are 0 based, your COCO ids start at 1 and are in the same order
        cat_name = cat_id_to_name.get(cls_id + 1, None)
        if cat_name is None:
            continue

        group = name_to_group.get(cat_name, None)
        if group is None:
            group = "other_roadwork"

        total_objs += 1
        group_counts[group] += 1

        x1, y1, x2, y2 = box.xyxy[0].tolist()
        cx = (x1 + x2) / 2.0
        cy = (y1 + y2) / 2.0
        area = (x2 - x1) * (y2 - y1)
        x_norm = cx / w
        area_norm = area / img_area

        side = compute_position_bins(x_norm)
        dist = area_bin(area_norm)

        if group == "workers":
            workers_side[side] += 1
            workers_dist[dist] += 1
        if group == "channelization":
            channel_side[side] += 1
            channel_dist[dist] += 1

    row = {
        "image_id": img_id,
        "file_name": file_name,
        "total_objs": total_objs,
    }

    for g in ["channelization", "workers", "vehicles", "ttc_signs", "message_board", "other_roadwork"]:
        c = group_counts[g]
        row[f"count_{g}"] = c
        row[f"frac_{g}"] = c / total_objs if total_objs > 0 else 0.0

    for side in ["left", "mid", "right"]:
        row[f"workers_{side}"] = workers_side[side]
        row[f"channelization_{side}"] = channel_side[side]

    for dist in ["near", "mid", "far"]:
        row[f"workers_{dist}"] = workers_dist[dist]
        row[f"channelization_{dist}"] = channel_dist[dist]

    row["scene_label"] = get_scene_label(img_info)

    features.append(row)

# build DataFrame
df_features = pd.DataFrame(features)
df_features.head()

Processing val images:   0%|          | 0/2098 [00:00<?, ?it/s]

Unnamed: 0,image_id,file_name,total_objs,count_channelization,frac_channelization,count_workers,frac_workers,count_vehicles,frac_vehicles,count_ttc_signs,...,channelization_left,workers_mid,channelization_mid,workers_right,channelization_right,workers_near,channelization_near,workers_far,channelization_far,scene_label
0,1,columbus_ed065d9b86d545b2af0042a058e7e907_0000...,9,3,0.333333,0,0.0,6,0.666667,0,...,1,0,1,0,0,0,1,0,1,workzone_ahead
1,2,columbus_ed065d9b86d545b2af0042a058e7e907_0000...,21,6,0.285714,14,0.666667,1,0.047619,0,...,0,0,1,7,4,0,0,14,5,workzone_ahead
2,3,columbus_ed065d9b86d545b2af0042a058e7e907_0000...,3,1,0.333333,0,0.0,2,0.666667,0,...,0,0,0,0,0,0,0,0,1,workzone_ahead
3,4,columbus_ed065d9b86d545b2af0042a058e7e907_0000...,7,2,0.285714,0,0.0,5,0.714286,0,...,1,0,0,0,0,0,1,0,1,workzone_ahead
4,5,columbus_ed065d9b86d545b2af0042a058e7e907_0000...,5,1,0.2,0,0.0,4,0.8,0,...,0,0,0,0,1,0,0,0,1,workzone_ahead


In [11]:
# 9. Filter to rows that actually have a label
df_scene = df_features[df_features["scene_label"].notna()].copy()

print("Scene label distribution:")
print(df_scene["scene_label"].value_counts())
print("df_scene shape:", df_scene.shape)

Scene label distribution:
scene_label
workzone_ahead    2041
lane_shift          56
Name: count, dtype: int64
df_scene shape: (2097, 26)


In [12]:
# 10. Select numeric features for the classifier
feature_cols = [
    "total_objs",
    "count_channelization", "frac_channelization",
    "count_workers",        "frac_workers",
    "count_vehicles",       "frac_vehicles",
    "count_ttc_signs",      "frac_ttc_signs",
    "count_message_board",  "frac_message_board",
    "count_other_roadwork", "frac_other_roadwork",
    "workers_left",  "channelization_left",
    "workers_mid",   "channelization_mid",
    "workers_right", "channelization_right",
    "workers_near",  "channelization_near",
    "workers_mid",   "channelization_mid",   # (depth)
    "workers_far",   "channelization_far",
]

X = df_scene[feature_cols].values

label_to_int = {"workzone_ahead": 0, "lane_shift": 1}
y = df_scene["scene_label"].map(label_to_int).astype(int).values

print("X shape:", X.shape)
print("y shape:", y.shape)
print("Label counts:", pd.Series(y).value_counts())

X shape: (2097, 25)
y shape: (2097,)
Label counts: 0    2041
1      56
Name: count, dtype: int64


In [13]:
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report

# 11. Compute class weights to handle imbalance
classes = np.unique(y)
weights = compute_class_weight("balanced", classes=classes, y=y)
class_weight = {c: w for c, w in zip(classes, weights)}
class_weight

{np.int64(0): np.float64(0.513718765311122),
 np.int64(1): np.float64(18.723214285714285)}

In [14]:
# 12. Fit classifier
clf = GradientBoostingClassifier(random_state=0)

# Convert class_weight into sample weights
sample_weights = np.array([class_weight[yi] for yi in y], dtype="float32")

clf.fit(X, y, sample_weight=sample_weights)

# In this first version we evaluate on the same data (sanity check)
y_pred = clf.predict(X)
print(classification_report(y, y_pred, target_names=["workzone_ahead", "lane_shift"]))

                precision    recall  f1-score   support

workzone_ahead       1.00      0.87      0.93      2041
    lane_shift       0.17      0.98      0.30        56

      accuracy                           0.88      2097
     macro avg       0.59      0.93      0.61      2097
  weighted avg       0.98      0.88      0.91      2097



In [15]:
from sklearn.model_selection import train_test_split

X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.25, random_state=0, stratify=y
)

weights_train = compute_class_weight("balanced", classes=classes, y=y_train)
cw_train = {c: w for c, w in zip(classes, weights_train)}
sample_weights_train = np.array([cw_train[yi] for yi in y_train])

clf2 = GradientBoostingClassifier(random_state=0)
clf2.fit(X_train, y_train, sample_weight=sample_weights_train)

y_pred_val = clf2.predict(X_val)
print(classification_report(y_val, y_pred_val, target_names=["workzone_ahead", "lane_shift"]))

                precision    recall  f1-score   support

workzone_ahead       0.98      0.91      0.95       511
    lane_shift       0.08      0.29      0.13        14

      accuracy                           0.90       525
     macro avg       0.53      0.60      0.54       525
  weighted avg       0.96      0.90      0.92       525

