# In [6]:
import cv2
import numpy as np
from scipy.optimize import linear_sum_assignment
from collections import deque

def non_max_suppression(boxes, scores, threshold):
    """Greedily suppress overlapping boxes, keeping the best-scoring ones.

    Args:
        boxes: Sequence of ``[x, y, w, h]`` rectangles (top-left corner plus
            width and height).
        scores: One score per box; higher scores are visited first.
        threshold: A remaining box is discarded when its intersection with
            the current best box, divided by the remaining box's own area,
            is strictly greater than this value.

    Returns:
        A list of indices into ``boxes`` (NumPy integers), ordered from the
        highest-scoring kept box to the lowest. Empty when ``boxes`` is empty.
    """
    if len(boxes) == 0:
        return []

    rects = np.array(boxes, dtype=np.float32)

    # Convert (x, y, w, h) into corner coordinates.
    left = rects[:, 0]
    top = rects[:, 1]
    right = rects[:, 0] + rects[:, 2]
    bottom = rects[:, 1] + rects[:, 3]

    # The "+ 1" treats coordinates as inclusive pixel indices.
    areas = (right - left + 1) * (bottom - top + 1)

    # Ascending by score, so the best candidate is always the final element.
    order = np.argsort(scores)
    kept = []

    while len(order) > 0:
        best = order[-1]
        rest = order[:-1]
        kept.append(best)

        # Intersection rectangle between the best box and every other one.
        inter_left = np.maximum(left[best], left[rest])
        inter_top = np.maximum(top[best], top[rest])
        inter_right = np.minimum(right[best], right[rest])
        inter_bottom = np.minimum(bottom[best], bottom[rest])

        inter_w = np.maximum(0, inter_right - inter_left + 1)
        inter_h = np.maximum(0, inter_bottom - inter_top + 1)

        ratio = (inter_w * inter_h) / areas[rest]

        # Drop the box just picked and everything it overlaps too strongly.
        # Negating the "greater than" test keeps NaN ratios, like the
        # position-based deletion would.
        order = rest[~(ratio > threshold)]

    return kept

