# Road Object Classification 
## Setup for Road Signs Classes
### Explore the data

In [None]:
import pandas as pd

df = pd.read_csv("/kaggle/input/german-traffic-sign-dataset/signname.csv")
df.head()

In [None]:
df["ClassId"].size

In [None]:
import pickle

with open("/kaggle/input/german-traffic-sign-dataset/train.p", 'rb') as f:
    train_data = pickle.load(f)
with open("/kaggle/input/german-traffic-sign-dataset/valid.p", 'rb') as f:
    val_data = pickle.load(f)
with open("/kaggle/input/german-traffic-sign-dataset/test.p", 'rb') as f:
    test_data = pickle.load(f)

In [None]:
train_data.keys()

In [None]:
train_data["labels"][0]

In [None]:
train_data["labels"].size

In [None]:
train_data["sizes"][0]

In [None]:
train_data["coords"][0]

In [None]:
train_data["features"][0]

In [None]:
train_data["features"][0].shape

In [None]:
import os
import random
import cv2

# convert to cv2 BGR format
def get_random_img(dataset):
    random_sample = random.choice(dataset["features"])
    return cv2.cvtColor(random_sample, cv2.COLOR_RGB2BGR)

In [None]:
import matplotlib.pyplot as plt

def show(img, cv2_img = True, size=(6, 4)):
    if cv2_img: 
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    
    plt.figure(figsize=size) # img display size
    plt.imshow(img)
    plt.axis('off')
    plt.show()

In [None]:
img = get_random_img(train_data)
show(img)

### Preprocessing

In [None]:
import numpy as np

def process_image(img): 
    # Step 1: Conditional CLAHE
    # extract luminance channel
    img_yuv = cv2.cvtColor(img, cv2.COLOR_BGR2YUV)
    luminance = img_yuv[:, :, 0]
    avg_brightness = np.mean(luminance)

    if avg_brightness < 50:
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        img_yuv[:, :, 0] = clahe.apply(img_yuv[:, :, 0])
        img_clahe = cv2.cvtColor(img_yuv, cv2.COLOR_YUV2BGR)
    else:
        img_clahe = img.copy()

    # Step 2: Gamma Correction
    lut = np.zeros(256, dtype=np.uint8)
    for i in range(256):
        # normalize pixel value to [0, 1]
        normalized = i / 255.0
        # adaptive gamma factor (current maximum: 1.5)
        gamma = 1.0 + (1.5 - 1.0) * (1 - normalized)  # gamma decreases with brightness
        corrected = pow(normalized, 1.0 / gamma)
        lut[i] = np.clip(corrected * 255, 0, 255)
    img_bright = cv2.LUT(img, lut)


    # Step 3: Unsharp Masking
    blurred = cv2.GaussianBlur(img_bright, (0, 0), sigmaX=2)
    amount = 1.5
    return cv2.addWeighted(img_bright, 1 + amount, blurred, -amount, 0)

In [None]:
processed_img = process_image(img)
show(img)
show(processed_img)

For the folder structure I use numerical classes and look the result up in the csv later.

In [None]:
base_dir = "dataset"
train_dir = os.path.join(base_dir, "train")
test_dir = os.path.join(base_dir, "test")
val_dir = os.path.join(base_dir, "val")
os.makedirs(base_dir, exist_ok=True)
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)
os.makedirs(val_dir, exist_ok=True)


for i in range(0, df["ClassId"].size):
    os.makedirs(os.path.join(train_dir, str(i)), exist_ok=True)
    os.makedirs(os.path.join(test_dir, str(i)), exist_ok=True)
    os.makedirs(os.path.join(val_dir, str(i)), exist_ok=True)

In [None]:
from tqdm.notebook import tqdm

for i, img in enumerate(tqdm(train_data["features"], desc="Processing Images")):
    label = str(train_data["labels"][i])
    out_path = os.path.join(train_dir, label, f"{i}_{label}.png")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    cv2.imwrite(out_path, process_image(img))

