In [None]:
import os
import shutil
from pathlib import Path
from PIL import Image
from tqdm import tqdm


In [None]:
# Raw data (Kaggle): https://www.kaggle.com/datasets/rohitsuresh15/radroad-anomaly-detection

# Root folders of the raw downloads; each one is looked up as <root>/<split>/...
DATASET_SOURCES = [
    "./archive",
]

# Root folder of the cleaned dataset that this notebook assembles.
FINAL_DATASET = Path("RAD_DATASET")

# Partition names handled by the cells below.
SPLITS = [
    "train",
    "valid",
    "test",
]

In [None]:
# Make sure FINAL_DATASET/images/<split> and FINAL_DATASET/labels/<split>
# exist for every split; directories that are already there are left alone.
for kind in ("images", "labels"):
    for split in SPLITS:
        out_dir = FINAL_DATASET / kind / split
        out_dir.mkdir(parents=True, exist_ok=True)

In [None]:
def is_image_valid(img_path):
    """Return True when Pillow can open and verify the file at ``img_path``.

    Args:
        img_path: Path (or path-like string) of the candidate image file.

    Returns:
        bool: True if ``Image.open(...).verify()`` completes without raising,
        False if opening or verifying raises any ordinary exception.

    Only ``Exception`` subclasses are treated as "invalid image".
    ``KeyboardInterrupt`` and ``SystemExit`` are deliberately allowed to
    propagate so a long validation pass can still be interrupted.
    """
    try:
        with Image.open(img_path) as img:
            img.verify()
    except Exception:
        # Any open/verify failure means the file is unusable as an image.
        return False
    return True


def is_label_valid(label_path):
    """Check that a label file holds at least one well-formed YOLO box.

    Every non-blank line must contain exactly five whitespace-separated
    fields ``class x y w h`` where:

    * ``class`` parses as a non-negative integer,
    * ``x`` and ``y`` are floats in ``[0, 1]``,
    * ``w`` and ``h`` are floats in ``(0, 1]``.

    Blank lines (for example a trailing newline at the end of the file) are
    ignored, matching how the later cells of this notebook read label files.

    Args:
        label_path: Path (or path-like string) of the label file.

    Returns:
        bool: True if the file is readable, has at least one non-blank line
        and every non-blank line passes the checks above; False otherwise.
    """
    try:
        with open(label_path) as f:
            rows = [line.split() for line in f if line.strip()]
    except (OSError, ValueError):
        # Unreadable file; ValueError also covers undecodable bytes.
        return False

    # A file with no boxes at all is treated as invalid.
    if not rows:
        return False

    for parts in rows:
        if len(parts) != 5:
            return False

        try:
            cls = int(parts[0])
            x, y, w, h = map(float, parts[1:])
        except ValueError:
            return False

        # A negative id would silently index from the end of the class list.
        if cls < 0:
            return False

        # NaN fails every comparison below, so it is rejected as well.
        if not (0 <= x <= 1 and 0 <= y <= 1 and 0 < w <= 1 and 0 < h <= 1):
            return False

    return True

In [None]:
# Filter every source split and copy the samples that pass all checks into
# FINAL_DATASET/images/<split> and FINAL_DATASET/labels/<split>.
#
# NOTE(review): later cells read FINAL_DATASET / "data.yaml", but nothing in
# this cell writes that file - confirm where it is expected to come from.

discarded = 0  # samples rejected by any of the checks below
kept = 0       # samples copied into FINAL_DATASET
counter = 0    # running suffix that keeps copied names unique across sources/splits

