In [1]:
import numpy as np
import tensorflow as tf
import os
import cv2
from ultralytics import YOLO
# The cv2_imshow helper defined below is a local stand-in for the Colab-only
# google.colab.patches.cv2_imshow; it displays images in Jupyter via matplotlib.
from matplotlib import pyplot as plt

def cv2_imshow(img):
    """Render an OpenCV image inline in the notebook with matplotlib.

    The image is converted from BGR to RGB channel order before drawing,
    the axes are hidden, and the figure is shown immediately.

    Args:
        img: Image array accepted by ``cv2.cvtColor`` with
            ``cv2.COLOR_BGR2RGB`` (i.e. a BGR colour image).
    """
    plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    plt.axis('off')
    plt.show()


In [2]:


def detect_motorcyclists_rcnn(image, sess, image_tensor, detection_boxes, detection_scores, detection_classes, num_detections, min_score_thresh=0.5):
    """Run one image through a detection graph and return confident boxes in pixels.

    Args:
        image: Image array; a leading batch axis is added before it is fed to
            the graph, and its first two shape entries are used as
            (height, width) for the pixel conversion.
        sess: Object exposing ``run(fetches, feed_dict=...)``
            (presumably a TensorFlow session - confirm against the caller).
        image_tensor: Feed key that receives the batched image.
        detection_boxes: Fetch yielding boxes as
            ``[ymin, xmin, ymax, xmax]`` in normalized coordinates.
        detection_scores: Fetch yielding one score per box.
        detection_classes: Fetched for graph compatibility; not used here.
        num_detections: Fetched for graph compatibility; not used here.
        min_score_thresh: Boxes whose score is not strictly greater than
            this value are dropped.

    Returns:
        list[list[int]]: One ``[left, top, right, bottom]`` pixel box per
        detection that passed the score threshold.

    Note:
        No class filtering is applied: every detection above the threshold is
        returned regardless of its class id.
    """
    batched_image = np.expand_dims(image, axis=0)

    # Perform detection; classes and the detection count are fetched but unused.
    boxes, scores, _classes, _num = sess.run(
        [detection_boxes, detection_scores, detection_classes, num_detections],
        feed_dict={image_tensor: batched_image})

    # Flatten to (N, 4) / (N,) explicitly. A blanket np.squeeze would also
    # collapse the detection axis when there is exactly one detection, leaving
    # the shapes dependent on how many boxes the model happened to return.
    boxes = np.reshape(boxes, (-1, 4))
    scores = np.reshape(scores, (-1,))

    keep = scores > min_score_thresh

    # Convert normalized coordinates to pixel coordinates.
    im_height, im_width = image.shape[:2]
    return [
        [
            int(xmin * im_width),
            int(ymin * im_height),
            int(xmax * im_width),
            int(ymax * im_height),
        ]
        for ymin, xmin, ymax, xmax in boxes[keep]
    ]

def detect_helmets_yolov8(cropped_image, helmet_model, confidence_threshold=0.25):
    """Run the helmet model on a cropped region and describe each detection.

    Args:
        cropped_image: Image passed straight to ``helmet_model``.
        helmet_model: Callable invoked as
            ``helmet_model(image, conf=..., verbose=False)`` that returns a
            sequence whose first element exposes ``.boxes``.
        confidence_threshold: Forwarded to the model as ``conf``.

    Returns:
        list[dict]: One dict per detected box with keys ``class_id`` (int),
        ``confidence``, ``bbox`` (array ``[x1, y1, x2, y2]`` relative to the
        crop) and ``class_name``. Class id 0 is labelled ``'With Helmet'``;
        every other id is labelled ``'Without Helmet'``.
    """
    predictions = helmet_model(cropped_image, conf=confidence_threshold, verbose=False)

    # Nothing to report when the model produced no result or no boxes.
    if len(predictions) == 0 or predictions[0].boxes is None:
        return []

    found = []
    for det in predictions[0].boxes:
        label_id = int(det.cls.item())
        score = det.conf.item()
        corners = det.xyxy[0].cpu().numpy()  # [x1, y1, x2, y2]
        label = 'With Helmet' if label_id == 0 else 'Without Helmet'
        found.append(dict(class_id=label_id, confidence=score,
                          bbox=corners, class_name=label))
    return found

