# Video Accident Detection and Intensity Classification

This notebook processes videos to detect accidents using YOLO and classify their intensity using the trained ResNet18 model.

## 1. Install Dependencies

In [1]:
%%bash
pip install opencv-python onnxruntime torch torchvision pillow numpy



## 2. Import Libraries

In [2]:
import cv2
import torch
import numpy as np
from PIL import Image
from torchvision import transforms
import torch.nn as nn
from torchvision import models
import time
import onnxruntime as ort
import os

# Confirm the imports worked and report the library versions in use.
# A single print call with a newline separator emits one message per line.
print(
    "All libraries imported successfully!",
    f"OpenCV version: {cv2.__version__}",
    f"PyTorch version: {torch.__version__}",
    f"ONNX Runtime version: {ort.__version__}",
    sep="\n",
)

All libraries imported successfully!
OpenCV version: 4.11.0
PyTorch version: 2.7.1
ONNX Runtime version: 1.22.0


## 3. Load Models

In [7]:
# Define the corrected model architecture to match the saved weights
class AccidentClassifier(nn.Module):
    """ResNet18-based classifier with a two-class head (0 = weak, 1 = strong).

    The ResNet18 layers are attached directly as attributes of this module
    (``conv1``, ``bn1``, ``layer1`` ... ``fc``) instead of being wrapped in a
    ``self.resnet`` attribute, so that parameter names line up with the saved
    weights. Do not rename these attributes: the state-dict keys depend on them.
    """

    def __init__(self):
        super().__init__()
        # weights=None builds an untrained ResNet18; it replaces the deprecated
        # `pretrained=False` argument. The trained parameters are loaded
        # separately from the checkpoint.
        resnet = models.resnet18(weights=None)

        # Reuse every backbone layer except the final fc layer.
        self.conv1 = resnet.conv1
        self.bn1 = resnet.bn1
        self.relu = resnet.relu
        self.maxpool = resnet.maxpool
        self.layer1 = resnet.layer1
        self.layer2 = resnet.layer2
        self.layer3 = resnet.layer3
        self.layer4 = resnet.layer4
        self.avgpool = resnet.avgpool

        # Replacement head for binary classification.
        num_features = resnet.fc.in_features
        self.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(num_features, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 2)  # 2 classes: weak (0), strong (1)
        )

    def forward(self, x):
        """Run the backbone and the head on a batch of images.

        Args:
            x: Input image batch tensor.

        Returns:
            Raw (un-normalised) scores with one column per class.
        """
        # Stem followed by the four residual stages, applied in order.
        backbone = (
            self.conv1, self.bn1, self.relu, self.maxpool,
            self.layer1, self.layer2, self.layer3, self.layer4,
        )
        for stage in backbone:
            x = stage(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)  # keep the batch dimension, flatten the rest
        return self.fc(x)

# Load intensity classification model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
intensity_model = AccidentClassifier()

# Load the trained weights.
# torch.load unpickles the file; weights_only=True restricts unpickling to
# tensors and plain containers so a tampered checkpoint cannot execute code.
# It is passed explicitly because older PyTorch releases default to False.
checkpoint = torch.load(
    'accident_intensity_classifier.pth',
    map_location=device,
    weights_only=True,
)
intensity_model.load_state_dict(checkpoint['model_state_dict'])
intensity_model.to(device)
intensity_model.eval()  # inference mode: disables the dropout layers in the head

print(f"Intensity classification model loaded successfully on {device}")

# Define transforms for intensity model: resize to 224x224, convert to a
# tensor, then normalise each channel with the mean/std values below.
intensity_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

Intensity classification model loaded successfully on cpu


## 4. Video Processing Function

In [8]:
_INTENSITY_CLASS_NAMES = ('Weak', 'Strong')  # index = class id predicted by intensity_model


def _clamp_bbox(bbox, frame_width, frame_height):
    """Convert a bbox to ints and clamp it to the frame so slicing cannot wrap around."""
    x1, y1, x2, y2 = (int(v) for v in bbox)
    x1 = min(max(x1, 0), frame_width)
    x2 = min(max(x2, 0), frame_width)
    y1 = min(max(y1, 0), frame_height)
    y2 = min(max(y2, 0), frame_height)
    return x1, y1, x2, y2


def _classify_intensity(roi_bgr):
    """Classify one BGR crop with intensity_model; return (class name, probability)."""
    roi_pil = Image.fromarray(cv2.cvtColor(roi_bgr, cv2.COLOR_BGR2RGB))
    roi_tensor = intensity_transform(roi_pil).unsqueeze(0).to(device)

    with torch.no_grad():
        logits = intensity_model(roi_tensor)
        probabilities = torch.softmax(logits, dim=1)
        _, predicted = torch.max(logits, 1)

    class_index = predicted.item()
    return _INTENSITY_CLASS_NAMES[class_index], probabilities[0][class_index].item()


def process_video(input_video_path, output_video_path):
    """
    Process video to detect accidents using ONNX model and classify intensity

    Every frame is run through the ONNX detector; each detected region is
    cropped, classified as 'Weak' or 'Strong', and drawn onto the frame that is
    written to the output video (red box for 'Strong', green for 'Weak').

    This function relies on names that must already exist in the notebook
    namespace: `preprocess_image`, `onnx_session`, `input_details` and
    `postprocess_detections` (detector), plus `intensity_model`,
    `intensity_transform` and `device` (classifier).

    Args:
        input_video_path: Path of the video to read.
        output_video_path: Path of the annotated video to write (mp4v codec).

    Returns:
        frame_rate: Original video frame rate
        avg_processing_time: Average processing time per frame
            (NaN when the video yields no frames)

    Raises:
        ValueError: If the input video cannot be opened or the output video
            cannot be opened for writing.
    """

    # Open input video
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        cap.release()
        raise ValueError(f"Could not open video: {input_video_path}")

    out = None
    processing_times = []
    frame_count = 0

    # try/finally guarantees the capture and writer are released even when
    # inference or drawing raises part-way through the video.
    try:
        # Get video properties
        frame_rate = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"Video properties:")
        print(f"  Frame rate: {frame_rate:.2f} FPS")
        print(f"  Resolution: {width}x{height}")
        print(f"  Total frames: {total_frames}")

        # Setup output video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, frame_rate, (width, height))
        if not out.isOpened():
            # Without this check every out.write() would silently do nothing.
            raise ValueError(f"Could not open output video for writing: {output_video_path}")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # perf_counter is a monotonic clock intended for measuring durations
            start_time = time.perf_counter()

            # Preprocess frame for ONNX model
            input_tensor = preprocess_image(frame)

            # Run ONNX inference
            outputs = onnx_session.run(None, {input_details.name: input_tensor})

            # Post-process detections
            detections = postprocess_detections(outputs, frame.shape, conf_threshold=0.5)

            # Draw on a copy so that crops for later detections are taken from
            # the untouched frame, not from pixels already covered by boxes/labels.
            annotated = frame.copy()
            frame_height, frame_width = frame.shape[:2]

            # Process each detection
            for detection in detections:
                x1, y1, x2, y2 = _clamp_bbox(detection['bbox'], frame_width, frame_height)
                confidence = detection['confidence']

                # Extract region of interest
                roi = frame[y1:y2, x1:x2]
                if roi.size == 0:  # Skip empty/degenerate regions
                    continue

                predicted_class, intensity_confidence = _classify_intensity(roi)

                # Draw bounding box
                color = (0, 0, 255) if predicted_class == 'Strong' else (0, 255, 0)
                cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)

                # Add text labels
                label = f"Accident-{predicted_class}: {intensity_confidence:.2f}"
                detection_label = f"Det: {confidence:.2f}"

                # Keep both labels inside the frame for boxes touching the
                # top or bottom edge.
                label_y = max(y1 - 10, 15)
                detection_label_y = min(y2 + 20, frame_height - 5)
                cv2.putText(annotated, label, (x1, label_y),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
                cv2.putText(annotated, detection_label, (x1, detection_label_y),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

            # Calculate processing time for this frame
            processing_times.append(time.perf_counter() - start_time)

            # Write frame to output video
            out.write(annotated)

            frame_count += 1
            if frame_count % 30 == 0:  # Progress update every 30 frames
                print(f"Processed {frame_count}/{total_frames} frames")
    finally:
        # Clean up
        cap.release()
        if out is not None:
            out.release()

    # Calculate average processing time; NaN (without a NumPy warning) when the
    # video produced no frames at all.
    avg_processing_time = np.mean(processing_times) if processing_times else float("nan")

    print(f"\nProcessing completed!")
    print(f"Output saved to: {output_video_path}")

    return frame_rate, avg_processing_time

## 5. Process Video

In [13]:
# Define input and output paths
input_video = "data/testing2.mp4"  # Replace with your video path
output_video = "output_video_with_predictions_2.mp4"

# Check if input video exists
if not os.path.exists(input_video):
    print(f"Input video not found: {input_video}")
    print("Please update the 'input_video' variable with the correct path.")
else:
    # Process the video
    print("Starting video processing...")
    frame_rate, avg_processing_time = process_video(input_video, output_video)

    # Display results
    print("\n" + "="*50)
    print("PROCESSING RESULTS")
    print("="*50)
    print(f"Original frame rate: {frame_rate:.2f} FPS")
    # The average is NaN when no frame was decoded; guard the division so the
    # summary stays meaningful instead of reporting "nan FPS" or dividing by zero.
    if np.isfinite(avg_processing_time) and avg_processing_time > 0:
        processing_fps = 1 / avg_processing_time
        print(f"Average processing time per frame: {avg_processing_time:.4f} seconds")
        print(f"Processing speed: {processing_fps:.2f} FPS")
        print(f"Real-time capability: {'Yes' if processing_fps >= frame_rate else 'No'}")
    else:
        print("No frames were processed, so processing speed could not be measured.")
    print("="*50)

Starting video processing...
Video properties:
  Frame rate: 30.00 FPS
  Resolution: 1280x720
  Total frames: 518
Processed 30/518 frames
Processed 60/518 frames
Processed 90/518 frames
Processed 120/518 frames
Processed 150/518 frames
Processed 180/518 frames
Processed 210/518 frames
Processed 240/518 frames
Processed 270/518 frames
Processed 300/518 frames
Processed 330/518 frames
Processed 360/518 frames
Processed 390/518 frames
Processed 420/518 frames
Processed 450/518 frames
Processed 480/518 frames
Processed 510/518 frames

Processing completed!
Output saved to: output_video_with_predictions_2.mp4

PROCESSING RESULTS
Original frame rate: 30.00 FPS
Average processing time per frame: 0.1261 seconds
Processing speed: 7.93 FPS
Real-time capability: No


## 6. Alternative: Custom Post-processing (if needed)

If the default post-processing doesn't work well with your specific ONNX model, you can customize it:

In [None]:
def _nms_keep_mask(boxes, scores, class_ids, iou_threshold):
    """Greedy per-class non-maximum suppression; returns a boolean keep-mask over `boxes`."""
    keep = np.zeros(len(boxes), dtype=bool)
    areas = (np.clip(boxes[:, 2] - boxes[:, 0], 0, None)
             * np.clip(boxes[:, 3] - boxes[:, 1], 0, None))

    for cls in np.unique(class_ids):
        # Indices of this class, highest score first.
        remaining = np.flatnonzero(class_ids == cls)
        remaining = remaining[np.argsort(-scores[remaining], kind="stable")]

        while remaining.size > 0:
            best, rest = remaining[0], remaining[1:]
            keep[best] = True
            if rest.size == 0:
                break

            inter_w = np.clip(np.minimum(boxes[best, 2], boxes[rest, 2])
                              - np.maximum(boxes[best, 0], boxes[rest, 0]), 0, None)
            inter_h = np.clip(np.minimum(boxes[best, 3], boxes[rest, 3])
                              - np.maximum(boxes[best, 1], boxes[rest, 1]), 0, None)
            inter = inter_w * inter_h
            union = areas[best] + areas[rest] - inter
            iou = np.divide(inter, union, out=np.zeros_like(inter), where=union > 0)

            # Drop everything that overlaps the chosen box too much.
            remaining = rest[iou <= iou_threshold]

    return keep


def custom_postprocess_detections(outputs, image_shape, input_size=(640, 640), conf_threshold=0.5,
                                  iou_threshold=0.45):
    """
    Custom post-processing function - adjust based on your ONNX model's output format

    Supported format (YOLOv5-style): ``outputs[0]`` has shape
    [batch, num_detections, 5 + num_classes], each row being
    (x_center, y_center, width, height, objectness, class scores...) expressed
    in the coordinate system of the model input (`input_size`).

    Formats that are NOT handled here and need their own branch:
    - YOLOv3/v4: [batch, 3, grid_h, grid_w, 85]
    - NOTE(review): raw YOLOv8 exports are typically laid out as
      [batch, 4 + num_classes, num_anchors] with no objectness column, so they
      do not match the row layout above; confirm against your model.

    Boxes are rescaled by a plain width/height ratio, which assumes the frame
    was resized to `input_size` without letterbox padding.

    Args:
        outputs: List of arrays returned by the ONNX session.
        image_shape: Shape of the original frame; the first two entries are
            (height, width).
        input_size: (width, height) of the model input.
        conf_threshold: Minimum objectness and minimum objectness * class score.
        iou_threshold: IoU above which a lower-scoring box of the same class is
            suppressed. Pass None to disable non-maximum suppression.

    Returns:
        List of dicts with keys 'bbox' ([x1, y1, x2, y2] in frame pixels),
        'confidence' (objectness * class score) and 'class_id', in the order
        the rows appear in the model output. Empty list if nothing passes the
        thresholds or the output format is unsupported.
    """

    # If your model has different output format, implement it here
    if len(outputs) == 0 or len(outputs[0].shape) != 3:
        print(f"Unsupported output format: {[out.shape for out in outputs]}")
        return []

    # Remove batch dimension: [num_detections, 5 + num_classes]
    predictions = np.asarray(outputs[0][0], dtype=np.float64)
    if predictions.shape[1] < 6:
        # Need 4 bbox values, objectness and at least one class score per row.
        print(f"Unsupported output format: {[out.shape for out in outputs]}")
        return []

    # Skip low confidence detections first so the rest only touches candidates.
    candidates = predictions[predictions[:, 4] >= conf_threshold]
    if candidates.shape[0] == 0:
        return []

    # Get best class; final confidence is obj_conf * class_conf
    class_scores = candidates[:, 5:]
    class_ids = class_scores.argmax(axis=1)
    final_confidence = candidates[:, 4] * class_scores[np.arange(candidates.shape[0]), class_ids]

    confident = final_confidence >= conf_threshold
    candidates = candidates[confident]
    class_ids = class_ids[confident]
    final_confidence = final_confidence[confident]
    if candidates.shape[0] == 0:
        return []

    # Convert to corner coordinates and scale to image size
    height_img, width_img = image_shape[:2]
    scale_x = width_img / input_size[0]
    scale_y = height_img / input_size[1]

    half_w = candidates[:, 2] / 2
    half_h = candidates[:, 3] / 2
    boxes = np.stack([
        (candidates[:, 0] - half_w) * scale_x,
        (candidates[:, 1] - half_h) * scale_y,
        (candidates[:, 0] + half_w) * scale_x,
        (candidates[:, 1] + half_h) * scale_y,
    ], axis=1).astype(int)  # truncates toward zero, like int()

    # Ensure coordinates are within image bounds
    boxes[:, [0, 2]] = np.clip(boxes[:, [0, 2]], 0, width_img - 1)
    boxes[:, [1, 3]] = np.clip(boxes[:, [1, 3]], 0, height_img - 1)

    # Discard boxes that collapsed to zero width or height after clamping.
    non_empty = (boxes[:, 2] > boxes[:, 0]) & (boxes[:, 3] > boxes[:, 1])
    boxes = boxes[non_empty]
    class_ids = class_ids[non_empty]
    final_confidence = final_confidence[non_empty]

    # Raw YOLO output contains many near-duplicate boxes per object; keep only
    # the best-scoring box of each overlapping group.
    if iou_threshold is not None and boxes.shape[0] > 1:
        keep = _nms_keep_mask(boxes.astype(np.float64), final_confidence, class_ids, iou_threshold)
        boxes = boxes[keep]
        class_ids = class_ids[keep]
        final_confidence = final_confidence[keep]

    return [
        {
            'bbox': [int(v) for v in box],
            'confidence': float(conf),
            'class_id': int(class_id),
        }
        for box, conf, class_id in zip(boxes, final_confidence, class_ids)
    ]

# Test the custom post-processing by replacing the function call in process_video
# detections = custom_postprocess_detections(outputs, frame.shape, conf_threshold=0.5)

## 7. Display Sample Frame (Optional)

In [None]:
# Display a sample frame from the processed video
def show_sample_frame(video_path, frame_number=100):
    """Show a sample frame from the processed video

    Args:
        video_path: Path of the video to read.
        frame_number: Zero-based index of the frame to display.

    Prints a message instead of plotting when the video cannot be opened or
    the requested frame cannot be read (for example, an index past the end).
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Could not open video: {video_path}")
            return

        # Go to specific frame
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        ret, frame = cap.read()
    finally:
        # Release the capture even if seeking or reading raises.
        cap.release()

    if not ret:
        print(f"Could not read frame {frame_number} from: {video_path}")
        return

    # Convert BGR to RGB for display
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Imported here because matplotlib is not part of the notebook's import cell.
    import matplotlib.pyplot as plt
    plt.figure(figsize=(12, 8))
    plt.imshow(frame_rgb)
    plt.title(f"Sample Frame {frame_number} with Predictions")
    plt.axis('off')
    plt.show()

# Show sample frame if output video exists.
# `output_video` is the path assigned in the "Process Video" cell (section 5),
# so that cell must have been run first; frame index 100 is the one displayed.
if os.path.exists(output_video):
    show_sample_frame(output_video, 100)

## Usage Notes

**Required Files:**
- `best.onnx` - Your trained ONNX model for accident detection
- `accident_intensity_classifier.pth` - Your trained ResNet18 model
- Input video file

**Key Parameters to Adjust:**
- `conf_threshold=0.5` - ONNX model detection confidence threshold
- Post-processing function - May need adjustment based on your ONNX model's output format
- Color coding: Red for "Strong", Green for "Weak" accidents

**Output:**
- Processed video with bounding boxes and intensity predictions
- Frame rate of original video
- Average processing time per frame
- Real-time processing capability assessment

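**Note:** `process_video` calls `postprocess_detections`, which assumes a standard YOLO output format. If your ONNX model has a different output format, use `custom_postprocess_detections` from section 6 instead. `process_video` also expects `onnx_session`, `input_details`, `preprocess_image`, and `postprocess_detections` to exist in the notebook namespace; none of them is defined in the cells shown here, so define them (for example, by creating an ONNX Runtime session for `best.onnx`) before running section 5.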