class VideoProcessor:
    """Detect and track moving blobs on a road region of a video.

    Per frame the pipeline is:

    1. build a "road" mask from HSV thresholds read from trackbars,
    2. extract foreground with a MOG2 background subtractor, restricted to
       the road mask and cleaned up morphologically,
    3. turn large, roughly square-ish contours into box detections and
       de-duplicate them with ``non_max_suppression``,
    4. associate detections with existing Kalman-filter tracks using the
       Hungarian algorithm (``linear_sum_assignment``),
    5. draw detections and track traces onto the frame.

    Note that constructing an instance opens the ``'Road Mask Controls'``
    window and creates its trackbars.
    """

    # Name of the window that hosts the road-mask trackbars.
    CONTROLS_WINDOW = 'Road Mask Controls'
    # Largest distance between a predicted and a detected centre that is
    # still accepted as the same object.
    MAX_MATCH_DISTANCE = 50

    def __init__(self):
        """Set up the background subtractor, tracking state and trackbars."""
        self.bg_subtractor = cv2.createBackgroundSubtractorMOG2()
        self.bg_subtractor.setShadowThreshold(0.5)
        # NOTE(review): not read anywhere in this class.
        self.kernel = np.ones((9, 9), dtype=np.uint8)

        # Track bookkeeping: object id -> track record / drawing colour.
        self.tracked_objects = {}
        self.next_object_id = 0
        # A track is dropped once it has gone unmatched for more frames
        # than this.
        self.max_missed_frames = 10
        # NOTE(review): not consulted anywhere in this class.
        self.min_appearance_frames = 3

        # Contours at or below this area are ignored as noise.
        self.min_contour_area = 750

        # NOTE(review): not read anywhere in this class.
        self.temporal_buffer = deque(maxlen=5)

        self.track_colors = {}
        # Number of past positions retained for each track's trace.
        self.max_trace_length = 50

        # Road mask parameters; refreshed from the trackbars on every frame
        # by update_mask_parameters().
        self.lower_bound = np.array([100, 0, 0])
        self.upper_bound = np.array([180, 255, 255])
        self.min_area = 10000
        self.dilate_kernel = np.ones((5, 5), np.uint8)
        # NOTE(review): not read anywhere in this class.
        self.road_mask = None

        # Road mask controls: (trackbar name, initial value, maximum).
        cv2.namedWindow(self.CONTROLS_WINDOW)
        for name, initial, maximum in (
            ('Hue Min', 103, 180),
            ('Hue Max', 180, 180),
            ('Sat Min', 0, 255),
            ('Sat Max', 255, 255),
            ('Val Min', 0, 255),
            ('Val Max', 255, 255),
            ('Min Area', 10000, 20000),
            ('Dilate', 5, 20),
        ):
            cv2.createTrackbar(name, self.CONTROLS_WINDOW, initial, maximum,
                               self.nothing)

    def nothing(self, x):
        """No-op trackbar callback; values are polled each frame instead."""
        pass

    def get_road_mask(self, frame):
        """Build the road mask for a BGR frame.

        The frame is thresholded in HSV space with the current bounds,
        opened and closed with a 5x5 kernel, reduced to the external
        contours whose area exceeds ``self.min_area`` and finally dilated
        with ``self.dilate_kernel``.

        Args:
            frame: Image converted with ``cv2.COLOR_BGR2HSV``.

        Returns:
            A single-channel mask with 255 inside the kept regions.
        """
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        mask = cv2.inRange(hsv, self.lower_bound, self.upper_bound)

        cleanup_kernel = np.ones((5, 5), np.uint8)
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, cleanup_kernel)
        mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, cleanup_kernel)

        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL,
                                       cv2.CHAIN_APPROX_SIMPLE)
        filtered_mask = np.zeros_like(mask)

        # Keep only large connected regions, drawn filled.
        for contour in contours:
            if cv2.contourArea(contour) > self.min_area:
                cv2.drawContours(filtered_mask, [contour], -1, 255, -1)

        return cv2.dilate(filtered_mask, self.dilate_kernel, iterations=1)

    def update_mask_parameters(self):
        """Refresh the road-mask parameters from the trackbars.

        Each min/max pair is ordered so the lower bound never exceeds the
        upper bound, whichever way the sliders are set.
        """
        def read(name):
            return cv2.getTrackbarPos(name, self.CONTROLS_WINDOW)

        h_min, h_max = sorted((read('Hue Min'), read('Hue Max')))
        s_min, s_max = sorted((read('Sat Min'), read('Sat Max')))
        v_min, v_max = sorted((read('Val Min'), read('Val Max')))

        self.lower_bound = np.array([h_min, s_min, v_min])
        self.upper_bound = np.array([h_max, s_max, v_max])

        self.min_area = read('Min Area')

        # The slider can sit at 0, which would produce a 0x0 kernel; clamp
        # to 1 so the lowest setting means "no dilation" (a 1x1 kernel).
        dilate_size = max(1, read('Dilate'))
        self.dilate_kernel = np.ones((dilate_size, dilate_size), np.uint8)

    def create_kalman_filter(self):
        """Create a 4-state, 2-measurement Kalman filter.

        The state is ``[x, y, vx, vy]``: the transition matrix adds the
        velocity components to the position on every step, and the
        measurement matrix observes the position only.
        """
        kf = cv2.KalmanFilter(4, 2)
        kf.measurementMatrix = np.array(
            [[1, 0, 0, 0],
             [0, 1, 0, 0]], np.float32)
        kf.transitionMatrix = np.array(
            [[1, 0, 1, 0],
             [0, 1, 0, 1],
             [0, 0, 1, 0],
             [0, 0, 0, 1]], np.float32)
        kf.processNoiseCov = np.eye(4, dtype=np.float32) * 0.01
        return kf

    def initialize_kalman_for_object(self, center):
        """Start a new track at ``center`` and register it under a fresh id.

        Args:
            center: ``(2, 1)`` float32 array holding the x and y position
                (it is indexed as ``center[0, 0]`` / ``center[1, 0]`` and
                passed to ``KalmanFilter.correct``).
        """
        kf = self.create_kalman_filter()
        # Seed the state as an explicit column vector, with zero velocity.
        kf.statePre = np.array(
            [[center[0, 0]], [center[1, 0]], [0], [0]], dtype=np.float32)
        kf.correct(center)

        object_id = self.next_object_id
        self.tracked_objects[object_id] = {
            'kf': kf,
            'id': object_id,
            'missed_frames': 0,
            'appearance_count': 0,
            'path': deque(maxlen=self.max_trace_length)
        }
        self.track_colors[object_id] = tuple(
            np.random.randint(0, 255, 3).tolist())
        self.next_object_id += 1

    def update_tracked_objects(self, detected_centers, boxes, scores):
        """Associate this frame's detections with the existing tracks.

        Every track's filter is advanced once with ``predict()``. Tracks and
        detections are then paired by minimising the summed Euclidean
        distance; a pair only counts as a match when its distance is below
        ``MAX_MATCH_DISTANCE``. Matched tracks are corrected and their path
        extended, every other track accumulates a missed frame and is
        removed once it exceeds ``self.max_missed_frames``. Each detection
        left without a match starts a new track.

        Args:
            detected_centers: List of ``(2, 1)`` float32 centre arrays.
            boxes: Boxes of the detections. Accepted for interface
                compatibility; not used for association.
            scores: Scores of the detections. Accepted for interface
                compatibility; not used for association.
        """
        tracked_ids = list(self.tracked_objects)
        # predict() mutates the filter, so it is called exactly once per
        # track per update.
        predicted_positions = [
            self.tracked_objects[obj_id]['kf'].predict()[:2].flatten()
            for obj_id in tracked_ids
        ]

        matched_ids = set()
        matched_detections = set()

        if len(tracked_ids) > 0 and len(detected_centers) > 0:
            predictions = np.asarray(predicted_positions, dtype=np.float32)
            detections = np.asarray(
                [np.asarray(center).flatten() for center in detected_centers],
                dtype=np.float32)
            # cost_matrix[i, j]: distance from track i's prediction to
            # detection j.
            cost_matrix = np.linalg.norm(
                predictions[:, None, :] - detections[None, :, :], axis=2)

            row_ind, col_ind = linear_sum_assignment(cost_matrix)
            for row, col in zip(row_ind, col_ind):
                # The assignment is always complete for the smaller side, so
                # pairs that are too far apart are rejected here.
                if cost_matrix[row, col] >= self.MAX_MATCH_DISTANCE:
                    continue
                obj_id = tracked_ids[row]
                track = self.tracked_objects[obj_id]
                track['kf'].correct(detected_centers[col])
                track['missed_frames'] = 0
                track['appearance_count'] += 1
                track['path'].append(detected_centers[col].flatten())
                matched_ids.add(obj_id)
                matched_detections.add(int(col))

        # Age every track that did not get a detection and drop stale ones.
        for obj_id in tracked_ids:
            track = self.tracked_objects[obj_id]
            if obj_id not in matched_ids:
                track['missed_frames'] += 1
            if track['missed_frames'] > self.max_missed_frames:
                del self.tracked_objects[obj_id]
                self.track_colors.pop(obj_id, None)

        # Start a new track for every detection that was not matched. The
        # detections are walked by index so that none is skipped regardless
        # of the lengths of ``boxes`` and ``scores``.
        for index, center in enumerate(detected_centers):
            if index not in matched_detections:
                self.initialize_kalman_for_object(center)

    def process_frame(self, frame):
        """Run detection and tracking on one frame and annotate it in place.

        Args:
            frame: BGR image; boxes, scores and track traces are drawn
                onto it.

        Returns:
            ``(frame, fg_mask, road_mask)``: the annotated frame, the cleaned
            foreground mask and the road mask used for this frame.
        """
        self.update_mask_parameters()
        road_mask = self.get_road_mask(frame)
        fg_mask = self._foreground_mask(frame, road_mask)

        boxes, scores, detected_centers = self._detect_objects(fg_mask)
        self.update_tracked_objects(detected_centers, boxes, scores)

        self._draw_detections(frame, boxes, scores)
        self._draw_tracks(frame)

        return frame, fg_mask, road_mask

    def _foreground_mask(self, frame, road_mask):
        """Return the cleaned, binarised foreground mask inside the road."""
        cleanup_kernel = np.ones((5, 5), np.uint8)

        fg_mask = self.bg_subtractor.apply(frame)
        fg_mask = cv2.bitwise_and(fg_mask, fg_mask, mask=road_mask)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, cleanup_kernel)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, cleanup_kernel)
        fg_mask = cv2.dilate(fg_mask, np.ones((12, 12), np.uint8),
                             iterations=1)
        # Make all gray areas white.
        fg_mask[fg_mask > 1] = 255
        return fg_mask

    def _detect_objects(self, fg_mask):
        """Turn foreground contours into NMS-filtered boxes, scores, centres."""
        contours, _ = cv2.findContours(fg_mask, cv2.RETR_EXTERNAL,
                                       cv2.CHAIN_APPROX_SIMPLE)
        boxes = []
        scores = []
        detected_centers = []

        for contour in contours:
            area = cv2.contourArea(contour)
            if area <= self.min_contour_area:
                continue
            x, y, w, h = cv2.boundingRect(contour)
            # Reject very elongated blobs.
            if not 0.4 < w / h < 2.5:
                continue
            boxes.append([x, y, w, h])
            # Confidence grows with contour area and saturates at 1.0.
            scores.append(min(area / 2000.0, 1.0))
            detected_centers.append(
                np.array([[x + w / 2], [y + h / 2]], dtype=np.float32))

        if boxes:
            keep_indices = non_max_suppression(boxes, scores, 0.1)
            boxes = [boxes[i] for i in keep_indices]
            scores = [scores[i] for i in keep_indices]
            detected_centers = [detected_centers[i] for i in keep_indices]

        return boxes, scores, detected_centers

    def _draw_detections(self, frame, boxes, scores):
        """Draw each detection box and its score onto ``frame``."""
        for (x, y, w, h), score in zip(boxes, scores):
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
            cv2.putText(frame, f'{score:.2f}', (x, y - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    def _draw_tracks(self, frame):
        """Draw every track's trace, current position marker and id label."""
        for obj_id, data in self.tracked_objects.items():
            color = self.track_colors[obj_id]
            points = [tuple(int(v) for v in point) for point in data['path']]

            # Newer segments are drawn thicker than older ones.
            for i in range(1, len(points)):
                alpha = i / len(points)
                thickness = int(alpha * 3) + 1
                cv2.line(frame, points[i - 1], points[i], color, thickness)

            # Read the filter's current estimate instead of calling
            # predict(): predict() advances the filter, and it has already
            # been stepped once for this frame in update_tracked_objects().
            state = data['kf'].statePost
            x, y = int(state[0, 0]), int(state[1, 0])
            cv2.circle(frame, (x, y), 5, color, -1)
            cv2.putText(frame, f'ID {obj_id}', (x, y - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    def process_video(self, video_path):
        """Process a video frame by frame and display the results.

        Shows the road mask, the foreground mask and the annotated frame in
        three windows until the video ends or ``q`` is pressed.

        Args:
            video_path: Path (or other source) passed to
                ``cv2.VideoCapture``.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print("Error: Could not open video")
            return

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                processed_frame, fg_mask, road_mask = self.process_frame(frame)

                cv2.imshow('Road Mask', road_mask)
                cv2.imshow('Foreground Mask', fg_mask)
                cv2.imshow('Tracking', processed_frame)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Release the capture and close the windows even if processing
            # a frame raised.
            cap.release()
            cv2.destroyAllWindows()

# Run the tracker only when this file is executed directly (this is also
# the case inside a notebook kernel), not when it is imported as a module,
# because constructing VideoProcessor opens GUI windows.
if __name__ == "__main__":
    processor = VideoProcessor()
    processor.process_video('Images/traffic3.mp4')