In [None]:
for i, img in enumerate(tqdm(val_data["features"], desc="Processing Images")):
    label = str(val_data["labels"][i])
    out_path = os.path.join(val_dir, label, f"{i}_{label}.png")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    cv2.imwrite(out_path, process_image(img))

for i, img in enumerate(tqdm(test_data["features"], desc="Processing Images")):
    label = str(test_data["labels"][i])
    out_path = os.path.join(test_dir, label, f"{i}_{label}.png")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    cv2.imwrite(out_path, process_image(img))

In [None]:
def get_random_cv2_img(path, only_path=False):
    files = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
    random_sample = random.choice(files)
    if only_path:
        return random_sample
    else:
        return cv2.imread(os.path.join(path, random_sample))

In [None]:
show(get_random_cv2_img(os.path.join(train_dir, "0")))

## Setup for Traffic Light Classes
### Explore the data

In [None]:
img = get_random_cv2_img("/kaggle/input/carla-traffic-lights-images/traffic_light_data/train/green")
show(img)

43 classes, so 0 - 42

In [None]:
img.shape

In [None]:
resized_img = cv2.resize(img, (32, 32))
show(resized_img)

In [None]:
show(process_image(resized_img))

### Preprocessing

In [None]:
green = pd.DataFrame([{"ClassId": "43", "SignName": "Green"}])
yellow = pd.DataFrame([{"ClassId": "44", "SignName": "Yellow"}])
red = pd.DataFrame([{"ClassId": "45", "SignName": "Red"}])

df = pd.concat([df, green], ignore_index=True)
df = pd.concat([df, yellow], ignore_index=True)
df = pd.concat([df, red], ignore_index=True)

In [None]:
df.tail()

In [None]:
for i in range(43, 46):
    os.makedirs(os.path.join(train_dir, str(i)), exist_ok=True)
    os.makedirs(os.path.join(test_dir, str(i)), exist_ok=True)
    os.makedirs(os.path.join(val_dir, str(i)), exist_ok=True)

In [None]:
green_train = "/kaggle/input/carla-traffic-lights-images/traffic_light_data/train/green"
yellow_train = "/kaggle/input/carla-traffic-lights-images/traffic_light_data/train/yellow"
red_train = "/kaggle/input/carla-traffic-lights-images/traffic_light_data/train/red"

green_val = "/kaggle/input/carla-traffic-lights-images/traffic_light_data/val/green"
yellow_val = "/kaggle/input/carla-traffic-lights-images/traffic_light_data/val/yellow"
red_val = "/kaggle/input/carla-traffic-lights-images/traffic_light_data/val/red"

In [None]:
def process_and_save_images(input_dir, output_dirs, class_id, filename_suffix):
    files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
    
    for i, f in enumerate(tqdm(files, desc=f"Processing {input_dir}")):
        img_path = os.path.join(input_dir, f)
        img = cv2.imread(img_path)

        resized_img = cv2.resize(img, (32, 32))
        processed_img = process_image(resized_img)
        
        for out_dir in output_dirs:
            out_path = os.path.join(out_dir, class_id, f"{i}_{filename_suffix}.png")
            cv2.imwrite(out_path, processed_img)

configs = [
    # train
    (green_train, [train_dir], "43", "43"),
    (yellow_train, [train_dir], "44", "44"),
    (red_train, [train_dir], "45", "45"),
    
    # validation/test
    (green_val, [val_dir, test_dir], "43", "43"),
    (yellow_val, [val_dir, test_dir], "44", "44"),
    (red_val, [val_dir, test_dir], "45", "45"),
]

for input_dir, output_dirs, class_id, suffix in configs:
    process_and_save_images(input_dir, output_dirs, class_id, suffix)

In [None]:
show(get_random_cv2_img(os.path.join(train_dir, "43")))

## Train the classificator

In [None]:
%%capture
! pip install ultralytics

In [None]:
import torch

