<a href="https://colab.research.google.com/github/axeltanjung/safety_helmet_detection/blob/main/notebook/safety_helmet_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import Dataset

In [1]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("andrewmvd/hard-hat-detection")

print("Path to dataset files:", path)

Using Colab cache for faster access to the 'hard-hat-detection' dataset.
Path to dataset files: /kaggle/input/hard-hat-detection


In [2]:
!cp -r /kaggle/input/hard-hat-detection /kaggle/working/

In [3]:
!pip install ultralytics pillow lxml

Collecting ultralytics
  Downloading ultralytics-8.3.241-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.3.241-py3-none-any.whl (1.1 MB)
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1.1/1.1 MB[0m [31m66.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.18-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.241 ultralytics-thop-2.0.18


# Preprocess Dataset

In [4]:

import os
import xml.etree.ElementTree as ET
from PIL import Image

# Paths (Kaggle writable directory). Make sure these folders exist in /kaggle/working/
DATASET_DIR = "/kaggle/working"
IMAGE_DIR = os.path.join(DATASET_DIR, "images")
ANNOTATION_DIR = os.path.join(DATASET_DIR, "annotations")
LABEL_DIR = os.path.join(DATASET_DIR, "labels")

# Classes (edit if needed)
CLASSES = ["helmet"]  # e.g., ["helmet", "person"] if you have multiple

# Ensure required folders exist
for p in [IMAGE_DIR, ANNOTATION_DIR]:
    if not os.path.isdir(p):
        raise FileNotFoundError(f"Folder not found: {p}. "
                                f"If your dataset is under /kaggle/input, copy it first:\n"
                                f"!cp -r /kaggle/input/hard-hat-detection /kaggle/working/\n"
                                f"And then set IMAGE_DIR='/kaggle/working/hard-hat-detection/images', "
                                f"ANNOTATION_DIR='/kaggle/working/hard-hat-detection/annotations'")

os.makedirs(LABEL_DIR, exist_ok=True)


def normalize_bbox(img_size, box):
    """Convert Pascal VOC (xmin, xmax, ymin, ymax) to YOLO (xc, yc, w, h) normalized."""
    img_w, img_h = img_size
    xmin, xmax, ymin, ymax = box

    # Clip to image bounds (defensive)
    xmin = max(0.0, min(xmin, img_w))
    xmax = max(0.0, min(xmax, img_w))
    ymin = max(0.0, min(ymin, img_h))
    ymax = max(0.0, min(ymax, img_h))

    # Compute normalized values
    x_center = ((xmin + xmax) / 2.0) / img_w
    y_center = ((ymin + ymax) / 2.0) / img_h
    width = (xmax - xmin) / img_w
    height = (ymax - ymin) / img_h

    # Guard against zero/negative boxes
    if width <= 0 or height <= 0:
        return None

    # Guard against out-of-range
    x_center = min(max(x_center, 0.0), 1.0)
    y_center = min(max(y_center, 0.0), 1.0)
    width = min(max(width, 1e-6), 1.0)
    height = min(max(height, 1e-6), 1.0)

    return x_center, y_center, width, height


converted = 0
skipped = 0

for xml_file in os.listdir(ANNOTATION_DIR):
    if not xml_file.lower().endswith(".xml"):
        continue

    xml_path = os.path.join(ANNOTATION_DIR, xml_file)
    try:
        tree = ET.parse(xml_path)
        root = tree.getroot()
    except Exception as e:
        print(f"‚ùå Failed to parse XML: {xml_path} ({e})")
        skipped += 1
        continue

    # Try to read image filename
    node_filename = root.find("filename")
    if node_filename is None or not node_filename.text:
        print(f"‚ùå Missing <filename> in: {xml_file}")
        skipped += 1
        continue

    image_name = node_filename.text.strip()
    image_path = os.path.join(IMAGE_DIR, image_name)

    if not os.path.exists(image_path):
        print(f"‚ùå Image not found for XML: {xml_file} ‚Üí {image_path}")
        skipped += 1
        continue

    # Get image size (prefer reading actual image)
    try:
        with Image.open(image_path) as im:
            img_w, img_h = im.size
    except Exception:
        # Fallback: read from XML <size>
        size_node = root.find("size")
        if size_node is None:
            print(f"‚ùå Missing image size for: {image_name}")
            skipped += 1
            continue
        img_w = float(size_node.find("width").text)
        img_h = float(size_node.find("height").text)

    label_file = os.path.splitext(image_name)[0] + ".txt"
    label_path = os.path.join(LABEL_DIR, label_file)

    wrote_any = False
    with open(label_path, "w") as f:
        for obj in root.findall("object"):
            name_node = obj.find("name")
            if name_node is None or not name_node.text:
                continue

            class_name = name_node.text.strip()
            if class_name not in CLASSES:
                # Skip unknown classes
                continue

            class_id = CLASSES.index(class_name)
            bndbox = obj.find("bndbox")
            if bndbox is None:
                continue

            try:
                xmin = float(bndbox.find("xmin").text)
                ymin = float(bndbox.find("ymin").text)
                xmax = float(bndbox.find("xmax").text)
                ymax = float(bndbox.find("ymax").text)
            except Exception:
                # Malformed bndbox
                continue

            yolo_bbox = normalize_bbox((img_w, img_h), (xmin, xmax, ymin, ymax))
            if yolo_bbox is None:
                # Invalid box
                continue

            f.write(
                f"{class_id} " +
                " ".join(f"{v:.6f}" for v in yolo_bbox) +
                "\n"
            )
            wrote_any = True

    if wrote_any:
        converted += 1
    else:
        # Remove empty label file to avoid "no labels" issues later
        try:
            os.remove(label_path)
        except FileNotFoundError:
            pass

print(f"‚úÖ Conversion done. Labels written for {converted} images, skipped {skipped}.")


‚úÖ Conversion done. Labels written for 4581 images, skipped 0.


# Split Train / Test

In [5]:

import os
import shutil
import random

# Sesuaikan path dataset kamu
BASE = "/kaggle/working"
IMAGE_DIR = os.path.join(BASE, "images")         # contoh: "/kaggle/working/hard-hat-detection/images"
LABEL_DIR = os.path.join(BASE, "labels")         # contoh: "/kaggle/working/hard-hat-detection/labels"

# Jika dataset kamu ada di /kaggle/working/hard-hat-detection, gunakan:
# IMAGE_DIR = "/kaggle/working/hard-hat-detection/images"
# LABEL_DIR = "/kaggle/working/hard-hat-detection/labels"

# Pastikan folder ada
for p in [IMAGE_DIR, LABEL_DIR]:
    if not os.path.isdir(p):
        raise FileNotFoundError(f"Folder tidak ditemukan: {p}. Pastikan sudah copy dari /kaggle/input ke /kaggle/working")

# 1) Buat label kosong untuk semua gambar yang belum ada .txt
image_files = [f for f in os.listdir(IMAGE_DIR) if f.lower().endswith((".jpg", ".jpeg", ".png"))]

created_empty = 0
for img in image_files:
    stem = os.path.splitext(img)[0]
    lbl = os.path.join(LABEL_DIR, stem + ".txt")
    if not os.path.exists(lbl):
        # Buat file label kosong (valid buat YOLOv8)
        open(lbl, "w").close()
        created_empty += 1

print(f"‚úÖ Dibuat {created_empty} file label kosong untuk gambar tanpa anotasi.")

# 2) Siapkan folder split
for split in ["train", "val"]:
    os.makedirs(os.path.join(IMAGE_DIR, split), exist_ok=True)
    os.makedirs(os.path.join(LABEL_DIR, split), exist_ok=True)

# 3) Split train/val
TRAIN_RATIO = 0.8
random.shuffle(image_files)
split_idx = int(len(image_files) * TRAIN_RATIO)
train_imgs = image_files[:split_idx]
val_imgs = image_files[split_idx:]

def move_pair(img_list, split):
    moved = 0
    for img in img_list:
        src_img = os.path.join(IMAGE_DIR, img)
        dst_img = os.path.join(IMAGE_DIR, split, img)

        stem = os.path.splitext(img)[0]
        src_lbl = os.path.join(LABEL_DIR, stem + ".txt")
        dst_lbl = os.path.join(LABEL_DIR, split, stem + ".txt")

        # Pindahkan image
        shutil.move(src_img, dst_img)
        # Pindahkan label (dijamin ada karena kita buat kosong jika tidak ada)
        shutil.move(src_lbl, dst_lbl)
        moved += 1
    return moved

moved_train = move_pair(train_imgs, "train")
moved_val = move_pair(val_imgs, "val")

print(f"‚úÖ Split selesai. Train: {moved_train} | Val: {moved_val}")

‚úÖ Dibuat 419 file label kosong untuk gambar tanpa anotasi.
‚úÖ Split selesai. Train: 4000 | Val: 1000


In [6]:
import glob, os
print("Images total:", len(glob.glob(os.path.join(IMAGE_DIR, "*.[jp][pn]g"))))
print("Labels total:", len(glob.glob(os.path.join(LABEL_DIR, "*.txt"))))

Images total: 0
Labels total: 0


In [7]:
data_yaml = """
path: /kaggle/working/
train: images/train
val: images/val

names:
  0: helmet
"""

with open("/kaggle/working/data.yaml", "w") as f:
    f.write(data_yaml)

print("‚úÖ data.yaml created")

‚úÖ data.yaml created


In [None]:
from ultralytics import YOLO

model = YOLO("yolov8s.pt")

model.train(
    data="/kaggle/working/data.yaml",
    epochs=100,
    imgsz=640,
    batch=16
)



Creating new Ultralytics Settings v0.0.6 file ‚úÖ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
[KDownloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8s.pt to 'yolov8s.pt': 100% ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ 21.5MB 422.7MB/s 0.1s
Ultralytics 8.3.241 üöÄ Python-3.12.12 torch-2.9.0+cu126 CUDA:0 (Tesla T4, 15095MiB)
[34m[1mengine/trainer: [0magnostic_nms=False, amp=True, augment=False, auto_augment=randaugment, batch=16, bgr=0.0, box=7.5, cache=False, cfg=None, classes=None, close_mosaic=10, cls=0.5, compile=False, conf=None, copy_paste=0.0, copy_paste_mode=flip, cos_lr=False, cutmix=0.0, data=/kaggle/working/data.yaml, degrees=0.0, deterministic=True, device=None, dfl=1.5, dnn=False, dropout=0.0, dynamic=False, embed=None, epochs=100, 

## Uji cepat pada folder val / sample image

In [None]:
best_path = model.ckpt_path if hasattr(model, "ckpt_path") else "runs/detect/train/weights/best.pt"
best_model = YOLO(best_path)

# Prediksi folder val
best_model.predict(
    source=os.path.join(BASE_DIR, "images/val"),
    conf=0.4,
    save=True,   # simpan visualisasi
    project="runs/detect",
    name="predict_val",
    exist_ok=True
)

## NO‚ÄëHelmet Logic (heuristik bagian kepala)

Ide: Untuk setiap person box, ambil area kepala sebagai bagian atas dari bounding box (mis. top 35%). Jika tidak ada helmet box yang overlap dengan area kepala (IoU > threshold kecil), maka itu violation (no‚Äëhelmet).

Ini heuristik yang bekerja baik untuk kamera statis dari atas/bawah‚Äîkalau kamera miring/occlusion berat, kita bisa refine (mis. deteksi pose atau segmentasi kepala).

In [None]:

import cv2
import numpy as np
from ultralytics import YOLO

best_model = YOLO(best_path)

# Hyperparameter heuristik:
HEAD_RATIO = 0.35    # proporsi tinggi box person untuk area kepala (0.30‚Äì0.40 umum)
IOU_THRESH = 0.10     # threshold overlap minimal antara head-region dan helmet box

def iou(boxA, boxB):
    # box = [x1, y1, x2, y2]
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])
    interW = max(0, xB - xA)
    interH = max(0, yB - yA)
    interArea = interW * interH
    if interArea == 0:
        return 0.0
    areaA = (boxA[2]-boxA[0]) * (boxA[3]-boxA[1])
    areaB = (boxB[2]-boxB[0]) * (boxB[3]-boxB[1])
    return interArea / (areaA + areaB - interArea + 1e-6)

def detect_no_helmet(image_bgr):
    # Run inference
    res = best_model.predict(image_bgr, imgsz=640, conf=0.4, verbose=False)[0]
    boxes = res.boxes
    clss = boxes.cls.cpu().numpy().astype(int)
    xyxy = boxes.xyxy.cpu().numpy()  # [N, 4]
    # 0: person, 1: helmet (sesuai data.yaml)

    person_boxes = xyxy[clss == 0]
    helmet_boxes = xyxy[clss == 1]

    violations = []  # list of indices atau koordinat
    for pb in person_boxes:
        x1, y1, x2, y2 = pb
        h = y2 - y1
        head_h = HEAD_RATIO * h
        head_box = [x1, y1, x2, y1 + head_h]  # kepala: bagian atas dari person box

        has_helmet = False
        for hb in helmet_boxes:
            if iou(head_box, hb) >= IOU_THRESH:
                has_helmet = True
                break
        if not has_helmet:
            violations.append(head_box)

    return xyxy, clss, violations

# Uji ke 1 image (ganti path ke test image)
test_img_path = os.path.join(BASE_DIR, "images/val", os.listdir(os.path.join(BASE_DIR, "images/val"))[0])
img = cv2.imread(test_img_path)

xyxy, clss, violations = detect_no_helmet(img)
print(f"Person+Helmet detections: {len(xyxy)} | Violations (no-helmet): {len(violations)}")

## Realtime CCTV (RTSP) dengan overlay NO‚Äëhelmet

In [None]:

import time

RTSP_URL = "rtsp://user:pass@IP_CAMERA:554/Streaming/Channels/101"  # Ganti sesuai kamera

cap = cv2.VideoCapture(RTSP_URL)
if not cap.isOpened():
    print("‚ùå Tidak bisa membuka stream. Cek RTSP_URL / kredensial.")

# Warna & style
GREEN = (0, 200, 0)
RED   = (0, 0, 255)
YELLOW= (0, 255, 255)

def draw_box(img, box, color, label=None):
    x1, y1, x2, y2 = map(int, box)
    cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
    if label:
        cv2.putText(img, label, (x1, max(0, y1-10)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

fps_hist = []
while True:
    ok, frame = cap.read()
    if not ok:
        print("Stream terputus / buffer kosong.")
        time.sleep(0.5)
        continue

    t0 = time.time()
    xyxy, clss, violations = detect_no_helmet(frame)

    # Gambar semua deteksi
    for box, c in zip(xyxy, clss):
        label = "person" if c == 0 else "helmet"
        color = GREEN if c == 1 else YELLOW
        draw_box(frame, box, color, label)

    # Tampilkan violations (kepala merah)
    for hb in violations:
        draw_box(frame, hb, RED, "NO-HELMET")

    # FPS
    fps = 1.0 / (time.time() - t0 + 1e-6)
    fps_hist.append(fps)
    fps_hist = fps_hist[-30:]
    cv2.putText(frame, f"FPS: {np.mean(fps_hist):.1f}", (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)

    # Tampilkan hasil (di Colab gunakan cv2_imshow; di Jupyter lokal pakai cv2.imshow)
    # Colab:
    from google.colab.patches import cv2_imshow
    cv2_imshow(frame)

    # Keluar setelah beberapa frame (untuk demo)
    # break


Untuk deployment di laptop/PC: ganti cv2_imshow dengan cv2.imshow("Helmet Detection", frame) dan tambahkan if cv2.waitKey(1) & 0xFF == 27: break. Jangan lupa cap.release(); cv2.destroyAllWindows().

## Export ke ONNX & TensorRT

ONNX universal; TensorRT memberi latency/throughput terbaik di NVIDIA GPU (butuh runtime TensorRT & CUDA di environment). Kalau TensorRT belum tersedia, gunakan ONNX dulu.

In [None]:

from ultralytics import YOLO

best_model = YOLO(best_path)

# Export ONNX (dynamic batch & opset umum)
best_model.export(
    format="onnx",
    opset=12,
    dynamic=True,
    imgsz=640
)
# hasil: runs/detect/train/weights/best.onnx

# Export TensorRT (butuh TensorRT terpasang)
# Untuk INT8 perlu kalibrasi dataset; di sini contoh FP16 ("half=True")
try:
    best_model.export(
        format="engine",
        half=True,    # FP16
        imgsz=640
    )
    print("‚úÖ TensorRT engine diexport: best.engine")
except Exception as e:
    print("‚ö†Ô∏è Gagal export TensorRT. Pakai ONNX dulu. Error:", e)


## Inference dengan ONNX Runtime

Kalau kamu ingin menjalankan model ONNX tanpa ultralytics, bisa pakai onnxruntime:

In [None]:

!pip install onnxruntime-gpu

import onnxruntime as ort
import numpy as np
import cv2

onnx_path = "runs/detect/train/weights/best.onnx"
session = ort.InferenceSession(onnx_path, providers=["CUDAExecutionProvider", "CPUExecutionProvider"])

def preprocess(img_bgr, size=640):
    img = cv2.resize(img_bgr, (size, size))
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = img.astype(np.float32) / 255.0
    img = img.transpose(2, 0, 1)[None]  # NCHW
    return img

# Sesuaikan input/output names sesuai model ONNX (cek session.get_inputs()/get_outputs())
inp_name = session.get_inputs()[0].name
out_names = [o.name for o in session.get_outputs()]

img = cv2.imread(test_img_path)
inp = preprocess(img)
outputs = session.run(out_names, {inp_name: inp})
print("ONNX outputs:", [o.shape for o in outputs])
