# Section 1: Dependencies and Utilities
- Download and import required libraries 
- Helper functions for video/image processing

# Section 2: YOLO Implementation
- Download YOLOv3 weights and config
- Vehicle detection class and functions

# Section 3: MOSSE Tracker
- Complete MOSSE tracker implementation
- Track-compensated frame interpolation (TCFI)

# Section 4: Crash Estimation
- Speed and trajectory estimation
- Future position prediction
- Collision detection algorithms

# Section 5: ViF Descriptor & SVM
- Horn-Schunck optical flow
- ViF descriptor implementation
- SVM training and prediction
- (Optional commented training code)

# Section 6: Main Pipeline
- Video loading and preprocessing
- Frame-by-frame processing
- Result visualization
- Output video generation

## Section 1: Dependencies and Utilities


In [None]:
import os
import cv2
import numpy as np
import torch
from scipy import ndimage
from sklearn.svm import SVC
import urllib.request
import matplotlib.pyplot as plt
from tqdm import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [2]:



def download_file(url, filename):
    """Download files with progress bar"""
    with tqdm(unit='B', unit_scale=True, miniters=1, desc=filename) as t:
        urllib.request.urlretrieve(
            url, 
            filename, 
            reporthook=lambda blocks, block_size, total_size: t.update(block_size)
        )

class VideoProcessor:
    def __init__(self, target_width=480, target_height=360):
        self.target_width = target_width
        self.target_height = target_height
    
    def read_video(self, video_path):
        """Read video and return resized frames"""
        frames = []
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")
        
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Resize frame to target dimensions
            frame = cv2.resize(frame, (self.target_width, self.target_height))
            frames.append(frame)
        
        cap.release()
        return frames
    
    def write_video(self, output_path, frames, fps=30):
        """Write frames to video file"""
        if not frames:
            raise ValueError("No frames to write")
            
        height, width = frames[0].shape[:2]
        writer = cv2.VideoWriter(
            output_path,
            cv2.VideoWriter_fourcc(*'mp4v'),
            fps,
            (width, height)
        )
        
        for frame in frames:
            writer.write(frame)
        writer.release()

# Utility functions for image processing
def preprocess_frame(frame):
    """Preprocess frame for YOLO"""
    # Convert BGR to RGB
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # Normalize
    frame = frame.astype(np.float32) / 255.0
    # Add batch dimension
    frame = np.expand_dims(frame, axis=0)
    return frame

def draw_box(frame, box, color=(0, 255, 0), thickness=2, label=None):
    """Draw bounding box with optional label"""
    x1, y1, x2, y2 = map(int, box)
    cv2.rectangle(frame, (x1, y1), (x2, y2), color, thickness)
    if label:
        cv2.putText(frame, label, (x1, y1-10), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, thickness)
    return frame

def calculate_iou(box1, box2):
    """Calculate Intersection over Union of two boxes"""
    x1 = max(box1[0], box2[0])
    y1 = max(box1[1], box2[1])
    x2 = min(box1[2], box2[2])
    y2 = min(box1[3], box2[3])
    
    intersection = max(0, x2 - x1) * max(0, y2 - y1)
    area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union = area1 + area2 - intersection
    
    return intersection / union if union > 0 else 0

# Create necessary directories
required_dirs = ['weights', 'config', 'output']
for d in required_dirs:
    os.makedirs(d, exist_ok=True)

## Section 2: YOLO Implementation


In [3]:

