# Scientifically Sound Unsupervised PPE Detection (Colab)
This notebook demonstrates a complete, end-to-end pipeline for unsupervised safety violation detection. It addresses the scientific and implementation issues of the original codebase.

## 1. Setup
First, we mount Google Drive, install the required dependencies, and set up the necessary paths and imports.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Install dependencies
!pip install -r requirements.txt

import sys
import os
from pathlib import Path
import torch
import cv2
import random
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torchvision import transforms
from scipy.optimize import linear_sum_assignment
from tqdm import tqdm

# --- Path Setup ---
# Put the current working directory first on sys.path before the project
# imports below run.
# NOTE(review): presumably config.py and the processor modules live in the
# working directory - confirm the notebook is launched from the project root.
sys.path.insert(0, str(Path.cwd()))

# Project-local modules (not installed packages).
from config import CONFIG
from data_utils import prepare_dataset
from unsupervised_trainer import UnsupervisedTrainer
from discovery_processor import DiscoveryProcessor
from violation_processor import ViolationProcessor

# Echo the configured project root so a wrong working directory is easy to spot.
print(f"Project root set to: {CONFIG['project_root_path']}")

**Important:** To use your own dataset from Roboflow, you must update the `config.py` file with your Roboflow API key and project details. Alternatively, you can modify the `CONFIG` dictionary directly in the cell below.

In [None]:
# Optional: Override Roboflow config here if you don't want to edit config.py
# CONFIG['roboflow']['api_key'] = "YOUR_API_KEY_HERE"

## 2. Data Preparation and Training

In [None]:
# --- Training Overrides ---
# Modify these parameters to experiment with training settings.
training_overrides = {
    'epochs': 1,  # Keep this low for a quick test run
    'frozen_layers': 10,
    'data_fraction': 0.5,  # Use a subset of data for faster execution
}

# Each override falls back to the value already present in CONFIG when its key
# is removed from `training_overrides`.
CONFIG['model']['frozen_layers'] = training_overrides.get('frozen_layers', CONFIG['model']['frozen_layers'])
CONFIG['training']['data_fraction'] = training_overrides.get('data_fraction', CONFIG['training']['data_fraction'])
CONFIG['training']['epochs'] = training_overrides.get('epochs', CONFIG['training']['epochs'])

# Prepare the dataset using the robust data_utils script
print(f"Preparing dataset with {CONFIG['training']['data_fraction'] * 100}% of the data...")
image_paths, labels = prepare_dataset(CONFIG['training']['data_fraction'])


# --- Dataset and Augmentation Classes (Included for completeness) ---
class DataAugmentationDINO(object):
    """Multi-crop augmentation producing two global views and several local views.

    Calling an instance on a PIL image returns a list of tensors: the two
    224x224 global crops first, followed by `n_local_crops` 96x96 local crops.

    Args:
        n_local_crops: Number of local crops generated per image. Defaults to
            8, the value that was previously hard-coded.
    """

    # Per-channel normalisation constants shared by all three pipelines.
    _NORM_MEAN = (0.485, 0.456, 0.406)
    _NORM_STD = (0.229, 0.224, 0.225)

    def __init__(self, n_local_crops=8):
        self.n_local_crops = n_local_crops
        # The two global pipelines are configured identically but sample their
        # random parameters independently on every call.
        self.global_transfo1 = self._build_pipeline(224, (0.4, 1.))
        self.global_transfo2 = self._build_pipeline(224, (0.4, 1.))
        self.local_transfo = self._build_pipeline(96, (0.05, 0.4))

    @classmethod
    def _build_pipeline(cls, size, scale):
        """Return the crop -> flip -> tensor -> normalise pipeline for one view type."""
        return transforms.Compose([
            transforms.RandomResizedCrop(size, scale=scale, interpolation=Image.BICUBIC),
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.ToTensor(),
            transforms.Normalize(cls._NORM_MEAN, cls._NORM_STD),
        ])

    def __call__(self, image):
        crops = [self.global_transfo1(image), self.global_transfo2(image)]
        crops.extend(self.local_transfo(image) for _ in range(self.n_local_crops))
        return crops


class PpeDataset(Dataset):
    """Dataset pairing each image path with its label entry.

    Args:
        image_paths: Sequence of image file paths.
        labels: Sequence indexed in parallel with `image_paths`.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        # The context manager releases the file handle deterministically;
        # convert() returns an independent, fully loaded image.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label


transform = DataAugmentationDINO()
dataset = PpeDataset(image_paths, labels, transform=transform)
# NOTE(review): labels go through the DataLoader's default collation. If a
# label entry is a variable-length list of per-object dicts (as the validation
# cell's usage suggests), batching may fail once training is enabled - verify
# against what UnsupervisedTrainer.train expects.
data_loader = DataLoader(dataset, batch_size=CONFIG['training']['batch_size'], shuffle=True)

# --- Training ---
# Set to False to skip training and use the default pretrained DINOv2 model
run_training = False

if run_training:
    trainer = UnsupervisedTrainer(CONFIG)
    trainer.train(data_loader)
else:
    print("Skipping training. The default pretrained DINOv2 model will be used for discovery.")

## 3. Unsupervised Object Discovery
Here, we use the new `DiscoveryProcessor` to find objects in a sample image using feature clustering.

In [None]:
model_path = CONFIG['checkpoint_dir_abs'] / 'latest_checkpoint.pt'

# Load the checkpoint only when training ran in this session and the file
# exists; otherwise pass None as the model path.
discovery_processor = DiscoveryProcessor(
    CONFIG,
    model_path=model_path if run_training and model_path.exists() else None,
)

valid_image_paths = [p for p in image_paths if 'valid' in str(p)]

# Defined even when no validation image exists, so downstream cells see None
# instead of an undefined name.
sample_image = None
sample_image_rgb = None

if valid_image_paths:
    sample_image_path = random.choice(valid_image_paths)
    sample_image = cv2.imread(str(sample_image_path))
    if sample_image is None:
        # cv2.imread signals an unreadable/missing file by returning None;
        # fail here with the offending path instead of inside cvtColor.
        raise OSError(f"Could not read sample image: {sample_image_path}")
    sample_image_rgb = cv2.cvtColor(sample_image, cv2.COLOR_BGR2RGB)

    discovered_objects, masks = discovery_processor.discover_objects(sample_image_rgb, n_clusters=4)

    fig, (ax_original, ax_segments, ax_boxes) = plt.subplots(1, 3, figsize=(18, 6))

    ax_original.imshow(sample_image_rgb)
    ax_original.set_title('Original Image')
    ax_original.axis('off')

    ax_segments.imshow(masks, cmap='viridis')
    ax_segments.set_title('Discovered Segments')
    ax_segments.axis('off')

    ax_boxes.imshow(sample_image_rgb)
    for obj in discovered_objects:
        # Boxes are unpacked as corner coordinates (x1, y1, x2, y2).
        x1, y1, x2, y2 = obj['box']
        ax_boxes.add_patch(
            plt.Rectangle((x1, y1), x2 - x1, y2 - y1, edgecolor='red', facecolor='none', lw=2)
        )
        ax_boxes.text(x1, y1 - 5, f"Cluster {obj['cluster_id']}", color='white', backgroundcolor='red')
    ax_boxes.set_title('Discovered Bounding Boxes')
    ax_boxes.axis('off')

    plt.show()
else:
    print("No validation images found.")

## 4. Validation: Mapping Clusters to Classes and Evaluating
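This is a critical step for any unsupervised method. We map the discovered object clusters to the ground-truth classes with a one-to-one assignment that maximizes the accumulated IoU, and then evaluate the performance using Mean IoU. Note that the mapping is fitted and evaluated on the same subset of at most 50 validation images, so the reported Mean IoU is an optimistic estimate; use a held-out split for the evaluation when reporting results.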

In [None]:
def calculate_iou(boxA, boxB):
    """Return the intersection-over-union of two (x1, y1, x2, y2) boxes.

    Returns 0.0 when the union area is not positive (e.g. both boxes are
    degenerate), instead of raising ZeroDivisionError.
    """
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])

    interArea = max(0, xB - xA) * max(0, yB - yA)
    boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])

    unionArea = boxAArea + boxBArea - interArea
    if unionArea <= 0:
        return 0.0
    return interArea / float(unionArea)


def _load_rgb(img_path):
    """Read an image from disk and return it in RGB channel order.

    Raises:
        OSError: If OpenCV cannot read the file (cv2.imread returned None).
    """
    img = cv2.imread(str(img_path))
    if img is None:
        raise OSError(f"Could not read image: {img_path}")
    return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)


def map_clusters_to_classes(discovery_processor, image_paths, labels, n_clusters=4):
    """Assign each discovered cluster id to one ground-truth class id.

    For every image, the IoU between each ground-truth box and each discovered
    box is accumulated (negated) into a cluster x class cost matrix; a
    one-to-one assignment minimising that cost is then computed.

    Args:
        discovery_processor: Object exposing `discover_objects(image_rgb, n_clusters=...)`
            returning `(objects, masks)`, where each object has 'box' and 'cluster_id'.
        image_paths: Image paths to process.
        labels: Per-image iterables of dicts with 'box' and 'class_id'.
        n_clusters: Number of clusters requested from the discovery step.

    Returns:
        dict mapping cluster id -> class id (plain ints).
    """
    num_classes = len(CONFIG['discovery']['class_map'])
    cost_matrix = np.zeros((n_clusters, num_classes))

    for img_path, label_list in tqdm(zip(image_paths, labels), total=len(image_paths), desc="Mapping Clusters"):
        img_rgb = _load_rgb(img_path)
        discovered_objects, _ = discovery_processor.discover_objects(img_rgb, n_clusters=n_clusters)

        for gt_obj in label_list:
            gt_box = gt_obj['box']
            gt_class_id = gt_obj['class_id']
            for pred_obj in discovered_objects:
                iou = calculate_iou(gt_box, pred_obj['box'])
                # Negative IoU: the assignment solver minimises cost.
                cost_matrix[pred_obj['cluster_id'], gt_class_id] -= iou

    row_ind, col_ind = linear_sum_assignment(cost_matrix)
    # Plain ints keep the printed map readable and hash identically to the
    # NumPy integers, so dictionary lookups are unaffected.
    return {int(r): int(c) for r, c in zip(row_ind, col_ind)}


def evaluate_discovery(discovery_processor, cluster_class_map, image_paths, labels, n_clusters=4):
    """Compute the mean best-match IoU over all ground-truth boxes.

    For each ground-truth box, the best IoU among discovered boxes whose
    cluster maps to the same class is taken (0 when there is none); the result
    is the average over all ground-truth boxes, or 0 when there are none.
    """
    total_iou = 0
    gt_box_count = 0

    for img_path, label_list in tqdm(zip(image_paths, labels), total=len(image_paths), desc="Evaluating Discovery"):
        img_rgb = _load_rgb(img_path)
        discovered_objects, _ = discovery_processor.discover_objects(img_rgb, n_clusters=n_clusters)

        # Clusters without a mapped class get -1 and therefore never match.
        predictions = [
            (cluster_class_map.get(obj['cluster_id'], -1), obj['box'])
            for obj in discovered_objects
        ]

        for gt_obj in label_list:
            gt_box = gt_obj['box']
            gt_class_id = gt_obj['class_id']
            best_iou = 0
            for pred_class_id, pred_box in predictions:
                if pred_class_id == gt_class_id:
                    iou = calculate_iou(gt_box, pred_box)
                    if iou > best_iou:
                        best_iou = iou
            total_iou += best_iou
            gt_box_count += 1

    return total_iou / gt_box_count if gt_box_count > 0 else 0


# --- Run Validation ---
print("\nMapping unsupervised clusters to semantic classes...")
valid_indices = [i for i, p in enumerate(image_paths) if 'valid' in str(p)]
valid_images = [image_paths[i] for i in valid_indices]
valid_labels = [labels[i] for i in valid_indices]

# Cap the subset at 50 images to keep the cell fast.
subset_size = min(50, len(valid_images))
subset_images = valid_images[:subset_size]
subset_labels = valid_labels[:subset_size]

# NOTE(review): the cluster-to-class map is fitted and evaluated on the same
# subset, so the Mean IoU below is an optimistic estimate.
cluster_class_map = map_clusters_to_classes(discovery_processor, subset_images, subset_labels)
print(f"Discovered Cluster to Class Map: {cluster_class_map}")

print("\nEvaluating discovery performance on the validation subset...")
mean_iou = evaluate_discovery(discovery_processor, cluster_class_map, subset_images, subset_labels)
print(f"\nMean IoU on Validation Set: {mean_iou:.4f}")

## 5. End-to-End Inference and Violation Detection
Finally, we connect the entire pipeline. We run object discovery, map the results to semantic classes, and feed them to the violation processor. The results are visualized on the output frame.

In [None]:
violation_processor = ViolationProcessor(CONFIG)

# --- Single Image Inference Demo ---
# A sample image is only available when the discovery cell found validation
# images; skip the demo with a message instead of failing on a missing name.
if globals().get('sample_image_rgb') is None:
    print("No sample image available. Skipping the inference demo.")
else:
    print("\nRunning inference on a single sample image...")
    discovered_objects, _ = discovery_processor.discover_objects(sample_image_rgb, n_clusters=4)

    # Keep only objects whose cluster was mapped to a semantic class.
    mapped_objects = []
    for obj in discovered_objects:
        class_id = cluster_class_map.get(obj['cluster_id'], -1)
        if class_id != -1:
            obj['class_id'] = class_id
            mapped_objects.append(obj)

    violations = violation_processor.process_violations(mapped_objects, sample_image_rgb)

    # --- Visualize Violations ---
    # Drawing happens on a copy of the BGR image, so colors below are BGR tuples.
    output_image = sample_image.copy()
    violation_person_ids = {v['person_id'] for v in violations}

    font_face = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.9
    thickness = 2

    # Draw boxes for all tracked persons
    for strack in violation_processor.tracker.tracked_stracks:
        if not strack.is_activated:
            continue

        track_id = strack.track_id
        x1, y1, x2, y2 = map(int, strack.tlbr)
        in_violation = track_id in violation_person_ids

        # Red for violation, Green for compliance
        color = (0, 0, 255) if in_violation else (0, 255, 0)
        cv2.rectangle(output_image, (x1, y1), (x2, y2), color, thickness)

        label = f"Person {track_id}"
        if in_violation:
            label += " - VIOLATION"

        # Clamp the text baseline so labels of boxes touching the top edge
        # remain inside the image.
        (_, text_height), baseline = cv2.getTextSize(label, font_face, font_scale, thickness)
        text_y = max(y1 - 10, text_height + baseline)
        cv2.putText(output_image, label, (x1, text_y), font_face, font_scale, color, thickness)

    fig, ax = plt.subplots(figsize=(12, 12))
    ax.imshow(cv2.cvtColor(output_image, cv2.COLOR_BGR2RGB))
    ax.set_title('Inference with Violation Detection')
    ax.axis('off')
    plt.show()