# PPE Detection YOLOv8 Training Pipeline

Train a YOLOv8 model to detect Personal Protective Equipment (PPE) items, as the basis for flagging PPE violations.

The trained model is saved to:
- Kaggle: /kaggle/working/models/ppe/best.pt
- Colab: /content/drive/MyDrive/ppe-models/best.pt
- Local: ./models/ppe/best.pt

## Step 1: Environment Setup

In [None]:
import os
import sys

# Detect the runtime so that every later cell can rely on the same path
# variables. `get_ipython` only exists inside IPython/Jupyter; in a plain
# interpreter the lookup raises NameError, which simply means "not Colab".
# Catching only NameError keeps KeyboardInterrupt and real errors visible.
try:
    IS_COLAB = 'google.colab' in str(get_ipython())
except NameError:
    IS_COLAB = False

# The presence of /kaggle/input is used as the marker for a Kaggle kernel.
IS_KAGGLE = os.path.exists('/kaggle/input')

if IS_COLAB:
    # Models are written to the mounted Drive folder; the dataset and
    # intermediate results live under /content.
    from google.colab import drive
    drive.mount('/content/drive', force_remount=True)
    BASE_DIR = '/content/drive/MyDrive/ppe-models'
    DATASET_PATH = '/content/ppe-dataset'
    WORKING_DIR = '/content'
elif IS_KAGGLE:
    BASE_DIR = '/kaggle/working/models'
    DATASET_PATH = '/kaggle/input/ppe-dataset-yolov8'
    WORKING_DIR = '/kaggle/working'
else:
    # Local run: everything is relative to the current working directory.
    BASE_DIR = './models'
    DATASET_PATH = './datasets/ppe'
    WORKING_DIR = '.'

MODELS_DIR = os.path.join(BASE_DIR, 'ppe')
RESULTS_DIR = os.path.join(WORKING_DIR, 'results')

# Create the output directories up front so later cells can write into them.
os.makedirs(MODELS_DIR, exist_ok=True)
os.makedirs(RESULTS_DIR, exist_ok=True)

print(f"Models: {MODELS_DIR}")
print(f"Dataset: {DATASET_PATH}")
print(f"Results: {RESULTS_DIR}")

## Step 2: Install Packages

In [None]:
import subprocess

# Third-party dependencies of the pipeline. Each one is installed with its own
# quiet pip invocation, using the interpreter that is running this notebook.
packages = ['ultralytics', 'opencv-python', 'pillow', 'pyyaml']
pip_install = [sys.executable, '-m', 'pip', 'install', '-q']
for pkg in packages:
    subprocess.check_call(pip_install + [pkg])

print('Installed')

## Step 3: Import Libraries

In [None]:
import cv2
import numpy as np
import torch
import yaml
import json
import shutil
from datetime import datetime
from pathlib import Path
from ultralytics import YOLO
from PIL import Image

# Choose the compute device: CUDA device index 0 when PyTorch reports a GPU,
# otherwise the string 'cpu'.
gpu_available = torch.cuda.is_available()
device = 'cpu'
if gpu_available:
    device = 0

# Report the environment that the following cells will run on.
print(f"PyTorch: {torch.__version__}")
print(f"GPU: {gpu_available}")
if gpu_available:
    print(f"Device: {torch.cuda.get_device_name(0)}")
print(f"Using: {device}")

## Step 4: Download Dataset

In [None]:
# Resolve the dataset location for the current runtime. Only the Colab branch
# downloads anything; the other two branches just report what is expected.
if IS_COLAB:
    # Install kagglehub on demand, then import it for the download.
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', 'kagglehub'])
    import kagglehub

    print('Downloading PPE dataset from Kaggle...')
    download_path = kagglehub.dataset_download('shlokraval/ppe-dataset-yolov8')
    print(f"Downloaded to: {download_path}")

    # From here on the dataset is read from the download location.
    DATASET_PATH = download_path
else:
    print(
        'Using Kaggle input dataset'
        if IS_KAGGLE
        else 'Local mode: ensure dataset is in ./datasets/ppe'
    )

print(f"Dataset path: {DATASET_PATH}")

## Step 5: Configure Dataset