class YOLODetector:
    def __init__(self):
        # Download YOLOv3 weights and config if not exists
        self.weights_path = 'weights/yolov3.weights'
        self.config_path = 'config/yolov3.cfg'
        self.coco_names_path = 'config/coco.names'
        self.download_yolo_files()
        
        # Load COCO class names
        with open(self.coco_names_path, 'r') as f:
            self.classes = f.read().strip().split('\n')
        
        # Initialize network
        self.net = cv2.dnn.readNetFromDarknet(self.config_path, self.weights_path)
        self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
        self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
        
        # Get output layer names - Fixed version
        layer_names = self.net.getLayerNames()
        unconnected_layers = self.net.getUnconnectedOutLayers()
        self.output_layers = []
        for i in unconnected_layers:
            if isinstance(i, (list, np.ndarray)):
                # Handle case where i is array-like
                self.output_layers.append(layer_names[int(i[0] - 1)])
            else:
                # Handle case where i is scalar
                self.output_layers.append(layer_names[int(i - 1)])
        
        # Vehicle classes we're interested in (car, truck, bus, motorcycle)
        self.vehicle_classes = [2, 3, 5, 7]
        
    def download_yolo_files(self):
        """Download YOLOv3 weights and config files if not present"""
        files = {
            'weights': {
                'path': self.weights_path,
                'url': 'https://pjreddie.com/media/files/yolov3.weights'
            },
            'config': {
                'path': self.config_path,
                'url': 'https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3.cfg'
            },
            'names': {
                'path': self.coco_names_path,
                'url': 'https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names'
            }
        }
        
        for file_info in files.values():
            if not os.path.exists(file_info['path']):
                print(f"Downloading {file_info['path']}...")
                download_file(file_info['url'], file_info['path'])

    def detect(self, frame, conf_threshold=0.5, nms_threshold=0.4):
        """
        Detect vehicles in frame
        Returns: list of [class_id, x1, x2, y1, y2, confidence]
        """
        height, width = frame.shape[:2]
        
        # Create blob from image
        blob = cv2.dnn.blobFromImage(frame, 1/255.0, (416, 416), 
                                   swapRB=True, crop=False)
        self.net.setInput(blob)
        
        # Forward pass
        layer_outputs = self.net.forward(self.output_layers)
        
        # Initialize lists for detected boxes, confidences, and class IDs
        boxes = []
        confidences = []
        class_ids = []
        
        # Process each layer output
        for output in layer_outputs:
            for detection in output:
                scores = detection[5:]
                class_id = np.argmax(scores)
                confidence = scores[class_id]
                
                # Filter for vehicle classes and confidence threshold
                if class_id in self.vehicle_classes and confidence > conf_threshold:
                    # Get box coordinates
                    center_x = int(detection[0] * width)
                    center_y = int(detection[1] * height)
                    w = int(detection[2] * width)
                    h = int(detection[3] * height)
                    
                    # Rectangle coordinates
                    x1 = int(center_x - w/2)
                    y1 = int(center_y - h/2)
                    x2 = x1 + w
                    y2 = y1 + h
                    
                    boxes.append([x1, y1, x2, y2])
                    confidences.append(float(confidence))
                    class_ids.append(class_id)
        
        # Apply non-maximum suppression
        indices = cv2.dnn.NMSBoxes(boxes, confidences, conf_threshold, nms_threshold)
        
        detections = []
        if len(indices) > 0:
            indices = indices.flatten()
            for i in indices:
                detections.append([
                    class_ids[i],
                    boxes[i][0], boxes[i][2],  # x1, x2
                    boxes[i][1], boxes[i][3],  # y1, y2
                    confidences[i]
                ])
        
        return detections

    def draw_detections(self, frame, detections):
        """Draw detection boxes on frame"""
        frame_copy = frame.copy()
        for detection in detections:
            class_id, x1, x2, y1, y2, conf = detection
            label = f"{self.classes[class_id]}: {conf:.2f}"
            color = (0, 255, 0)  # Green for vehicles
            draw_box(frame_copy, [x1, y1, x2, y2], color=color, label=label)
        return frame_copy

## Section 3: MOSSE Tracker and TCFI Implementation


In [4]:

import numpy as np
from scipy import signal