In [3]:
import numpy as np
import os
import cv2
from ultralytics import YOLO
from matplotlib import pyplot as plt

def cv2_imshow(img):
    """Show a BGR OpenCV image in the notebook using matplotlib.

    Args:
        img: Image array that ``cv2.cvtColor`` can convert with
            ``cv2.COLOR_BGR2RGB``.
    """
    # matplotlib draws RGB, so swap the channel order before plotting.
    rgb_view = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    plt.imshow(rgb_view)
    plt.axis('off')  # hide ticks and frame
    plt.show()

def initialize_models(motorcycle_weights='yolov8n.pt',
                      helmet_model_path=r"/Users/kushalgupta/Desktop/DeepLearning/yolov8n.pt"):
    """Load the YOLO models used for motorcycle and helmet detection.

    Args:
        motorcycle_weights: Weights identifier passed to ``YOLO`` for the
            motorcycle detector. The default is the pretrained checkpoint
            name, which the loader may auto-download if it is not available.
        helmet_model_path: Filesystem path of the helmet detection weights.
            The default is the machine-specific location previously
            hard-coded here; pass your own path instead of editing the code.

    Returns:
        tuple: ``(motorcycle_model, helmet_model)``.

    Raises:
        FileNotFoundError: If ``helmet_model_path`` is not an existing file.
    """
    # Fail fast, before loading anything, when the custom weights are missing.
    # Without this check a wrong path only surfaces inside the loader.
    # NOTE(review): a missing path named like a stock checkpoint may fall back
    # to an auto-download and load generic weights silently - TODO confirm.
    if not os.path.isfile(helmet_model_path):
        raise FileNotFoundError(
            f"Helmet detection weights not found: {helmet_model_path}")

    # NOTE(review): the default helmet weights file is named 'yolov8n.pt', the
    # same name as the stock checkpoint loaded for motorcycles. Confirm that
    # file really holds helmet-trained weights (class 0 = with helmet).

    # Load YOLOv8 model for motorcycle detection (pretrained COCO model)
    motorcycle_model = YOLO(motorcycle_weights)
    print("Motorcycle detection model loaded successfully!")

    # Load the trained YOLOv8 helmet detection model
    helmet_model = YOLO(helmet_model_path)
    print("Helmet detection model loaded successfully!")

    return motorcycle_model, helmet_model

def detect_motorcycles_yolov8(image, motorcycle_model, confidence_threshold=0.5):
    """Return the motorcycle detections that the model finds in an image.

    Args:
        image: Image passed straight to ``motorcycle_model``.
        motorcycle_model: Callable invoked as
            ``motorcycle_model(image, conf=..., verbose=False)`` that returns
            a sequence whose first element exposes ``.boxes``.
        confidence_threshold: Forwarded to the model as ``conf``.

    Returns:
        list[dict]: One dict per box whose class id is 3, with keys ``bbox``
        (``[left, top, right, bottom]`` truncated to integers) and
        ``confidence``. Boxes of any other class are ignored.
    """
    predictions = motorcycle_model(image, conf=confidence_threshold, verbose=False)

    if len(predictions) == 0 or predictions[0].boxes is None:
        return []

    motorcycles = []
    for det in predictions[0].boxes:
        label_id = int(det.cls.item())
        score = det.conf.item()
        corners = det.xyxy[0].cpu().numpy()  # [x1, y1, x2, y2]

        # Class 3 is 'motorcycle' in the COCO dataset; skip everything else
        # (class 0, 'person', is deliberately not collected here).
        if label_id != 3:
            continue

        x1, y1, x2, y2 = corners.astype(int)
        motorcycles.append({'bbox': [x1, y1, x2, y2], 'confidence': score})

    return motorcycles