for src in DATASET_SOURCES:
    src = Path(src)

    for split in SPLITS:
        img_dir = src / split / "images"
        lbl_dir = src / split / "labels"

        # Sources that lack this split (or one of its two folders) are skipped.
        if not img_dir.exists() or not lbl_dir.exists():
            continue

        # glob() yields entries in arbitrary order. Sorting makes the counter
        # suffix given to each file reproducible, so re-running the cell
        # overwrites the previous copies instead of adding differently named
        # duplicates of the same sample.
        candidates = sorted(img_dir.glob("*"))

        for img_file in tqdm(candidates, desc=f"{src.name}/{split}"):
            label_file = lbl_dir / (img_file.stem + ".txt")

            # Missing label
            if not label_file.exists():
                discarded += 1
                continue

            # Corrupt image
            if not is_image_valid(img_file):
                discarded += 1
                continue

            # Invalid label
            if not is_label_valid(label_file):
                discarded += 1
                continue

            # Passed all checks: copy image and label under a shared new stem.
            new_img_name = f"{img_file.stem}_{counter}{img_file.suffix}"
            new_lbl_name = f"{img_file.stem}_{counter}.txt"

            shutil.copy(img_file, FINAL_DATASET / "images" / split / new_img_name)
            shutil.copy(label_file, FINAL_DATASET / "labels" / split / new_lbl_name)

            counter += 1
            kept += 1

print(f"Kept samples: {kept}")
print(f"Discarded samples: {discarded}")

In [None]:
import yaml
from pathlib import Path
import matplotlib.pyplot as plt

# Count labelled boxes per class over all splits of FINAL_DATASET and plot them.

# data.yaml
DATA_YAML = FINAL_DATASET / "data.yaml"

# Load YAML
with open(DATA_YAML) as f:
    data = yaml.safe_load(f)

# Class names. `names` is accepted either as a plain list or as an
# index -> name mapping; a mapping is flattened in key order.
# NOTE(review): flattening assumes the mapping keys are 0..n-1 - confirm.
raw_names = data["names"]
if isinstance(raw_names, dict):
    class_names = [raw_names[idx] for idx in sorted(raw_names)]
else:
    class_names = list(raw_names)
num_classes = len(class_names)

# Initialize counters
class_counts = {name: 0 for name in class_names}
unknown_ids = 0  # boxes whose class id has no entry in class_names

# Loop through all label files of every configured split
for split in SPLITS:
    lbl_dir = FINAL_DATASET / "labels" / split
    if not lbl_dir.exists():
        continue

    for lbl_file in lbl_dir.glob("*.txt"):
        with open(lbl_file) as f:
            for line in f:
                if line.strip() == "":
                    continue
                class_id = int(line.split()[0])
                # Guard the lookup: an id past the end would raise IndexError
                # and a negative id would silently count for the wrong class.
                if not 0 <= class_id < num_classes:
                    unknown_ids += 1
                    continue
                class_counts[class_names[class_id]] += 1

if unknown_ids:
    print(f"Warning: {unknown_ids} boxes use a class id outside 0..{num_classes - 1}")

# Plot
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(list(class_counts.keys()), list(class_counts.values()), color="skyblue")
ax.set_xlabel("Classes")
ax.set_ylabel("Number of instances")
ax.set_title("Class Distribution in Final Dataset")
ax.tick_params(axis="x", labelrotation=45)
ax.grid(axis="y", linestyle="--", alpha=0.7)
plt.show()

# Print class counts
print("Class distribution:")
for cls, count in class_counts.items():
    print(f"{cls}: {count}")

In [None]:
import cv2
import albumentations as A
from pathlib import Path
import shutil
import random

# Oversample under-represented classes in the train split by writing augmented
# copies of images that contain them, together with matching label files.

# Paths
TRAIN_IMAGES = FINAL_DATASET / "images" / "train"
TRAIN_LABELS = FINAL_DATASET / "labels" / "train"
DATA_YAML = FINAL_DATASET / "data.yaml"

# Load class names
import yaml
with open(DATA_YAML) as f:
    data = yaml.safe_load(f)
class_names = data["names"]
if isinstance(class_names, dict):
    # Index -> name mapping; flatten in key order.
    # NOTE(review): assumes the keys are 0..n-1 - confirm.
    class_names = [class_names[idx] for idx in sorted(class_names)]


def _read_yolo_rows(label_path):
    """Parse a label file into ``(class_id, x, y, w, h)`` tuples, skipping blank lines."""
    rows = []
    with open(label_path) as fh:
        for line in fh:
            parts = line.split()
            if not parts:
                continue
            rows.append((int(parts[0]), *map(float, parts[1:5])))
    return rows