In [None]:
# Default class-id -> name mapping. It is used to generate data.yaml when the
# dataset does not ship one; if the dataset has its own data.yaml, the mapping
# is replaced below with the names declared there.
CLASS_NAMES = {
    0: 'helmet',
    1: 'vest',
    2: 'gloves',
    3: 'safety_shoes',
    4: 'goggles',
    5: 'face_shield'
}

# Image extensions counted in the per-split summary (matched case-insensitively).
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

dataset_exists = os.path.exists(DATASET_PATH)
print(f"Dataset exists: {dataset_exists}")

if dataset_exists:
    yaml_path = os.path.join(DATASET_PATH, 'data.yaml')

    if not os.path.exists(yaml_path):
        # The dataset directory may be read-only (e.g. under /kaggle/input);
        # in that case write the generated config to the working directory.
        if not os.access(DATASET_PATH, os.W_OK):
            yaml_path = os.path.join(WORKING_DIR, 'data.yaml')

        data_config = {
            # Absolute path: Ultralytics resolves a relative `path` against
            # its own datasets directory rather than the current directory.
            'path': os.path.abspath(DATASET_PATH),
            'train': 'images/train',
            'val': 'images/val',
            'test': 'images/test',
            'nc': len(CLASS_NAMES),
            'names': list(CLASS_NAMES.values())
        }
        with open(yaml_path, 'w') as f:
            yaml.dump(data_config, f)
        print('Created data.yaml')
    else:
        print('Found data.yaml')
        # Training uses the class names from this file, so keep CLASS_NAMES in
        # sync with it; otherwise later cells would label detections and
        # metadata with the hard-coded defaults above.
        with open(yaml_path) as f:
            existing_config = yaml.safe_load(f)
        yaml_names = existing_config.get('names') if isinstance(existing_config, dict) else None
        if isinstance(yaml_names, dict):
            CLASS_NAMES = {int(k): str(v) for k, v in yaml_names.items()}
        elif isinstance(yaml_names, (list, tuple)):
            CLASS_NAMES = dict(enumerate(str(n) for n in yaml_names))

    # Summarise the splits that follow the images/<split> layout.
    for split in ['train', 'val', 'test']:
        img_dir = os.path.join(DATASET_PATH, 'images', split)
        if os.path.exists(img_dir):
            count = sum(
                1 for f in os.listdir(img_dir)
                if f.lower().endswith(IMAGE_EXTENSIONS)
            )
            print(f"{split}: {count} images")
else:
    print('Dataset not found')
    yaml_path = None

## Step 6: Load Model

In [None]:
# Size suffix of the YOLOv8 checkpoint; the weights file name is built from it.
MODEL_SIZE = 's'

weights_file = f'yolov8{MODEL_SIZE}.pt'
model = YOLO(weights_file)

# Total number of parameter elements in the network, expressed in millions.
param_elements = sum(tensor.numel() for tensor in model.model.parameters())
params = param_elements / 1e6

print(f"Model: YOLOv8{MODEL_SIZE}")
print(f"Parameters: {params:.2f}M")

## Step 7: Train Model

In [None]:
# Train only when Step 5 found a dataset; otherwise there is nothing to fit.
if dataset_exists:
    # All training options in one place, passed to model.train() unchanged.
    train_settings = dict(
        data=yaml_path,
        epochs=20,
        imgsz=640,
        batch=16,
        # NOTE: patience is not smaller than epochs, so early stopping is not
        # expected to cut this run short.
        patience=20,
        device=device,
        # Run artefacts go under <WORKING_DIR>/runs/detect/ppe_detector.
        project=os.path.join(WORKING_DIR, 'runs/detect'),
        name='ppe_detector',
        exist_ok=True,
        optimizer='SGD',
        lr0=0.01,
        lrf=0.01,
        momentum=0.937,
        weight_decay=0.0005,
        warmup_epochs=3,
        box=7.5,
        cls=0.5,
        dfl=1.5,
        save=True,
        val=True,
        plots=True,
    )
    results = model.train(**train_settings)
    print('Training complete')
else:
    print('Cannot train without dataset')

## Step 8: Evaluate Model