def detect_helmets_yolov8(cropped_image, helmet_model, confidence_threshold=0.25):
    """Detect helmet / no-helmet boxes inside a cropped rider image.

    Args:
        cropped_image: Image handed to ``helmet_model`` unchanged.
        helmet_model: Callable invoked as
            ``helmet_model(image, conf=..., verbose=False)``; the first
            element of its return value must expose ``.boxes``.
        confidence_threshold: Value forwarded to the model as ``conf``.

    Returns:
        list[dict]: For each box, a dict with ``class_id``, ``confidence``,
        ``bbox`` (``[x1, y1, x2, y2]`` in crop coordinates) and
        ``class_name`` (``'With Helmet'`` for class id 0, otherwise
        ``'Without Helmet'``). Empty when there are no results or no boxes.
    """
    def _describe(det):
        # Pull the scalar class / score and the first xyxy row off one box.
        cid = int(det.cls.item())
        score = det.conf.item()
        xyxy = det.xyxy[0].cpu().numpy()  # [x1, y1, x2, y2]
        if cid == 0:
            name = 'With Helmet'
        else:
            name = 'Without Helmet'
        return {
            'class_id': cid,
            'confidence': score,
            'bbox': xyxy,
            'class_name': name,
        }

    outputs = helmet_model(cropped_image, conf=confidence_threshold, verbose=False)
    has_boxes = len(outputs) > 0 and outputs[0].boxes is not None
    return [_describe(det) for det in outputs[0].boxes] if has_boxes else []

def expand_bbox_for_rider(bbox, image_shape, expansion_factor=0.3):
    """Grow a motorcycle box so that it also covers the rider.

    The box is extended upwards by ``expansion_factor`` of its height and by
    10% of its height/width on the bottom, left and right, then clamped to
    the image bounds.

    Args:
        bbox: ``[left, top, right, bottom]`` of the motorcycle.
        image_shape: Shape whose first two entries are (height, width).
        expansion_factor: Fraction of the box height added above the box.

    Returns:
        list: ``[left, top, right, bottom]`` of the expanded, clamped box.
    """
    x1, y1, x2, y2 = bbox
    box_h = y2 - y1
    box_w = x2 - x1

    # Padding in pixels, truncated towards zero.
    pad_up = int(box_h * expansion_factor)
    pad_down = int(box_h * 0.1)
    pad_side = int(box_w * 0.1)

    img_h, img_w = image_shape[0], image_shape[1]

    return [
        max(0, x1 - pad_side),
        max(0, y1 - pad_up),
        min(img_w, x2 + pad_side),
        min(img_h, y2 + pad_down),
    ]

