In [None]:
import torch
import torchvision.transforms as transforms
from torchvision.models import vit_b_16
from PIL import Image
import cv2
import numpy as np
from collections import OrderedDict
import time
import winsound
import os

# ---------------------------------------------------------------------------
# Tunable settings
# ---------------------------------------------------------------------------

# Alert duration, in seconds.
# NOTE(review): not referenced anywhere in the visible code - confirm it is
# still needed.
ALERT_DURATION = 3

# Classifier predictions whose top-class probability is below this value are
# skipped entirely (neither drawn nor alerted on).
MIN_CONFIDENCE = 0.7

# Minimum number of seconds between two alerts.
ALERT_COOLDOWN = 5

# Monitored rectangle in frame pixels, given as (x1, y1, x2, y2).
# Adjust to match the area your camera should watch.
ENTRANCE_REGION = (0, 0, 1280, 720)

# Create directory for model files if not exists
os.makedirs('models', exist_ok=True)

cascade_path = 'models/haarcascade_frontalface_default.xml'

# Download face detection model files if not present
if not os.path.exists(cascade_path):
    print("Downloading face detection model...")
    import urllib.request

    # Download under a temporary name and rename only after the transfer
    # finished. Writing straight to the final path would let an interrupted
    # download leave a truncated file that every later run accepts as
    # "already present".
    partial_path = cascade_path + '.part'
    try:
        urllib.request.urlretrieve(
            "https://raw.githubusercontent.com/opencv/opencv/master/data/haarcascades/haarcascade_frontalface_default.xml",
            partial_path
        )
        os.replace(partial_path, cascade_path)
    finally:
        # Only still present when the download or the rename failed.
        if os.path.exists(partial_path):
            os.remove(partial_path)

# Load face detection model (using Haar Cascade as fallback)
face_cascade = cv2.CascadeClassifier(cascade_path)

# CascadeClassifier does not raise on an unreadable or corrupt file; it just
# stays empty and fails later inside detectMultiScale. Fail here instead,
# with a message that names the file.
if face_cascade.empty():
    raise RuntimeError(
        f"Could not load face detection model from '{cascade_path}'. "
        "Delete the file and run again to re-download it."
    )

# Initialize helmet detection model
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Two-class ViT-B/16 with randomly initialised weights; the trained weights
# are loaded from disk below.
model = vit_b_16(pretrained=False, num_classes=2)

# Load trained weights (with error handling)
try:
    # NOTE(security): torch.load relies on pickle, which can execute arbitrary
    # code while loading. Only load checkpoint files from a trusted source.
    state_dict = torch.load('my_vit_model.pth', map_location=device)

    # The checkpoint may store the classifier parameters as 'heads.weight' /
    # 'heads.bias'; rename them to the 'heads.head.*' keys before loading.
    # Every other key is passed through unchanged, in the original order.
    # NOTE(review): presumably the checkpoint was saved from a model whose
    # `heads` was a single layer - confirm against the training script.
    key_renames = {
        'heads.weight': 'heads.head.weight',
        'heads.bias': 'heads.head.bias',
    }
    new_state_dict = OrderedDict(
        (key_renames.get(k, k), v) for k, v in state_dict.items()
    )
    model.load_state_dict(new_state_dict)
    model = model.to(device)
    # Inference mode: disables dropout and other training-only behaviour.
    model.eval()
except Exception as e:
    print(f"Error loading model: {e}")
    # A bare exit() would end the process with status 0 (success) and depends
    # on the `site`-provided builtin. Raise SystemExit with a non-zero status
    # so callers and shells can detect the failure.
    raise SystemExit(1) from e