In [None]:
# Validate the best checkpoint written by the training run and print its
# overall and per-class detection metrics.
if dataset_exists:
    best_path = os.path.join(WORKING_DIR, 'runs/detect/ppe_detector/weights/best.pt')

    if not os.path.exists(best_path):
        print('Best model not found')
    else:
        eval_model = YOLO(best_path)
        val_results = eval_model.val(data=yaml_path, imgsz=640, batch=16, device=device)

        print('Performance Metrics:')
        if hasattr(val_results, 'box'):
            box_metrics = val_results.box
            print(f"mAP50: {box_metrics.map50:.4f}")
            print(f"mAP50-95: {box_metrics.map:.4f}")
            print(f"Precision: {box_metrics.mp:.4f}")
            print(f"Recall: {box_metrics.mr:.4f}")

            if hasattr(box_metrics, 'ap_class_index'):
                print('Per-class mAP50:')
                # ap50 is indexed by position in ap_class_index, not by class id.
                for idx, class_id in enumerate(box_metrics.ap_class_index):
                    label = CLASS_NAMES.get(int(class_id), f"Class {class_id}")
                    print(f"  {label}: {box_metrics.ap50[idx]:.4f}")
else:
    print('Cannot evaluate without dataset')

## Step 9: Save Model

In [None]:
# Copy the best checkpoint from the training run into MODELS_DIR and write a
# metadata.json and a README.md next to it.
source_path = os.path.join(WORKING_DIR, 'runs/detect/ppe_detector/weights/best.pt')
final_path = os.path.join(MODELS_DIR, 'best.pt')

if not os.path.exists(source_path):
    print('Trained model not found')
else:
    shutil.copy2(source_path, final_path)
    size_mb = os.path.getsize(final_path) / (1024 * 1024)

    print(f"Model saved: {final_path}")
    print(f"Size: {size_mb:.2f} MB")

    # Machine-readable description of the exported model.
    metadata = dict(
        model_name=f'PPE_YOLOv8{MODEL_SIZE}',
        classes=CLASS_NAMES,
        num_classes=len(CLASS_NAMES),
        trained=datetime.now().isoformat(),
        framework='YOLOv8',
        image_size=640,
        path=final_path,
        size_mb=round(size_mb, 2),
    )

    meta_path = os.path.join(MODELS_DIR, 'metadata.json')
    with open(meta_path, 'w') as meta_file:
        meta_file.write(json.dumps(metadata, indent=2))

    # Human-readable usage notes; doubled braces survive as literal braces.
    class_list = ', '.join(CLASS_NAMES.values())
    readme = f'''# PPE Detection Model

Model: YOLOv8{MODEL_SIZE}
Classes: {class_list}
Size: {size_mb:.2f} MB

## Usage

Load model:
```python
from ultralytics import YOLO
model = YOLO('best.pt')
```

Image prediction:
```python
results = model.predict('image.jpg', conf=0.5)
results[0].show()
```

Video prediction:
```python
results = model.predict('video.mp4', save=True)
```

Get detections:
```python
for box in results[0].boxes:
    cls = int(box.cls)
    conf = float(box.conf)
    print(f"Class: {{cls}}, Conf: {{conf:.2f}}")
```
'''

    readme_path = os.path.join(MODELS_DIR, 'README.md')
    with open(readme_path, 'w') as readme_file:
        readme_file.write(readme)

    print(f"Metadata: {meta_path}")
    print(f"README: {readme_path}")

## Step 10: Test on Images

