In [4]:
import warnings
warnings.filterwarnings('ignore')

In [5]:
import pathlib
from pathlib import Path
temp = pathlib.PosixPath
pathlib.PosixPath = pathlib.WindowsPath

In [6]:
import torch
import cv2
import os
import numpy as np
from datetime import datetime

# Define global parameters
CONFIDENCE_THRESHOLD = 0.5
NMS_THRESHOLD = 0.4
VIDEO_FILE = "video2.mp4"
OUTPUT_DIR = "Detection Output"
FRAME_SIZE = (1020, 600)
CLASSES = ["Helmet", "No Safety Vest", "No Helmet", "Safety Vest", "Shoes"]
FONT = cv2.FONT_HERSHEY_SIMPLEX

# Ensure output directory exists
os.makedirs(OUTPUT_DIR, exist_ok=True)

def load_model(weights_path='yolov5s.pt', conf=0.5, iou=0.4):
    """Load YOLOv5 model with error handling and custom configuration."""
    try:
        model = torch.hub.load('ultralytics/yolov5', 'custom', path=weights_path, force_reload=True)
        model.conf = conf  # Confidence threshold
        model.iou = iou  # NMS IoU threshold
        return model
    except Exception as e:
        print(f"Error loading the model: {e}")
        exit()

def process_frame(frame, model):
    """Run YOLOv5 inference on a single frame and return predictions."""
    frame_resized = cv2.resize(frame, FRAME_SIZE)
    results = model(frame_resized)
    pred = results.pred[0].cpu().numpy()  # Extract predictions
    return pred, frame_resized

def draw_boxes(frame, predictions, label, color):
    """Draw bounding boxes and labels on a frame."""
    for obj in predictions:
        x1, y1, x2, y2 = map(int, obj[:4])
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        cv2.putText(frame, label, (x1, y1 - 10), FONT, 0.5, color, 2, cv2.LINE_AA)

def check_safety_gear(workers, helmets, vests, frame):
    """Check workers for missing helmets or vests and return the frames with violations."""
    no_safety_frames = []
    violation_detected = False
    
    for worker in workers:
        x1, y1, x2, y2 = map(int, worker[:4])
        # Check if the worker has a helmet or vest
        has_helmet = any(x1 <= h[2] and x2 >= h[0] and y1 <= h[3] and y2 >= h[1] for h in helmets)
        has_vest = any(x1 <= v[2] and x2 >= v[0] and y1 <= v[3] and y2 >= v[1] for v in vests)

        # Determine what safety gear is missing
        if not has_helmet and not has_vest:
            # No Helmet and No Vest: Yellow
            draw_boxes(frame, [worker], "No Helmet and Vest", (0, 255, 255))  # Yellow box
            violation_detected = True
        elif not has_helmet:
            # No Helmet: Blue
            draw_boxes(frame, [worker], "No Helmet", (255, 0, 0))  # Blue box
            violation_detected = True
        elif not has_vest:
            # No Vest: Red
            draw_boxes(frame, [worker], "No Vest", (0, 0, 255))  # Red box
            violation_detected = True

        if violation_detected:
            no_safety_frames.append(frame.copy())  # Save the frame with the violation

    return no_safety_frames, violation_detected

def get_output_filename():
    """Generate output filename with current date and time."""
    current_time = datetime.now().strftime("%Y-%m-%d-%I-%M-%p")  # Format: YYYY-MM-DD-HH-MM-am/pm
    filename = f"Detection_{current_time}.mp4"
    return os.path.join(OUTPUT_DIR, filename)

def main():
    # Load two models: one for person detection (COCO), and one for safety gear detection
    coco_model = load_model('yolov5s.pt', conf=CONFIDENCE_THRESHOLD, iou=NMS_THRESHOLD)  # COCO model
    safety_model = load_model('best.pt', conf=CONFIDENCE_THRESHOLD, iou=NMS_THRESHOLD)  # Custom model for safety detection

    # Open video file for reading
    cap = cv2.VideoCapture(VIDEO_FILE)

    # Initialize video writer as None, only create it if we detect something
    out = None
    violations_detected = False  # Track if any violations are found

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Step 1: Detect people using the COCO-trained YOLOv5 model
        predictions, frame_resized = process_frame(frame, coco_model)
        persons = predictions[predictions[:, 5] == 0]  # Class '0' in COCO is 'person'

        if len(persons) > 0:
            # Step 2: If a person is detected, run safety gear detection
            safety_predictions, _ = process_frame(frame, safety_model)

            # Extract detections by class
            helmets = safety_predictions[safety_predictions[:, 5] == CLASSES.index('Helmet')]
            no_helmets = safety_predictions[safety_predictions[:, 5] == CLASSES.index('No Helmet')]
            vests = safety_predictions[safety_predictions[:, 5] == CLASSES.index('Safety Vest')]
            no_vests = safety_predictions[safety_predictions[:, 5] == CLASSES.index('No Safety Vest')]

            # Check and mark workers without safety gear, returning frames with violations
            no_safety_frames, violation_detected = check_safety_gear(persons, helmets, vests, frame_resized)

            if violation_detected:
                # Initialize video writer only when violations are detected
                if out is None:
                    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                    output_file = get_output_filename()
                    out = cv2.VideoWriter(output_file, fourcc, cap.get(cv2.CAP_PROP_FPS), FRAME_SIZE)
                
                # Write the frames with violations to the output video
                for no_safety_frame in no_safety_frames:
                    out.write(no_safety_frame)

                violations_detected = True

        # Optionally show the processed frame in a window
        cv2.imshow('Processed Frame', frame_resized)

        # Quit if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()

    # If a video writer was created and no violations were detected, delete the output file
    if out is not None:
        out.release()
        if not violations_detected:
            os.remove(output_file)
            print("No violations detected, video file deleted.")
        else:
            print(f"Video with violations saved at {output_file}")
    else:
        print("No violations detected, no video created.")
    
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()


Downloading: "https://github.com/ultralytics/yolov5/zipball/master" to C:\Users\ENG-Mahmoud/.cache\torch\hub\master.zip
YOLOv5  2024-10-11 Python-3.9.13 torch-1.13.1 CPU

Fusing layers... 
YOLOv5s summary: 213 layers, 7225885 parameters, 0 gradients
Adding AutoShape... 
Downloading: "https://github.com/ultralytics/yolov5/zipball/master" to C:\Users\ENG-Mahmoud/.cache\torch\hub\master.zip
YOLOv5  2024-10-11 Python-3.9.13 torch-1.13.1 CPU

Fusing layers... 
YOLOv5m summary: 212 layers, 20869098 parameters, 0 gradients, 47.9 GFLOPs
Adding AutoShape... 


Video with violations saved at Detection Output\Detection_2024-10-11-11-13-PM.mp4