class MOSSETracker:
    def __init__(self, num_of_training_imgs=10, learning_rate=0.225, psr_threshold=10.0):
        self.num_of_training_imgs = num_of_training_imgs
        self.learning_rate = learning_rate
        self.psr_threshold = psr_threshold
        
        # Initialize tracking variables
        self.dx = []  # x direction movement
        self.dy = []  # y direction movement
        self.centers = []  # tracking centers history
        self.updated_last_time = True
        self.good = True  # tracking status
        
    def init_tracker(self, frame, bbox):
        """Initialize tracker with first frame and bounding box"""
        x1, y1, x2, y2 = map(int, bbox)
        self.width = x2 - x1
        self.height = y2 - y1
        self.center = (x1 + self.width/2, y1 + self.height/2)
        self.area = self.width * self.height
        
        # Get initial window
        self.win = cv2.createHanningWindow((self.width, self.height), cv2.CV_32F)
        
        # Create Gaussian target
        g = np.zeros((self.height, self.width), np.float32)
        g[self.height//2, self.width//2] = 1
        g = cv2.GaussianBlur(g, (-1, -1), 3.0)
        g = g / g.max()
        
        # Convert to Fourier domain
        self.G = cv2.dft(g, flags=cv2.DFT_COMPLEX_OUTPUT)
        
        # Initialize tracker
        img = self.get_subwindow(frame, self.center)
        self.prepare_initialization(frame, img)
        
    def prepare_initialization(self, frame, img):
        """Prepare initial tracking filters"""
        self.H1 = np.zeros_like(self.G)
        self.H2 = np.zeros_like(self.G)
        
        # Apply gaussian blur
        img = cv2.GaussianBlur(img, (3, 3), 3)
        
        # Generate training examples
        for _ in range(self.num_of_training_imgs):
            transformed_img = self.random_warp(img)
            H1, H2 = self.compute_filter(transformed_img)
            self.H1 += H1
            self.H2 += H2
            
        self.update_filter()
        self.update_tracking(frame, False)
        
    def get_subwindow(self, frame, center):
        """Extract subwindow at center location"""
        x, y = map(int, center)
        x1 = max(0, x - self.width//2)
        y1 = max(0, y - self.height//2)
        x2 = min(frame.shape[1], x1 + self.width)
        y2 = min(frame.shape[0], y1 + self.height)
        
        roi = frame[y1:y2, x1:x2]
        if roi.shape[0] != self.height or roi.shape[1] != self.width:
            roi = cv2.resize(roi, (self.width, self.height))
        return roi.astype(np.float32)
        
    def preprocess(self, img):
        """Preprocess image for tracking"""
        img = np.log(np.float32(img) + 1.0)
        mean = img.mean()
        std = img.std()
        img = (img - mean) / (std + 1e-5)
        return img * self.win
        
    def compute_filter(self, img):
        """Compute numerator and denominator of MOSSE filter"""
        F = cv2.dft(self.preprocess(img), flags=cv2.DFT_COMPLEX_OUTPUT)
        H1 = cv2.mulSpectrums(self.G, F, 0, conjB=True)
        H2 = cv2.mulSpectrums(F, F, 0, conjB=True)
        return H1, H2
        
    def update_filter(self):
        """Update MOSSE filter"""
        self.H = self.compute_h_filter(self.H1, self.H2)
        self.H[..., 1] *= -1  # conjugate for correlation
        
    def compute_h_filter(self, H1, H2):
        """Compute H filter from H1 and H2"""
        Num_real, Num_imag = H1[..., 0], H1[..., 1]
        Den_real, Den_imag = H2[..., 0], H2[..., 1]
        
        h_filter = (Num_real + 1j * Num_imag) / (Den_real + 1j * Den_imag)
        return np.dstack([np.real(h_filter), np.imag(h_filter)]).copy()
        
    def correlate_and_update(self, img):
        """Correlate image with filter and update PSR"""
        F = cv2.dft(img, flags=cv2.DFT_COMPLEX_OUTPUT)
        C = cv2.mulSpectrums(F, self.H, 0, conjB=True)
        response = cv2.idft(C, flags=cv2.DFT_SCALE | cv2.DFT_REAL_OUTPUT)
        
        h, w = response.shape
        _, max_val, _, max_loc = cv2.minMaxLoc(response)
        
        # Calculate PSR (Peak to Sidelobe Ratio)
        side_resp = response.copy()
        side_resp[max(0, max_loc[1]-5):min(h, max_loc[1]+6),
                 max(0, max_loc[0]-5):min(w, max_loc[0]+6)] = 0
        mean = side_resp.mean()
        std = side_resp.std()
        psr = (max_val - mean) / (std + 1e-5)
        
        return psr, response, max_loc
        
    def update_tracking(self, frame, is_stopped=False):
        """Update tracking for current frame"""
        if is_stopped and self.updated_last_time:
            # Use TCFI - Track-compensated frame interpolation
            dx = sum(self.dx[-3:]) / 3
            dy = sum(self.dy[-3:]) / 3
            self.center = self.center[0] + dx, self.center[1] + dy
            self.dx.append(dx)
            self.dy.append(dy)
            self.updated_last_time = False
            return
            
        self.updated_last_time = True
        img = self.get_subwindow(frame, self.center)
        img = cv2.GaussianBlur(img, (3, 3), 3)
        img = self.preprocess(img)
        
        psr, response, (dx, dy) = self.correlate_and_update(img)
        self.good = psr > self.psr_threshold
        
        if not self.good:
            if not self.dx:
                self.dx.append(0)
                self.dy.append(0)
            else:
                self.dx.append(self.dx[-1])
                self.dy.append(self.dy[-1])
            self.center = (self.center[0] + self.dx[-1], 
                         self.center[1] + self.dy[-1])
        else:
            dx = dx - self.width/2
            dy = dy - self.height/2
            self.dx.append(dx)
            self.dy.append(dy)
            self.center = (self.center[0] + dx, self.center[1] + dy)
            
            # Update filter
            img = self.get_subwindow(frame, self.center)
            H1, H2 = self.compute_filter(img)
            self.H1 = self.H1 * (1.0 - self.learning_rate) + H1 * self.learning_rate
            self.H2 = self.H2 * (1.0 - self.learning_rate) + H2 * self.learning_rate
            self.update_filter()
            
        self.centers.append((int(self.center[0]), int(self.center[1])))
        return self.good
        
    def random_warp(self, img):
        """Apply random affine transform for training"""
        height, width = img.shape[:2]
        T = np.zeros((2, 3))
        
        perturb = 0.2
        angle = (np.random.rand() - 0.5) * perturb
        cos_val = np.cos(angle)
        sin_val = np.sin(angle)
        
        T[:2, :2] = [[cos_val, -sin_val], [sin_val, cos_val]]
        T[:2, :2] += (np.random.rand(2, 2) - 0.5) * perturb
        T[:, 2] = (width/2, height/2) - np.dot(T[:2, :2], (width/2, height/2))
        
        return cv2.warpAffine(img, T, (width, height), 
                            borderMode=cv2.BORDER_REFLECT)

class VehicleTracker:
    def __init__(self, frame_width, frame_height, tracker_id=0):
        self.frame_width = frame_width
        self.frame_height = frame_height
        self.tracker_id = tracker_id
        self.history = []
        self.tracker = None
        
    def init_track(self, frame, bbox):
        """Initialize tracking for a vehicle"""
        self.tracker = MOSSETracker()
        self.tracker.init_tracker(frame, bbox)
        self.history.append(bbox)
        
    def update(self, frame):
        """Update tracking"""
        if self.tracker is None:
            return None
            
        # Check if vehicle is stopped
        is_stopped = False
        if len(self.tracker.dx) >= 3:
            avg_speed = self.get_avg_speed(
                len(self.tracker.dx)-3, 
                len(self.tracker.dx)
            )
            is_stopped = avg_speed < 20
            
        # Update tracking
        self.tracker.update_tracking(frame, is_stopped)
        current_bbox = self.get_current_bbox()
        self.history.append(current_bbox)
        return current_bbox
        
    def get_current_bbox(self):
        """Get current bounding box"""
        x, y = self.tracker.center
        w, h = self.tracker.width, self.tracker.height
        return [
            int(x - w/2), int(y - h/2),
            int(x + w/2), int(y + h/2)
        ]
        
    def get_avg_speed(self, start_frame=None, end_frame=None):
        """Calculate average speed"""
        if not self.tracker.dx:
            return 0
            
        if start_frame is None or end_frame is None:
            dx_changes = self.tracker.dx
            dy_changes = self.tracker.dy
        else:
            dx_changes = self.tracker.dx[start_frame:end_frame]
            dy_changes = self.tracker.dy[start_frame:end_frame]
            
        avg_dx = sum(dx_changes) / len(dx_changes)
        avg_dy = sum(dy_changes) / len(dy_changes)
        speed = np.sqrt(avg_dx**2 + avg_dy**2)
        return speed * self.get_speed_coefficient()
        
    def get_speed_coefficient(self):
        """Calculate speed coefficient based on vehicle area"""
        coefficient = 43200 / self.tracker.area
        return coefficient

## Section 4: Crash Estimation


In [5]:

class CrashEstimator:
    def __init__(self, speed_limit=50, collision_threshold=40):
        self.speed_limit = speed_limit  # Minimum speed to consider for crash estimation
        self.collision_threshold = collision_threshold  # Distance threshold for collision detection
        self.estimation_frames = [16, 19, 22, 25, 28]  # Frames to check for collision
        
    def estimate_future_position(self, tracker, frame_no):
        """Estimate vehicle position after 10 frames"""
        if len(tracker.tracker.dx) < 5 or len(tracker.tracker.dx) > 20:
            # Not enough history or too old tracker
            return None, None, None, None
        
        # Calculate average movement from last 10 frames
        measure = min(len(tracker.tracker.dx), 10)
        expected_position_no = len(tracker.tracker.dx) + 10
        
        current_x, current_y = tracker.tracker.center
        dx = sum(tracker.tracker.dx[-measure:]) / measure
        dy = sum(tracker.tracker.dy[-measure:]) / measure
        
        # Predict future position
        future_x = current_x + dx * measure
        future_y = current_y + dy * measure
        
        # Get bounding box for future position
        w, h = tracker.tracker.width, tracker.tracker.height
        return (
            int(future_x - w/2),  # x1
            int(future_y - h/2),  # y1
            int(future_x + w/2),  # x2
            int(future_y + h/2)   # y2
        )
    
    def check_speed(self, tracker, frame_no):
        """Check if vehicle speed is above threshold"""
        if frame_no < 10:
            return False
            
        current_speed = tracker.get_avg_speed(frame_no-10, frame_no)
        return current_speed > self.speed_limit
    
    def calculate_distance(self, pos1, pos2):
        """Calculate distance between centers of two positions"""
        if not pos1 or not pos2:
            return float('inf')
            
        center1 = ((pos1[0] + pos1[2])/2, (pos1[1] + pos1[3])/2)
        center2 = ((pos2[0] + pos2[2])/2, (pos2[1] + pos2[3])/2)
        
        return np.sqrt(
            (center1[0] - center2[0])**2 + 
            (center1[1] - center2[1])**2
        )
    
    def calculate_trajectory_deviation(self, tracker, frame_no, future_pos):
        """Calculate deviation between predicted and actual trajectory"""
        if not future_pos:
            return float('inf')
            
        current_x, current_y = tracker.tracker.center
        future_center = ((future_pos[0] + future_pos[2])/2,
                        (future_pos[1] + future_pos[3])/2)
        
        actual_x, actual_y = tracker.tracker.centers[frame_no]
        
        deviation = np.sqrt(
            (actual_x - future_center[0])**2 + 
            (actual_y - future_center[1])**2
        )
        return deviation
    
    def detect_collision(self, tracker_a, tracker_b, frame_no):
        """Detect potential collision between two vehicles"""
        # Check if either vehicle is moving fast enough
        if not (self.check_speed(tracker_a, frame_no) or 
                self.check_speed(tracker_b, frame_no)):
            return False
        
        # Get future positions
        future_pos_a = self.estimate_future_position(tracker_a, frame_no)
        future_pos_b = self.estimate_future_position(tracker_b, frame_no)
        
        # Calculate distance between future positions
        future_distance = self.calculate_distance(future_pos_a, future_pos_b)
        
        # If vehicles are too far apart, no collision
        if future_distance > self.collision_threshold:
            return False
        
        # Calculate trajectory deviations
        dev_a = self.calculate_trajectory_deviation(tracker_a, frame_no, future_pos_a)
        dev_b = self.calculate_trajectory_deviation(tracker_b, frame_no, future_pos_b)
        
        # If maximum deviation is significant compared to distance
        max_deviation = max(dev_a, dev_b)
        if 2 * max_deviation > future_distance:
            return True
            
        return False
    
    def estimate_crashes(self, trackers, frame_no):
        """Estimate crashes between all tracked vehicles"""
        crash_pairs = []
        
        # Check each pair of trackers
        for i, tracker_a in enumerate(trackers):
            for j, tracker_b in enumerate(trackers[i+1:], i+1):
                # Only check at specific frames
                if frame_no not in self.estimation_frames:
                    continue
                    
                if self.detect_collision(tracker_a, tracker_b, frame_no):
                    crash_pairs.append((i, j))
        
        return crash_pairs

    def annotate_crash_estimation(self, frame, trackers, crash_pairs):
        """Draw crash estimation visualizations on frame"""
        frame_copy = frame.copy()
        
        # Draw current positions and trajectories
        for tracker in trackers:
            if len(tracker.tracker.centers) < 2:
                continue
                
            # Draw trajectory
            centers = tracker.tracker.centers[-10:]
            for i in range(len(centers)-1):
                pt1 = tuple(map(int, centers[i]))
                pt2 = tuple(map(int, centers[i+1]))
                cv2.line(frame_copy, pt1, pt2, (0, 255, 0), 1)
            
            # Draw current position
            bbox = tracker.get_current_bbox()
            if tracker.tracker.good:
                color = (0, 255, 0)  # Green for good tracking
            else:
                color = (0, 0, 255)  # Red for poor tracking
            draw_box(frame_copy, bbox, color)
        
        # Highlight crash predictions
        for i, j in crash_pairs:
            tracker_a = trackers[i]
            tracker_b = trackers[j]
            
            # Draw connecting line between vehicles
            center_a = tracker_a.tracker.center
            center_b = tracker_b.tracker.center
            cv2.line(frame_copy, 
                    tuple(map(int, center_a)),
                    tuple(map(int, center_b)),
                    (0, 0, 255), 2)
            
            # Draw warning text
            text_pos = (
                int((center_a[0] + center_b[0])/2),
                int((center_a[1] + center_b[1])/2) - 10
            )
            cv2.putText(frame_copy, "CRASH RISK!",
                       text_pos,
                       cv2.FONT_HERSHEY_SIMPLEX,
                       0.5, (0, 0, 255), 2)
        
        return frame_copy

## Section 5: ViF (Violent Flow) Descriptor & SVM


In [6]:

class ViolentFlowDescriptor:
    def __init__(self, subsample=3, n_bins=20):
        self.subsample = subsample  # Frame subsampling rate
        self.n_bins = n_bins  # Number of histogram bins
        self.cell_size = (4, 4)  # Size of grid cells
        self.hs = HornSchunck()  # Horn-Schunck optical flow
        
    def compute_flow_magnitude(self, prev_frame, curr_frame):
        """Compute optical flow magnitude using Horn-Schunck"""
        # Calculate optical flow
        u, v, _ = self.hs.process(prev_frame, curr_frame)
        # Calculate magnitude
        magnitude = np.sqrt(u*u + v*v)
        return magnitude
    
    def create_histogram(self, flow_map):
        """Create histogram of flow magnitudes"""
        # Normalize flow map to [0,1]
        if flow_map.max() != 0:
            flow_map = flow_map / flow_map.max()
            
        hist, _ = np.histogram(
            flow_map, 
            bins=self.n_bins, 
            range=(0, 1), 
            density=True
        )
        return hist
    
    def compute_binary_map(self, magnitude_changes):
        """Compute binary significance map"""
        # Calculate adaptive threshold
        threshold = np.mean(np.abs(magnitude_changes))
        return (np.abs(magnitude_changes) >= threshold).astype(np.float32)
    
    def extract_features(self, frames):
        """Extract ViF features from sequence of frames"""
        if len(frames) < 30:
            raise ValueError("Need at least 30 frames")
            
        height, width = frames[0].shape[:2]
        n_cells_y = height // self.cell_size[0]
        n_cells_x = width // self.cell_size[1]
        
        # Initialize feature vector
        feature_vector = np.zeros((n_cells_y * n_cells_x * self.n_bins,))
        
        # Process frames with subsampling
        flow_maps = []
        for i in range(0, len(frames)-self.subsample, self.subsample):
            prev_frame = cv2.cvtColor(frames[i], cv2.COLOR_BGR2GRAY)
            curr_frame = cv2.cvtColor(frames[i+self.subsample], 
                                    cv2.COLOR_BGR2GRAY)
            
            magnitude = self.compute_flow_magnitude(prev_frame, curr_frame)
            flow_maps.append(magnitude)
            
        # Compute magnitude changes
        magnitude_changes = np.zeros_like(flow_maps[0])
        for i in range(len(flow_maps)-1):
            magnitude_changes += np.abs(flow_maps[i+1] - flow_maps[i])
        magnitude_changes /= len(flow_maps)-1
        
        # Compute binary map
        binary_map = self.compute_binary_map(magnitude_changes)
        
        # Compute histograms for each cell
        idx = 0
        for y in range(n_cells_y):
            for x in range(n_cells_x):
                # Extract cell
                cell = binary_map[
                    y*self.cell_size[0]:(y+1)*self.cell_size[0],
                    x*self.cell_size[1]:(x+1)*self.cell_size[1]
                ]
                # Compute histogram for cell
                hist = self.create_histogram(cell)
                # Add to feature vector
                feature_vector[idx:idx+self.n_bins] = hist
                idx += self.n_bins
                
        return feature_vector

class HornSchunck:
    """Horn-Schunck optical flow implementation"""
    def __init__(self, alpha=0.001, iterations=8):
        self.alpha = alpha
        self.iterations = iterations
        
    def process(self, frame1, frame2):
        """Compute optical flow using Horn-Schunck method"""
        # Convert to float32
        frame1 = frame1.astype(np.float32) / 255.0
        frame2 = frame2.astype(np.float32) / 255.0
        
        # Compute gradients
        kernel_x = np.array([[-1, 1], [-1, 1]]) * 0.25
        kernel_y = np.array([[-1, -1], [1, 1]]) * 0.25
        kernel_t = np.ones((2, 2)) * 0.25
        
        fx = signal.convolve2d(frame1, kernel_x, 'same') + \
             signal.convolve2d(frame2, kernel_x, 'same')
        fy = signal.convolve2d(frame1, kernel_y, 'same') + \
             signal.convolve2d(frame2, kernel_y, 'same')
        ft = signal.convolve2d(frame1, -kernel_t, 'same') + \
             signal.convolve2d(frame2, kernel_t, 'same')
        
        # Initialize velocity
        u = np.zeros_like(frame1)
        v = np.zeros_like(frame1)
        
        # Iterate
        for _ in range(self.iterations):
            # Compute local averages
            u_avg = signal.convolve2d(u, np.ones((3,3))/9, 'same')
            v_avg = signal.convolve2d(v, np.ones((3,3))/9, 'same')
            
            # Update velocities
            den = fx*fx + fy*fy + self.alpha
            num_u = fx*u_avg + fy*v_avg + ft
            num_v = fx*u_avg + fy*v_avg + ft
            
            u = u_avg - fx * num_u / den
            v = v_avg - fy * num_v / den
            
        return u, v, np.sqrt(u*u + v*v)

class CrashClassifier:
    def __init__(self):
        self.vif = ViolentFlowDescriptor()
        self.svm = SVC(kernel='poly', probability=True)
        
    def train(self, crash_sequences, non_crash_sequences):
        """Train SVM classifier with ViF features"""
        # Extract features
        X = []
        y = []
        
        # Process crash sequences
        for seq in crash_sequences:
            features = self.vif.extract_features(seq)
            X.append(features)
            y.append(1)
            
        # Process non-crash sequences
        for seq in non_crash_sequences:
            features = self.vif.extract_features(seq)
            X.append(features)
            y.append(0)
            
        # Train SVM
        X = np.array(X)
        y = np.array(y)
        self.svm.fit(X, y)
        
    def predict(self, sequence):
        """Predict crash probability for sequence"""
        features = self.vif.extract_features(sequence)
        prob = self.svm.predict_proba([features])[0][1]
        return prob
    
    def save_model(self, path="models/crash_classifier.pkl"):
        """Save trained model"""
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'wb') as f:
            pickle.dump(self.svm, f)
            
    def load_model(self, path="models/crash_classifier.pkl"):
        """Load trained model"""
        with open(path, 'rb') as f:
            self.svm = pickle.load(f)

def extract_vehicle_sequence(frames, tracker, length=30):
    """Extract sequence of frames for a vehicle"""
    sequence = []
    for frame_idx, bbox in enumerate(tracker.history[-length:]):
        if frame_idx >= len(frames):
            break
            
        frame = frames[frame_idx].copy()
        x1, y1, x2, y2 = map(int, bbox)
        roi = frame[y1:y2, x1:x2]
        
        if roi.size == 0:
            continue
            
        # Resize to standard size
        roi = cv2.resize(roi, (64, 64))
        sequence.append(roi)
        
    return sequence if len(sequence) == length else None

## Section 6: Main Pipeline and Visualization


In [7]:
# Section 6: Main Pipeline and Visualization

class AccidentDetectionPipeline:
    def __init__(self):
        # Initialize all components
        self.video_processor = VideoProcessor()
        self.detector = YOLODetector()
        self.crash_estimator = CrashEstimator()
        self.crash_classifier = CrashClassifier()
        
        # Load pre-trained crash classifier if available
        try:
            self.crash_classifier.load_model()
        except:
            print("No pre-trained crash classifier found. Only using crash estimation.")
        
        self.trackers = []
        self.tracker_ids = 0
        
    def process_video(self, video_path, output_path=None):
        """Process video for accident detection"""
        print("Loading video...")
        frames = self.video_processor.read_video(video_path)
        if not frames:
            raise ValueError("Could not load video")
            
        output_frames = []
        self.trackers = []
        self.tracker_ids = 0
        
        print(f"Processing {len(frames)} frames...")
        for frame_idx, frame in enumerate(tqdm(frames)):
            processed_frame = self.process_frame(frame, frame_idx)
            output_frames.append(processed_frame)
            
        if output_path:
            print("Saving output video...")
            self.video_processor.write_video(output_path, output_frames)
            
        return output_frames
    
    def process_frame(self, frame, frame_idx):
        """Process single frame"""
        frame_annotated = frame.copy()
        
        # Vehicle Detection (every 30 frames)
        if frame_idx % 30 == 0:
            detections = self.detector.detect(frame)
            self.initialize_new_trackers(frame, detections)
            
        # Update trackers
        valid_trackers = []
        for tracker in self.trackers:
            current_bbox = tracker.update(frame)
            if current_bbox:
                valid_trackers.append(tracker)
                
        self.trackers = valid_trackers
        
        # Crash Estimation
        crash_pairs = self.crash_estimator.estimate_crashes(
            self.trackers, frame_idx)
        
        # Verify crashes with ViF if classifier is available
        verified_crashes = []
        for i, j in crash_pairs:
            is_verified = True
            if hasattr(self, 'crash_classifier'):
                # Get sequences for both vehicles
                seq_i = extract_vehicle_sequence(
                    frames[-30:], self.trackers[i])
                seq_j = extract_vehicle_sequence(
                    frames[-30:], self.trackers[j])
                
                if seq_i is not None and seq_j is not None:
                    is_verified = (
                        self.crash_classifier.predict(seq_i) > 0.5 or 
                        self.crash_classifier.predict(seq_j) > 0.5
                    )
            
            if is_verified:
                verified_crashes.append((i, j))
        
        # Annotate frame
        frame_annotated = self.annotate_frame(
            frame_annotated, 
            frame_idx,
            verified_crashes
        )
        
        return frame_annotated
    
    def initialize_new_trackers(self, frame, detections):
        """Initialize trackers for new detections"""
        if not detections:
            return
            
        # Initialize new tracker for each detection
        for detection in detections:
            tracker = VehicleTracker(
                self.video_processor.target_width,
                self.video_processor.target_height,
                self.tracker_ids
            )
            self.tracker_ids += 1
            
            bbox = detection[1:5]  # x1, x2, y1, y2
            tracker.init_track(frame, bbox)
            self.trackers.append(tracker)
            
    def annotate_frame(self, frame, frame_idx, crash_pairs):
        """Annotate frame with detections, tracks, and crashes"""
        # Draw tracker paths and boxes
        for tracker in self.trackers:
            # Draw trajectory
            if len(tracker.history) > 1:
                for i in range(len(tracker.history)-1):
                    pt1 = (
                        int((tracker.history[i][0] + tracker.history[i][2])/2),
                        int((tracker.history[i][1] + tracker.history[i][3])/2)
                    )
                    pt2 = (
                        int((tracker.history[i+1][0] + tracker.history[i+1][2])/2),
                        int((tracker.history[i+1][1] + tracker.history[i+1][3])/2)
                    )
                    cv2.line(frame, pt1, pt2, (0, 255, 0), 1)
            
            # Draw current box
            bbox = tracker.history[-1]
            speed = tracker.get_avg_speed()
            label = f"ID: {tracker.tracker_id} Speed: {speed:.1f}"
            color = (0, 255, 0) if speed < self.crash_estimator.speed_limit else (0, 165, 255)
            draw_box(frame, bbox, color=color, label=label)
        
        # Highlight crashes
        for i, j in crash_pairs:
            tracker_a = self.trackers[i]
            tracker_b = self.trackers[j]
            
            # Draw warning box around crash area
            bbox_a = tracker_a.history[-1]
            bbox_b = tracker_b.history[-1]
            
            x1 = min(bbox_a[0], bbox_b[0])
            y1 = min(bbox_a[1], bbox_b[1])
            x2 = max(bbox_a[2], bbox_b[2])
            y2 = max(bbox_a[3], bbox_b[3])
            
            # Expand warning box
            padding = 20
            x1 = max(0, x1 - padding)
            y1 = max(0, y1 - padding)
            x2 = min(frame.shape[1], x2 + padding)
            y2 = min(frame.shape[0], y2 + padding)
            
            # Draw warning box
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
            
            # Add warning text
            warning_text = "CRASH DETECTED!"
            text_size = cv2.getTextSize(
                warning_text, 
                cv2.FONT_HERSHEY_SIMPLEX, 
                1, 
                2
            )[0]
            
            text_x = int((x1 + x2 - text_size[0])/2)
            text_y = int(y1 - 10)
            
            cv2.putText(
                frame,
                warning_text,
                (text_x, text_y),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                (0, 0, 255),
                2
            )
        
        # Add frame information
        cv2.putText(
            frame,
            f"Frame: {frame_idx}",
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            1,
            (255, 255, 255),
            2
        )
        
        return frame

def visualize_detection(frame, detections):
    """Visualize detections on a single frame"""
    frame_viz = frame.copy()
    for detection in detections:
        bbox = detection[1:5]
        conf = detection[5]
        draw_box(frame_viz, bbox, label=f"Conf: {conf:.2f}")
    return frame_viz

def main():
    # Initialize pipeline
    pipeline = AccidentDetectionPipeline()
    
    # Process video
    input_video = "C:\\Users\\HP\\OneDrive\\Documents\\mini projects\\testing\\Argus\\videos\\1508.mp4"
    output_video = "output_video.mp4"
    
    try:
        output_frames = pipeline.process_video(input_video, output_video)
        print(f"Processing complete. Output saved to {output_video}")
        
        # Display sample frames
        sample_frames = [
            output_frames[i] for i in range(0, len(output_frames), 30)
        ]
        
        plt.figure(figsize=(15, 10))
        for i, frame in enumerate(sample_frames[:6]):
            plt.subplot(2, 3, i+1)
            plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            plt.axis('off')
            plt.title(f"Frame {i*30}")
        plt.tight_layout()
        plt.show()
        
    except Exception as e:
        print(f"Error processing video: {str(e)}")

if __name__ == "__main__":
    main()

No pre-trained crash classifier found. Only using crash estimation.
Loading video...
Processing 304 frames...


  0%|          | 0/304 [00:00<?, ?it/s]

Error processing video: OpenCV(4.5.3) C:\Users\runneradmin\AppData\Local\Temp\pip-req-build-q3d_8t8e\opencv\modules\imgproc\src\phasecorr.cpp:606: error: (-215:Assertion failed) winSize.width > 1 && winSize.height > 1 in function 'cv::createHanningWindow'