In [None]:
# Smoke-test the exported model on a few validation images and print the
# detections found in each one.
if os.path.exists(final_path):
    test_model = YOLO(final_path)

    def predict_image(img_path, conf=0.5):
        """Run the exported model on one image and save the annotated result.

        Args:
            img_path: Path of the image to run inference on.
            conf: Confidence threshold passed to the predictor.

        Returns:
            The list returned by ``test_model.predict``, or ``None`` when
            ``img_path`` does not exist.
        """
        if not os.path.exists(img_path):
            return None
        return test_model.predict(
            source=img_path,
            conf=conf,
            imgsz=640,
            device=device,
            save=True,
            project=RESULTS_DIR,
            name='predictions',
            # Reuse one output folder; without this every call creates a new
            # numbered directory (predictions2, predictions3, ...), and the
            # path printed at the end would not contain all results.
            exist_ok=True
        )

    test_dir = os.path.join(DATASET_PATH, 'images', 'val')

    if os.path.exists(test_dir):
        # Sort so the same three images are picked on every run (os.listdir
        # order is arbitrary), and match extensions case-insensitively.
        samples = sorted(
            f for f in os.listdir(test_dir)
            if f.lower().endswith(('.jpg', '.jpeg', '.png'))
        )[:3]

        if samples:
            print(f"Testing on {len(samples)} images")
            for img in samples:
                path = os.path.join(test_dir, img)
                results = predict_image(path)

                if results and len(results[0].boxes) > 0:
                    print(f"{img}: {len(results[0].boxes)} detections")
                    for box in results[0].boxes:
                        cls = int(box.cls.item())
                        conf = box.conf.item()
                        name = CLASS_NAMES.get(cls, f'Class{cls}')
                        print(f"  {name}: {conf:.2%}")
                else:
                    print(f"{img}: no detections")
            print(f"Results: {RESULTS_DIR}/predictions/")
        else:
            print('No sample images')
    else:
        print('Validation directory not found')
else:
    print('Model not found')

## Step 11: Video Processing

In [None]:
def process_video(video_path, output_path, conf=0.5, skip=1, max_frames=None):
    """Annotate a video with model detections and write it to ``output_path``.

    The model is loaded from the module-level ``final_path`` and runs on the
    module-level ``device``. Frames that are not selected by ``skip`` are
    written unchanged, so the output has as many frames as were read.

    Args:
        video_path: Path of the input video.
        output_path: Path of the annotated video to write (mp4v codec).
        conf: Confidence threshold passed to the predictor.
        skip: Run detection on every ``skip``-th frame. Values below 1 are
            treated as 1.
        max_frames: Optional cap on the number of frames to process; a falsy
            value means no limit.

    Returns:
        True when the video was processed, False when the input video or the
        model is missing or the input/output video cannot be opened.
    """
    if not os.path.exists(video_path):
        print('Video not found')
        return False

    if not os.path.exists(final_path):
        print('Model not found')
        return False

    # `frame_count % skip` needs a positive step; skip=0 would raise
    # ZeroDivisionError in the frame loop.
    skip = max(1, int(skip))

    cap = cv2.VideoCapture(video_path)
    out = None
    frame_count = 0
    detect_count = 0

    # try/finally guarantees the capture and writer are released even when
    # model loading or frame processing raises.
    try:
        if not cap.isOpened():
            print('Cannot open video')
            return False

        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 changes
        # the playback speed of the output.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Some containers report no frame rate; a writer with fps=0
            # produces an unusable file, so fall back to a common default.
            print('Input reports no frame rate, assuming 30 fps')
            fps = 30.0
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"Video: {w}x{h} @ {fps:g}fps, {total} frames")

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (w, h))
        if not out.isOpened():
            print('Cannot open output video')
            return False

        vid_model = YOLO(final_path)

        while True:
            # Check the cap before reading so exactly `max_frames` frames are
            # read, written and reported.
            if max_frames and frame_count >= max_frames:
                break

            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1

            if frame_count % skip == 0:
                # Best effort: a failure on one frame is reported and the
                # original frame is written instead of aborting the video.
                try:
                    results = vid_model.predict(
                        source=frame,
                        conf=conf,
                        imgsz=640,
                        device=device,
                        verbose=False
                    )
                    frame = results[0].plot()
                    if len(results[0].boxes) > 0:
                        detect_count += 1
                except Exception as e:
                    print(f"Error frame {frame_count}: {e}")

            out.write(frame)

            if frame_count % 30 == 0:
                print(f"Processed {frame_count}/{total}")
    finally:
        cap.release()
        if out is not None:
            out.release()

    print(f"Done: {frame_count} frames, {detect_count} with detections")
    print(f"Saved: {output_path}")
    return True

print('Video processing function ready')
print('Usage: process_video("input.mp4", "output.mp4", conf=0.5)')

## Summary

Pipeline complete. If every step above succeeded, the trained model is saved and ready for inference.

Download the model from:
- Kaggle: /kaggle/working/models/ppe/best.pt
- Colab: /content/drive/MyDrive/ppe-models/best.pt
- Local: ./models/ppe/best.pt