def process_video(input_video_path, output_video_path, motorcycle_model, helmet_model):
    """Annotate a video with two-stage YOLOv8 motorcycle and helmet detections.

    For every frame, motorcycles are detected, each motorcycle box is expanded
    to cover the rider, and the helmet model is run on that crop. Boxes,
    labels, a per-motorcycle status line and running statistics are drawn on
    a copy of the frame, which is written to the output video.

    Args:
        input_video_path: Path of the video to read.
        output_video_path: Path of the annotated video to write (``mp4v``
            codec). Its parent directory is created when it has one.
        motorcycle_model: Model handed to ``detect_motorcycles_yolov8``.
        helmet_model: Model handed to ``detect_helmets_yolov8``.

    Returns:
        None. Progress and a final summary are printed. If the input video or
        the output writer cannot be opened, an error is printed and the
        function returns early.

    Note:
        The "violations" counter is incremented once per 'Without Helmet'
        detection per frame, so it counts detections rather than distinct
        riders and can exceed the number of frames.
    """
    # Open input video
    cap = cv2.VideoCapture(input_video_path)

    if not cap.isOpened():
        print(f"Error: Could not open video {input_video_path}")
        cap.release()
        return

    out = None
    frame_count = 0
    no_helmet_count = 0

    # try/finally guarantees the capture and writer are released even if
    # detection or drawing raises part-way through the video.
    try:
        # Keep the frame rate as a float: truncating e.g. 59.94 to 59 makes
        # the output play at the wrong speed. `not fps > 0` also catches NaN.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 30.0  # fallback when the container reports no usable rate
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"Video Info: {width}x{height}, {fps:.2f} FPS, {total_frames} frames")

        # Create output directory if it doesn't exist. A bare file name has an
        # empty dirname, and os.makedirs('') raises, so skip it in that case.
        output_dir = os.path.dirname(output_video_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        # Initialize video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print(f"Error: Could not open video writer for {output_video_path}")
            return

        print("Starting video processing...")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % 30 == 0:
                print(f"Processing frame {frame_count}/{total_frames}")

            # Draw on a copy so detection always sees the clean frame
            output_frame = frame.copy()

            # Step 1: Detect motorcycles using YOLOv8
            motorcycle_boxes = detect_motorcycles_yolov8(frame, motorcycle_model)

            # Step 2: For each motorcycle, detect helmets using YOLOv8
            for moto_box in motorcycle_boxes:
                bbox = moto_box['bbox']
                left, top, right, bottom = bbox

                # Draw motorcycle bounding box
                cv2.rectangle(output_frame, (left, top), (right, bottom), (255, 0, 0), 2)
                cv2.putText(output_frame, f"Motorcycle: {moto_box['confidence']:.2f}",
                            (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

                # Expand bbox to include rider area
                rider_bbox = expand_bbox_for_rider(bbox, frame.shape)
                rider_left, rider_top, rider_right, rider_bottom = rider_bbox

                # Draw expanded rider area (optional, for debugging).
                # Frames are BGR, so orange is (0, 165, 255), not (255, 165, 0).
                cv2.rectangle(output_frame, (rider_left, rider_top), (rider_right, rider_bottom),
                              (0, 165, 255), 1)

                # Crop rider region
                cropped_rider = frame[rider_top:rider_bottom, rider_left:rider_right]

                # A degenerate box yields an empty crop; skip it.
                if cropped_rider.size == 0:
                    continue

                # Detect helmets in cropped region
                helmet_detections = detect_helmets_yolov8(cropped_rider, helmet_model)

                has_helmet = False
                has_no_helmet = False

                for detection in helmet_detections:
                    # Convert helmet bbox coordinates from cropped to original image
                    h_bbox = detection['bbox']
                    h_left = int(h_bbox[0]) + rider_left
                    h_top = int(h_bbox[1]) + rider_top
                    h_right = int(h_bbox[2]) + rider_left
                    h_bottom = int(h_bbox[3]) + rider_top

                    # Choose color based on helmet detection
                    if detection['class_name'] == 'With Helmet':
                        color = (0, 255, 0)  # Green for helmet
                        has_helmet = True
                    else:
                        color = (0, 0, 255)  # Red for no helmet
                        has_no_helmet = True
                        no_helmet_count += 1

                    # Draw helmet bounding box
                    cv2.rectangle(output_frame, (h_left, h_top), (h_right, h_bottom), color, 2)

                    # Add label
                    label = f"{detection['class_name']}: {detection['confidence']:.2f}"
                    cv2.putText(output_frame, label, (h_left, h_top - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

                # Add motorcyclist status; a no-helmet detection takes priority
                if has_no_helmet:
                    status = "NO HELMET - VIOLATION"
                    status_color = (0, 0, 255)
                elif has_helmet:
                    status = "HELMET DETECTED"
                    status_color = (0, 255, 0)
                else:
                    status = "NO HELMET DETECTED"
                    status_color = (255, 255, 0)  # renders as cyan in BGR

                cv2.putText(output_frame, status, (left, top - 40),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, status_color, 2)

            # Add frame counter and statistics
            cv2.putText(output_frame, f"Frame: {frame_count}/{total_frames}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            cv2.putText(output_frame, f"No Helmet Violations: {no_helmet_count}", (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

            # Write frame to output video
            out.write(output_frame)
    finally:
        # Release resources
        cap.release()
        if out is not None:
            out.release()

    print(f"Video processing completed!")
    print(f"Total frames processed: {frame_count}")
    print(f"Total no-helmet violations detected: {no_helmet_count}")
    print(f"Output video saved to: {output_video_path}")

# Alternative: Direct helmet detection on full frame (simpler approach)
def process_video_simple(input_video_path, output_video_path, helmet_model, confidence_threshold=0.25):
    """Annotate a video by running the helmet model directly on each full frame.

    Simplified alternative to the two-stage pipeline: no motorcycle detection
    or cropping is performed.

    Args:
        input_video_path: Path of the video to read.
        output_video_path: Path of the annotated video to write (``mp4v``
            codec). Its parent directory is created when it has one.
        helmet_model: Callable invoked as
            ``helmet_model(frame, conf=..., verbose=False)`` whose first
            result exposes ``.boxes``.
        confidence_threshold: Forwarded to the model as ``conf``; defaults to
            the previously hard-coded 0.25.

    Returns:
        None. Progress and a final summary are printed. If the input video or
        the output writer cannot be opened, an error is printed and the
        function returns early.

    Note:
        Class id 0 is drawn as "With Helmet"; every other class id is drawn
        as "Without Helmet" and increments the violation counter, once per
        detection per frame.
    """
    cap = cv2.VideoCapture(input_video_path)

    if not cap.isOpened():
        print(f"Error: Could not open video {input_video_path}")
        cap.release()
        return

    out = None
    frame_count = 0
    no_helmet_count = 0

    # try/finally guarantees the capture and writer are released even if
    # inference or drawing raises part-way through the video.
    try:
        # Keep the frame rate as a float: truncating e.g. 59.94 to 59 makes
        # the output play at the wrong speed. `not fps > 0` also catches NaN.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 30.0  # fallback when the container reports no usable rate
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"Video Info: {width}x{height}, {fps:.2f} FPS, {total_frames} frames")

        # A bare file name has an empty dirname, and os.makedirs('') raises.
        output_dir = os.path.dirname(output_video_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print(f"Error: Could not open video writer for {output_video_path}")
            return

        print("Starting simple video processing...")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % 30 == 0:
                print(f"Processing frame {frame_count}/{total_frames}")

            # Draw on a copy so inference always sees the clean frame
            output_frame = frame.copy()

            # Direct helmet detection on full frame
            results = helmet_model(frame, conf=confidence_threshold, verbose=False)

            if len(results) > 0 and results[0].boxes is not None:
                for box in results[0].boxes:
                    class_id = int(box.cls.item())
                    confidence = box.conf.item()
                    bbox = box.xyxy[0].cpu().numpy()  # [x1, y1, x2, y2]

                    left, top, right, bottom = bbox.astype(int)

                    if class_id == 0:  # With Helmet
                        color = (0, 255, 0)
                        label = f"With Helmet: {confidence:.2f}"
                    else:  # Without Helmet
                        color = (0, 0, 255)
                        label = f"Without Helmet: {confidence:.2f}"
                        no_helmet_count += 1

                    cv2.rectangle(output_frame, (left, top), (right, bottom), color, 2)
                    cv2.putText(output_frame, label, (left, top - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            # Add frame counter and statistics
            cv2.putText(output_frame, f"Frame: {frame_count}/{total_frames}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            cv2.putText(output_frame, f"No Helmet Violations: {no_helmet_count}", (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

            out.write(output_frame)
    finally:
        cap.release()
        if out is not None:
            out.release()

    print(f"Simple video processing completed!")
    print(f"Total frames processed: {frame_count}")
    print(f"Total no-helmet violations detected: {no_helmet_count}")
    print(f"Output video saved to: {output_video_path}")

# Main execution: load both models, then annotate one input video.
# The guard is true when the cell runs in a notebook kernel or as a script.
if __name__ == "__main__":
    # Choose which approach to use:
    
    # Approach 1: Two-step detection (Motorcycle -> Helmet)
    print("Loading models...")
    motorcycle_model, helmet_model = initialize_models()
    
    # Define input and output paths.
    # NOTE(review): these are absolute, machine-specific paths; adjust them
    # before running on another machine.
    input_video_path = r"/Users/kushalgupta/Desktop/DeepLearning/datasets/input/input.mp4"
    output_video_path = r"/Users/kushalgupta/Desktop/DeepLearning/datasets/output/output4.mp4"
    
    # Option A: Use two-step detection (motorcycles first, then helmets per rider crop)
    process_video(input_video_path, output_video_path, motorcycle_model, helmet_model)
    
    # Option B: Use simple direct detection on the full frame (uncomment below)
    # process_video_simple(input_video_path, output_video_path, helmet_model)
    
    # Printed unconditionally: process_video returns normally even when it
    # could not open the input video.
    print("Helmet detection on video completed successfully!")

Loading models...
Motorcycle detection model loaded successfully!
Helmet detection model loaded successfully!
Video Info: 1280x720, 59 FPS, 220 frames
Starting video processing...
Processing frame 30/220
Processing frame 60/220
Processing frame 90/220
Processing frame 120/220
Processing frame 150/220
Processing frame 180/220
Processing frame 210/220
Video processing completed!
Total frames processed: 220
Total no-helmet violations detected: 273
Output video saved to: /Users/kushalgupta/Desktop/DeepLearning/datasets/output/output4.mp4
Helmet detection on video completed successfully!