# Preprocessing applied to every head crop before it is fed to the model:
# resize, centre-crop to 224x224, convert to a tensor, then normalise each
# channel with the mean/std below (the standard ImageNet statistics).
transform = transforms.Compose(
    [
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Initialize webcam
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    # Say so now: otherwise the only symptom is a generic frame-read error
    # later, which hides the real cause (camera missing or in use).
    print("Warning: could not open camera 0")

# Request a 1280x720 capture size.
# NOTE(review): the driver may not honour the request - verify the actual
# frame size if the monitoring region depends on it.
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

# Tracking variables
# time.time() value of the most recent alert; 0 means no alert has fired yet.
last_alert_time = 0
# Maps a person id string to the time.time() value of that id's last alert.
tracked_persons = {}

def in_entrance_region(box, region=None):
    """Return whether the centre of ``box`` lies strictly inside a region.

    Args:
        box: ``(x, y, w, h)`` - top-left corner plus width and height.
        region: Optional ``(x1, y1, x2, y2)`` rectangle to test against.
            When omitted, the module-level ``ENTRANCE_REGION`` is used.

    Returns:
        True when the box centre is strictly between the region's bounds on
        both axes. A centre lying exactly on the border counts as outside.
    """
    x, y, w, h = box
    bounds = ENTRANCE_REGION if region is None else region
    left, top, right, bottom = bounds[:4]

    # Integer centre of the box (floor division, so it rounds toward the
    # top-left for odd sizes).
    center_x = x + w // 2
    center_y = y + h // 2

    return left < center_x < right and top < center_y < bottom

print("Starting workplace safety monitoring...")
print("Press 'q' to quit")

# Smoothed frames-per-second of this loop, measured from frame timestamps.
# cap.get(cv2.CAP_PROP_FPS) is not used because it reports a property of the
# capture device, not how fast this loop (including inference) actually runs.
measured_fps = 0.0
previous_frame_time = None

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Error reading frame")
            break

        current_time = time.time()

        # Update the FPS estimate from the time since the previous frame.
        if previous_frame_time is not None:
            frame_interval = current_time - previous_frame_time
            if frame_interval > 0:
                instant_fps = 1.0 / frame_interval
                if measured_fps == 0.0:
                    measured_fps = instant_fps
                else:
                    # Exponential smoothing keeps the on-screen number readable.
                    measured_fps = 0.9 * measured_fps + 0.1 * instant_fps
        previous_frame_time = current_time

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Detect faces using Haar Cascade
        faces = face_cascade.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=5,
            minSize=(30, 30),
            flags=cv2.CASCADE_SCALE_IMAGE
        )

        for (x, y, w, h) in faces:
            if not in_entrance_region((x, y, w, h)):
                continue

            # Expand to head region: 40% of the face height above, 10% below
            # and 30% of the face width on each side, clamped to the frame.
            head_y1 = max(0, y - int(h * 0.4))
            head_y2 = min(frame.shape[0], y + h + int(h * 0.1))
            head_x1 = max(0, x - int(w * 0.3))
            head_x2 = min(frame.shape[1], x + w + int(w * 0.3))

            head_roi = frame[head_y1:head_y2, head_x1:head_x2]
            if head_roi.size == 0:
                continue

            try:
                # Helmet detection: OpenCV frames are BGR, the transform
                # pipeline expects an RGB PIL image.
                pil_img = Image.fromarray(cv2.cvtColor(head_roi, cv2.COLOR_BGR2RGB))
                img_tensor = transform(pil_img).unsqueeze(0).to(device)

                with torch.no_grad():
                    output = model(img_tensor)
                    probs = torch.softmax(output, dim=1)

                # Class 1 is treated as "helmet", class 0 as "no helmet".
                pred_class = torch.argmax(probs).item()
                helmet_confidence = probs[0][pred_class].item()

                # Uncertain predictions are neither drawn nor alerted on.
                if helmet_confidence < MIN_CONFIDENCE:
                    continue

                # Tracking and visualization
                # NOTE(review): the id is the exact pixel position of the
                # face, so it changes whenever the detection moves by a pixel.
                person_id = f"{x}_{y}"
                color = (0, 255, 0) if pred_class == 1 else (0, 0, 255)

                cv2.rectangle(frame, (head_x1, head_y1), (head_x2, head_y2), color, 2)
                label = "HELMET" if pred_class == 1 else "NO HELMET!"
                # Keep the label on screen when the head box touches the top
                # edge (head_y1 - 10 would be negative there).
                label_y = max(head_y1 - 10, 20)
                cv2.putText(frame, f"{label} {helmet_confidence:.2f}",
                            (head_x1, label_y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

                # Alert logic: a global cooldown first, then a per-id cooldown.
                if pred_class == 0 and current_time - last_alert_time > ALERT_COOLDOWN:
                    # Drop ids whose cooldown has expired. They would pass the
                    # check below anyway, and removing them stops the dict
                    # from growing for as long as the program runs.
                    expired_ids = [
                        pid for pid, alerted_at in tracked_persons.items()
                        if current_time - alerted_at > ALERT_COOLDOWN
                    ]
                    for pid in expired_ids:
                        del tracked_persons[pid]

                    if person_id not in tracked_persons:
                        print(f"{time.ctime()} - ALERT: Worker without helmet detected!")
                        # 1000 Hz tone for 1000 ms.
                        # NOTE(review): this call appears to block, pausing
                        # capture while the tone plays - confirm acceptable.
                        winsound.Beep(1000, 1000)
                        last_alert_time = current_time
                        tracked_persons[person_id] = current_time

            except Exception as e:
                # Best effort: a failure on one face must not stop monitoring.
                print(f"Processing error: {e}")
                continue

        # Draw monitoring zone
        cv2.rectangle(frame, (ENTRANCE_REGION[0], ENTRANCE_REGION[1]),
                      (ENTRANCE_REGION[2], ENTRANCE_REGION[3]), (255, 0, 0), 2)
        cv2.putText(frame, "Monitoring Zone", (ENTRANCE_REGION[0]+10, ENTRANCE_REGION[1]+30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)

        # Display FPS
        cv2.putText(frame, f"FPS: {measured_fps:.1f}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        cv2.imshow('Workplace Safety Monitoring', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always release the camera and close the window, including when the loop
    # is interrupted or raises; otherwise the device stays locked.
    cap.release()
    cv2.destroyAllWindows()
print("Monitoring stopped")



Starting workplace safety monitoring...
Press 'q' to quit
Sun Jul 27 21:08:00 2025 - ALERT: Worker without helmet detected!
Sun Jul 27 21:08:03 2025 - ALERT: Worker without helmet detected!
Sun Jul 27 21:08:14 2025 - ALERT: Worker without helmet detected!
Sun Jul 27 21:08:21 2025 - ALERT: Worker without helmet detected!
Sun Jul 27 21:08:29 2025 - ALERT: Worker without helmet detected!
Sun Jul 27 21:08:36 2025 - ALERT: Worker without helmet detected!
Sun Jul 27 21:08:43 2025 - ALERT: Worker without helmet detected!
Sun Jul 27 21:08:52 2025 - ALERT: Worker without helmet detected!
Sun Jul 27 21:08:55 2025 - ALERT: Worker without helmet detected!
Sun Jul 27 21:09:00 2025 - ALERT: Worker without helmet detected!
Sun Jul 27 21:09:06 2025 - ALERT: Worker without helmet detected!
Sun Jul 27 21:09:12 2025 - ALERT: Worker without helmet detected!
Sun Jul 27 21:09:20 2025 - ALERT: Worker without helmet detected!
Sun Jul 27 21:09:26 2025 - ALERT: Worker without helmet detected!
Sun Jul 27 21:09:3