def _count_instances(label_dir, names):
    """Count labelled boxes per class name over every ``*.txt`` file in ``label_dir``."""
    totals = {name: 0 for name in names}
    for label_path in label_dir.glob("*.txt"):
        for row in _read_yolo_rows(label_path):
            totals[names[row[0]]] += 1
    return totals


def _clip_yolo_box(x, y, w, h):
    """Clamp a normalised centre/size box to the unit square; None if nothing is left."""
    x_min, y_min = max(0.0, x - w / 2), max(0.0, y - h / 2)
    x_max, y_max = min(1.0, x + w / 2), min(1.0, y + h / 2)
    if x_max <= x_min or y_max <= y_min:
        return None
    return ((x_min + x_max) / 2, (y_min + y_max) / 2, x_max - x_min, y_max - y_min)


# Count current train instances (boxes) per class
class_counts = _count_instances(TRAIN_LABELS, class_names)

print("Train class counts before augmentation:", class_counts)

# Target number of instances for minority classes.
# NOTE(review): class_counts counts boxes, not images, and one augmented image
# can add several boxes (also of other classes), so the totals after
# augmentation can overshoot this target.
target_count = 5000

# Fixed seed so the choice of source images is repeatable.
# NOTE(review): whether this also fixes the transforms' own randomness depends
# on the installed Albumentations version - confirm.
AUG_SEED = 42
random.seed(AUG_SEED)

# Albumentations augmentation pipeline. bbox_params makes the geometric
# transforms (flip / rotate / scale) move the boxes together with the pixels;
# without it the copied labels would no longer match the augmented image.
aug = A.Compose(
    [
        A.HorizontalFlip(p=0.5),
        A.RandomBrightnessContrast(p=0.5),
        # Constant border: the intent is to avoid mirrored, unlabelled copies
        # of objects in the padding exposed by the rotation.
        A.Rotate(limit=20, border_mode=cv2.BORDER_CONSTANT, p=0.5),
        A.RandomScale(scale_limit=0.2, p=0.5),
    ],
    bbox_params=A.BboxParams(format="yolo", label_fields=["class_labels"]),
)

# Index the train split once, before anything is written, so that
# (a) the real file suffix of every image is known (not just ".jpg"), and
# (b) augmented outputs of one class are never re-sampled for the next class.
image_paths = {p.stem: p for p in TRAIN_IMAGES.iterdir() if p.is_file()}
stems_by_class = {name: [] for name in class_names}
for lbl_file in sorted(TRAIN_LABELS.glob("*.txt")):
    if lbl_file.stem not in image_paths:
        continue  # label without an image cannot be augmented
    for class_id in {row[0] for row in _read_yolo_rows(lbl_file)}:
        stems_by_class[class_names[class_id]].append(lbl_file.stem)

# Augment minority classes
for cls_name, count in class_counts.items():
    if count >= target_count:
        continue  # already enough instances

    n_needed = target_count - count

    # Images containing this class
    images_with_class = stems_by_class[cls_name]
    if not images_with_class:
        # random.choice() would raise on an empty list.
        print(f"Skipping {cls_name}: no train images contain this class")
        continue

    print(f"Augmenting {cls_name}: +{n_needed} images")
    cls_tag = class_names.index(cls_name)  # keeps output names unique per class

    # Perform augmentation
    for i in range(n_needed):
        img_stem = random.choice(images_with_class)
        lbl_path = TRAIN_LABELS / f"{img_stem}.txt"

        # Read image; cv2.imread returns None when the file cannot be decoded.
        img = cv2.imread(str(image_paths[img_stem]))
        if img is None:
            continue

        # Boxes are clipped to the image first because the label validation
        # in this notebook only range-checks centres and sizes, not edges.
        boxes, labels = [], []
        for class_id, x, y, w, h in _read_yolo_rows(lbl_path):
            clipped = _clip_yolo_box(x, y, w, h)
            if clipped is not None:
                boxes.append(clipped)
                labels.append(class_id)

        augmented = aug(image=img, bboxes=boxes, class_labels=labels)
        aug_img = augmented["image"]

        # A sample that lost all of its boxes would be an empty label file; skip it.
        if len(augmented["bboxes"]) == 0:
            continue

        # Save augmented image
        out_stem = f"{img_stem}_aug{cls_tag}_{i}"
        new_img_name = f"{out_stem}.jpg"
        cv2.imwrite(str(TRAIN_IMAGES / new_img_name), aug_img)

        # Write the transformed boxes (instead of copying the original labels)
        with open(TRAIN_LABELS / f"{out_stem}.txt", "w") as fh:
            for label, box in zip(augmented["class_labels"], augmented["bboxes"]):
                x, y, w, h = (float(v) for v in box[:4])
                fh.write(f"{int(label)} {x:.6f} {y:.6f} {w:.6f} {h:.6f}\n")