# Clear cache
torch.cuda.empty_cache()

# Optional: Collect unused memory from Python garbage collector
import gc
gc.collect()

# If using newer PyTorch and CUDA version:
torch.cuda.ipc_collect()

Using the size "s", because it seems to be fast and accurate enought.

In [None]:
from ultralytics import YOLO
model = YOLO("yolo11s-cls.pt")

data = "dataset/"

results = model.train(data=data, batch=32, epochs=30, imgsz=32, device=[0, 1])

## Testing

In [None]:
%%capture
! wget "https://arcxyon.com/wp-content/uploads/2025/08/yolo-detect-m_best_epochs-100_size-460-960_05-08-2025.zip"
! unzip "yolo-detect-m_best_epochs-100_size-460-960_05-08-2025.zip"

In [None]:
detection_model = YOLO("yolo-detect-m_best_epochs-100_size-460-960_05-08-2025/yolo-detect-m_best_epochs-100_size-460-960_05-08-2025.pt")

In [None]:
def refined_class(img):
    # predict using the model (on CUDA device 0)
    results = model.predict(img, device='cuda:0', batch=1)

    # get the top-1 prediction (assuming single image input)
    pred = results[0]  # result for the first (and only) image
    cls = pred.names[pred.probs.top1]  # class label
    conf = pred.probs.top1conf.item()  # confidence score (float)

    return cls, conf

**Note: Make sure to preprocess the image the same as the training data and convert them to RGB**

In [None]:
def get_road_objects(img):

    working_img = img.copy()
    
    # prprocess the images like the training data
    working_img = process_image(working_img)

    # make sure to convert to RGB because it was trained on RGB images
    working_img = cv2.cvtColor(working_img, cv2.COLOR_BGR2RGB)
    # use model.predict on specific device when having multiple gpus
    results = detection_model.predict(working_img, device='cuda:0', batch=1)
    
    # assuming only one image processed per call
    for result in results:
        xyxy = result.boxes.xyxy  # [x1, y1, x2, y2]
        class_ids = result.boxes.cls.int()
        confs = result.boxes.conf
        names = [result.names[cls.item()] for cls in class_ids]

        for i, box in enumerate(xyxy):
            x1, y1, x2, y2 = map(int, box.tolist())
            cls_id = class_ids[i].item()
            conf = confs[i]

            # if it's class 2 (vehicle), skip get_class() and draw it directly
            if cls_id == 2:
                label = f"{names[i]} {conf:.2f}"
            else:
                # crop the image part
                img_part = img[y1:y2, x1:x2]
                
                # again, the classificator was trained on RGB images of size 32 by 32
                img_part = cv2.cvtColor(img_part, cv2.COLOR_BGR2RGB)
                img_part = cv2.resize(img_part, (32, 32))
                img_part = process_image(img_part)
                
                rfc, conf_s = refined_class(img_part)
                
                if refined_class is None:
                    continue  # Skip drawing if classification failed

                sign_name = df.loc[df['ClassId'] == int(rfc), 'SignName'].values[0]
                label = f"{sign_name} {conf_s:.2f}"

            cv2.rectangle(img, (x1, y1), (x2, y2), color=(0, 0, 255), thickness=2)

            cv2.putText(
                img,
                label,
                (x1, max(y1 - 10, 0)),  # Ensure the text isn't drawn out of image bounds
                fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                fontScale=0.5,
                color=(0, 0, 255),
                thickness=1
            )
            
    return img

In [None]:
%%capture
! wget "https://arcxyon.com/wp-content/uploads/2025/08/Raw_Examples_04-08-2025.zip"
! unzip "Raw_Examples_04-08-2025.zip"

In [None]:
example_img_path = "67/"

In [None]:
img_name = get_random_cv2_img(example_img_path, True)
img_path = os.path.join(example_img_path, img_name)
img = cv2.imread(img_path)

show(get_road_objects(img), cv2_img=True, size=(12, 10))

## 