# Recount after augmentation
class_counts_after = _count_instances(TRAIN_LABELS, class_names)

print("Train class counts after augmentation:", class_counts_after)

In [None]:
from ultralytics import YOLO
import yaml
from pathlib import Path

# Dataset description handed to the trainer
DATA_YAML = FINAL_DATASET / "data.yaml"

# Start from the pretrained YOLO11s checkpoint
model = YOLO('yolo11s.pt')

# Training settings, collected in one place so they are easy to tune.
# NOTE(review): the previous inline comments claimed that device=-1 selects
# the CPU, that SGD is the default optimizer and that `augment` switches on
# Mosaic/MixUp/flips. None of that can be confirmed from this notebook -
# check the Ultralytics train-settings reference for the installed version.
train_options = dict(
    data=str(DATA_YAML),
    epochs=100,
    imgsz=320,
    batch=16,
    name="yolov11s_trained",  # run name
    device=0,                 # GPU index 0
    workers=4,
    optimizer='SGD',
    augment=True,
    plots=True,
)

# Train the model
model.train(**train_options)

In [None]:
# Load the best checkpoint of the "yolov11s_trained" run.
# NOTE(review): the path is hard-coded; if the trainer wrote to a different
# run folder (for example because this name already existed) it would point
# at other weights - confirm.
model = YOLO('./runs/detect/yolov11s_trained/weights/best.pt')

# Two TFLite exports: one with default settings, one with int8=True that is
# also given the dataset YAML.
default_export = dict(format='tflite')
int8_export = dict(format='tflite', int8=True, data=str(DATA_YAML))

model.export(**default_export)
model.export(**int8_export)

In [None]:
from ultralytics import YOLO
from pathlib import Path

print("\nEvaluating model on test images...")

best_model_path = './runs/detect/yolov11s_trained/weights/best.pt'

# Load trained model
best_model = YOLO(str(best_model_path))


def _print_detection_metrics(metrics, names):
    """Print the summary and per-class scores of a validation result.

    Fallback reporter, used only when no ``display_results`` helper exists in
    the notebook namespace. Every attribute is read defensively.
    NOTE(review): ``results_dict`` and ``box.maps`` are the attribute names
    expected from the Ultralytics metrics object - confirm for the installed
    version.

    Args:
        metrics: Object returned by ``best_model.val(...)``.
        names: Class-id -> name mapping (or any other object, in which case
            raw class ids are printed).
    """
    summary = getattr(metrics, "results_dict", None) or {}
    print("Test-set metrics:")
    for key, value in summary.items():
        print(f"  {key}: {float(value):.4f}")

    per_class = getattr(getattr(metrics, "box", None), "maps", None)
    if per_class is not None:
        print("Per-class mAP50-95:")
        for class_id, score in enumerate(per_class):
            label = names.get(class_id, class_id) if isinstance(names, dict) else class_id
            print(f"  {label}: {float(score):.4f}")


# `display_results` is not defined in any cell of this notebook. Use it when
# the namespace provides one; otherwise fall back to the reporter above so
# the cell works after Restart & Run All.
report_results = globals().get("display_results", _print_detection_metrics)

try:
    # Run evaluation on test split
    metrics = best_model.val(data=str(DATA_YAML), split="test", device="cpu")

    # Get class names safely
    class_names_from_model = getattr(best_model, "names", {})

    # Display results
    report_results(metrics, class_names_from_model)

except Exception as e:
    print(f"Error during evaluation: {e}")
else:
    # Only claim success when evaluation actually finished.
    print("\nModel training and evaluation complete!")

print(f"Your best trained model is saved at: {best_model_path}")