In [None]:
import cv2
import numpy as np
import math
from collections import deque
import os

# *** Video configuration ***
# Short clips used while tuning; VIDEO_SOURCE selects which input run_system() opens.
CLIPS_FOR_TESTING = [
    './data/clip1.MOV',
    './data/clip2.MOV'
]
# Alternative inputs -- uncomment exactly one VIDEO_SOURCE line.
#VIDEO_SOURCE = "./data/full_recording.mov"
VIDEO_SOURCE = CLIPS_FOR_TESTING[0]
#VIDEO_SOURCE = CLIPS_FOR_TESTING[1]
OUTPUT_FILE = "./output/final_clip1.mp4"  # annotated video written by run_system()
CASCADE_FILE = "cars.xml"  # Haar cascade model loaded by VehicleClassifier

# *** Depth calculation constants ***
# Real-world distance (meters) assigned to the full height of the bird's-eye view;
# CarTracker.calculate_speed divides it by the view height to get meters per pixel.
MAX_DEPTH_METERS = 21.23 
# Minimum contour area (pixels^2) for a motion blob to become a vehicle candidate.
# Also passed as the score threshold to NMS, which uses the blob area as its score.
MIN_AREA_THRESHOLD = 20000

# *** Region we care about: only rows from ROI_START_PCT * frame height down to the
# bottom are kept (the bottom 60%), because cars won't be in the sky.
ROI_START_PCT = 0.4 

# *** Background removal parameters (passed to cv2.createBackgroundSubtractorMOG2) ***
MOG2_BG_REMOVER_HISTORY = 100    # `history` argument
MOG2_BG_REMOVER_THRESHOLD = 40   # `varThreshold` argument

# *** Object tracking tuning params ***
MIN_HITS = 5   # a track needs at least this many hits before it is labelled / can raise a violation
MAX_AGE = 20   # a track is dropped once it has gone this many updates without a matched detection
SPEED_LIMIT = 25         # MPH; deliberately low so violations show up -> it was a very snowy day so real violations were limited
BRISK_THRESHOLD = 20    # BRISK detection threshold: lower = more keypoints, higher = fewer. Library default is 30; 20 was found through experimentation
# NOTE: "HARR" in the two names below is a misspelling of "Haar"; the names are kept
# because other cells reference them.
HARR_SCALE_FACTOR = 1.05   # `scaleFactor` for CascadeClassifier.detectMultiScale
HARR_MIN_NEIGHBORS = 2     # `minNeighbors` for CascadeClassifier.detectMultiScale
BRISK_KEYPOINT_MIN = 5  # if we find fewer than 5 keypoints, it's probably a shadow or road patch

In [23]:
class KalmanTracker:
    """Single-object track backed by a constant-velocity Kalman filter.

    The filter state is ``[cx, cy, vx, vy]`` (box centre and per-step velocity)
    and the measurement is the observed box centre ``[cx, cy]``. The latest
    box width/height are stored alongside so the box can be redrawn around the
    filtered centre.

    Attributes:
        id: Identifier supplied by the caller.
        history: Caller-managed list of past positions.
        speeds: Rolling window (at most 10 entries) of speed samples.
        time_since_update: Updates elapsed since the last ``correct`` call
            (incremented by the caller, reset here).
        hits: Consecutive successful corrections; reset by ``predict`` once
            an update has been missed.
        age: Number of ``predict`` calls made on this track.
        violation_recorded: Flag the caller sets once a violation was saved.
    """

    def __init__(self, initial_rect, id):
        """Start a track from ``initial_rect`` given as ``(x, y, w, h)``."""
        # --- bookkeeping ---
        self.id = id
        self.history = []
        self.speeds = deque(maxlen=10)
        self.time_since_update = 0
        self.hits = 0
        self.age = 0
        self.violation_recorded = False

        # --- geometry of the first observation ---
        left, top, width, height = initial_rect
        centre_x = left + width / 2
        centre_y = top + height / 2
        self.box_w = width
        self.box_h = height

        # --- filter: 4 state variables, 2 measured variables ---
        kf = cv2.KalmanFilter(4, 2)

        # Position advances by one velocity step per prediction.
        transition = np.eye(4, dtype=np.float32)
        transition[0, 2] = 1
        transition[1, 3] = 1
        kf.transitionMatrix = transition

        # Only the position part of the state is observed.
        kf.measurementMatrix = np.eye(2, 4, dtype=np.float32)

        kf.processNoiseCov = np.eye(4, dtype=np.float32) * 0.05
        kf.measurementNoiseCov = np.eye(2, dtype=np.float32) * 0.1

        # Begin at the observed centre with zero velocity.
        kf.statePost = np.array([[centre_x], [centre_y], [0], [0]], np.float32)
        kf.errorCovPost = np.eye(4, dtype=np.float32)
        self.kf = kf

    def predict(self):
        """Advance the filter one step and return the predicted ``(cx, cy)``."""
        predicted = self.kf.predict()
        self.age += 1
        # A missed update breaks the streak of consecutive hits.
        if self.time_since_update > 0:
            self.hits = 0
        pred_x = predicted[0][0]
        pred_y = predicted[1][0]
        return (pred_x, pred_y)

    def correct(self, rect):
        """Fold the observed box ``(x, y, w, h)`` into the filter."""
        left, top, width, height = rect
        centre_x = left + width / 2
        centre_y = top + height / 2
        measurement = np.array([[centre_x], [centre_y]], dtype=np.float32)
        self.kf.correct(measurement)
        self.box_w, self.box_h = width, height
        self.hits += 1
        self.time_since_update = 0

    def get_state(self):
        """Return the filter's current ``(cx, cy)`` taken from ``statePost``."""
        post = self.kf.statePost
        return (post[0][0], post[1][0])

In [24]:
class CarTracker:
    """Multi-object tracker: associates detections with Kalman tracks and estimates speed.

    Each call to ``update`` predicts every track, greedily matches each track
    to its nearest unclaimed detection (within ``dist_thresh`` pixels),
    corrects matched tracks, drops tracks that have gone unmatched for
    ``MAX_AGE`` updates, and starts a new track for every leftover detection.
    """

    # Frame rate assumed when the caller passes an unusable one. Video
    # containers sometimes report 0 FPS, which would otherwise cause a
    # ZeroDivisionError when converting frame counts to seconds.
    DEFAULT_FPS = 30.0

    def __init__(self, fps):
        """
        Args:
            fps: Frame rate of the source video, used to convert frame deltas
                to seconds. Non-positive, non-finite or non-numeric values
                are replaced by ``DEFAULT_FPS`` with a printed warning.
        """
        self.trackers = []
        self.id_count = 0
        if self._is_valid_fps(fps):
            self.fps = fps  # use fps from video
        else:
            print(f"WARNING: invalid video FPS ({fps!r}); assuming {self.DEFAULT_FPS}")
            self.fps = self.DEFAULT_FPS
        self.dist_thresh = 100  # max centre-to-centre distance (pixels) for a match

    @staticmethod
    def _is_valid_fps(fps):
        """Return True when ``fps`` is a finite number greater than zero."""
        try:
            return math.isfinite(fps) and fps > 0
        except TypeError:
            return False

    def update(self, detections, birds_eye_view_transformer, frame_count):
        """Advance all tracks by one frame.

        Args:
            detections: Sequence of ``(x, y, w, h)`` boxes for this frame.
            birds_eye_view_transformer: Object providing ``transform_point``
                and ``birds_eye_view_height``; forwarded to ``calculate_speed``.
            frame_count: Index of the current frame, stored with each
                history point so speed uses real elapsed frames.

        Returns:
            The list of tracks still alive after this update (including
            tracks created from unmatched detections).
        """
        for trk in self.trackers:
            trk.predict()

        unmatched_detections = list(range(len(detections)))
        matches = []

        # Greedy nearest-neighbour association, in track order.
        for t_idx, trk in enumerate(self.trackers):
            pred_x, pred_y = trk.get_state()
            best_dist = self.dist_thresh
            best_d_idx = -1

            for d_idx in unmatched_detections:
                dx, dy, dw, dh = detections[d_idx]
                dcx, dcy = dx + dw / 2, dy + dh / 2

                dist = math.hypot(dcx - pred_x, dcy - pred_y)
                if dist < best_dist:
                    best_dist = dist
                    best_d_idx = d_idx

            if best_d_idx != -1:
                matches.append((t_idx, best_d_idx))
                # A detection may only be claimed by one track.
                unmatched_detections.remove(best_d_idx)

        for t_idx, d_idx in matches:
            trk = self.trackers[t_idx]
            trk.correct(detections[d_idx])
            cx, cy = trk.get_state()
            trk.history.append((cx, cy, frame_count))
            # calculate_speed is a no-op until the track has two history points.
            self.calculate_speed(trk, birds_eye_view_transformer)

        # Age unmatched tracks and retire the ones that have been lost too long.
        matched_tracks = {t_idx for t_idx, _ in matches}
        survivors = []
        for t_idx, trk in enumerate(self.trackers):
            if t_idx not in matched_tracks:
                trk.time_since_update += 1
            if trk.time_since_update < MAX_AGE:
                survivors.append(trk)
        self.trackers = survivors

        # Every detection nobody claimed starts a new track.
        for d_idx in unmatched_detections:
            new_trk = KalmanTracker(detections[d_idx], self.id_count)
            cx, cy = new_trk.get_state()
            new_trk.history.append((cx, cy, frame_count))
            self.trackers.append(new_trk)
            self.id_count += 1

        return self.trackers

    def calculate_speed(self, tracker, birds_eye_view_transformer):
        """Append a speed sample (MPH) to ``tracker.speeds`` when one can be computed.

        The sample spans the latest history point and the point five entries
        back (or the first point for short histories), which smooths jitter.
        Samples outside the open interval (1, 100) MPH are discarded as noise.
        """
        history = tracker.history
        if len(history) < 2:
            return

        # calc speed over 5 history points for stability
        p1_cam = history[-5] if len(history) >= 5 else history[0]
        p2_cam = history[-1]

        # we compute time based on the video fps to determine speed
        frame_delta = p2_cam[2] - p1_cam[2]
        if frame_delta <= 0:
            return
        time_secs = frame_delta / self.fps  # frames to seconds

        # use birds eye view geometry transform
        p1_birds_eye_view = birds_eye_view_transformer.transform_point((p1_cam[0], p1_cam[1]))
        p2_birds_eye_view = birds_eye_view_transformer.transform_point((p2_cam[0], p2_cam[1]))

        dist_pixels = np.linalg.norm(np.array(p1_birds_eye_view) - np.array(p2_birds_eye_view))
        meters_per_pixel = MAX_DEPTH_METERS / birds_eye_view_transformer.birds_eye_view_height
        dist_meters = dist_pixels * meters_per_pixel

        speed_mph = (dist_meters / time_secs) * 2.23694  # m/s -> mph

        if 1.0 < speed_mph < 100.0:  # avoid random noise (like a speed over 100mph)
            tracker.speeds.append(speed_mph)

    def get_speed(self, tracker):
        """Return the mean of the track's recent speed samples, or 0 if it has none."""
        if len(tracker.speeds) > 0:
            return np.mean(tracker.speeds)
        return 0

In [25]:
class VehicleClassifier:
    """Cheap car-vs-noise filter for candidate boxes, plus the speed-limit sign overlay.

    Classification is a cascade of increasingly expensive checks: box size and
    aspect ratio, an optional Haar cascade pass, and finally a BRISK keypoint
    count (textured regions pass, smooth regions are rejected).
    """

    MIN_BOX_SIDE = 20        # boxes narrower/shorter than this (pixels) are noise
    MIN_ASPECT_RATIO = 0.8   # width / height below this is rejected

    def __init__(self, cascade_path):
        """
        Args:
            cascade_path: Path to the Haar cascade XML. If the file is missing
                or cannot be loaded, a warning is printed and the Haar stage
                is disabled (``self.car_cascade`` is ``None``).
        """
        self.car_cascade = None
        if os.path.exists(cascade_path):
            cascade = cv2.CascadeClassifier(cascade_path)
            # An existing but invalid file yields an empty classifier, which
            # would raise inside detectMultiScale; treat it like a missing file.
            if cascade.empty():
                print("CRITICAL WARNING: HAAR file could not be loaded.")
            else:
                self.car_cascade = cascade
        else:
            print("CRITICAL WARNING: HAAR file not found.")
        self.brisk = cv2.BRISK_create(BRISK_THRESHOLD)

    def classify(self, frame, roi_box, enable_harr=False):
        """Label the region ``roi_box`` of ``frame`` as ``"car"`` or ``"noise"``.

        Args:
            frame: BGR image the box refers to.
            roi_box: ``(x, y, w, h)`` candidate box; it is clipped to the frame.
            enable_harr: When True (and a cascade was loaded), require at
                least one Haar detection inside the box.

        Returns:
            ``"car"`` if every enabled check passes, otherwise ``"noise"``.
        """
        x, y, w, h = roi_box
        H, W = frame.shape[:2]

        # Clip the box to the frame by intersecting corner coordinates, so a
        # box that starts off-frame shrinks instead of being shifted.
        x1, y1 = max(0, x), max(0, y)
        x2, y2 = min(W, x + w), min(H, y + h)
        w, h = x2 - x1, y2 - y1

        # if the box is super small, its noise. quick sanity check.
        if w < self.MIN_BOX_SIDE or h < self.MIN_BOX_SIDE:
            return "noise"
        if (w / float(h)) < self.MIN_ASPECT_RATIO:
            return "noise"

        # extract ONLY the candidate region
        roi = frame[y1:y2, x1:x2]
        if roi.size == 0:
            return "noise"
        gray_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)

        # Optional first pass: does the region contain a shape matching the
        # Haar model? Rejecting here avoids the BRISK check.
        if enable_harr and self.car_cascade:
            # scaleFactor=1.05: scans the image at 5% scale increments (more accurate, slower)
            # minNeighbors=2: requires at least 2 overlapping detections to confirm
            cars = self.car_cascade.detectMultiScale(
                gray_roi,
                scaleFactor=HARR_SCALE_FACTOR,
                minNeighbors=HARR_MIN_NEIGHBORS,
            )
            if len(cars) == 0:  # if Haar sees nothing, immediately reject as noise
                return "noise"

        # BRISK check: shadows are smooth (few keypoints), cars have
        # grilles/lights (many keypoints).
        keypoints = self.brisk.detect(gray_roi, None)

        # Fewer than BRISK_KEYPOINT_MIN keypoints -> likely NOT a car.
        if len(keypoints) < BRISK_KEYPOINT_MIN:
            return "noise"

        return "car"

    def draw_speed_limit_overlay(self, frame):
        """Draw a white "SPEED LIMIT <n>" sign near the bottom-left of ``frame``.

        The frame is modified in place and also returned.
        """
        H, W = frame.shape[:2]
        w_sign, h_sign = 100, 120
        x1 = 20
        y1 = H - h_sign - 80
        x2, y2 = x1 + w_sign, y1 + h_sign

        # White sign body with a black border.
        cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 255, 255), -1)
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 0), 2)
        cv2.putText(frame, "SPEED", (x1 + 28, y1 + 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 0, 0), 1)
        cv2.putText(frame, "LIMIT", (x1 + 32, y1 + 50),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 0, 0), 1)

        text = str(SPEED_LIMIT)
        font = cv2.FONT_HERSHEY_SIMPLEX
        scale = 1.5
        thickness = 3

        # Centre the number horizontally inside the sign.
        (text_w, _), _ = cv2.getTextSize(text, font, scale, thickness)
        text_x = x1 + (w_sign - text_w) // 2
        text_y = y1 + 95

        cv2.putText(frame, text, (text_x, text_y), font, scale, (0, 0, 0), thickness)

        return frame

In [26]:
class BirdsEyeViewTransformation:
    """Perspective warp from the camera image to a fixed-size top-down view.

    The source quadrilateral is a trapezoid: its top edge spans 20%-80% of the
    frame width at the row where the region of interest starts, and its bottom
    edge is the full bottom edge of the frame. It is mapped onto a 300 x 600
    rectangle.
    """

    def __init__(self, width, height):
        """Build the homography for a frame of ``width`` x ``height`` pixels."""
        roi_top = height * ROI_START_PCT

        # Corner order: top-left, top-right, bottom-right, bottom-left.
        self.src_points = np.array(
            [
                (width * 0.20, roi_top),
                (width * 0.80, roi_top),
                (width, height),
                (0, height),
            ],
            dtype=np.float32,
        )

        self.birds_eye_view_width = 300
        self.birds_eye_view_height = 600
        out_w = self.birds_eye_view_width
        out_h = self.birds_eye_view_height
        self.dst_points = np.array(
            [(0, 0), (out_w, 0), (out_w, out_h), (0, out_h)],
            dtype=np.float32,
        )

        self.M = cv2.getPerspectiveTransform(self.src_points, self.dst_points)

    def transform_point(self, point):
        """Map one camera-space ``(x, y)`` point into bird's-eye coordinates."""
        # cv2.perspectiveTransform expects an array shaped (N, 1, 2).
        camera_pt = np.array((point[0], point[1]), dtype=np.float32).reshape(1, 1, 2)
        warped = cv2.perspectiveTransform(camera_pt, self.M)
        return warped[0][0]

In [27]:
def _ensure_parent_dir(path):
    """Create the directory that will contain ``path`` if it names one and it is missing."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)


def _draw_dashboard(frame, panels, titles):
    """Paint a strip of equally wide, titled thumbnails across the top 20% of ``frame``."""
    height, width = frame.shape[:2]
    panel_h = int(height * 0.2)
    panel_w = int(width / len(panels))
    for i, (panel, title) in enumerate(zip(panels, titles)):
        thumb = cv2.resize(panel, (panel_w, panel_h))
        cv2.putText(thumb, title, (10, 20), cv2.FONT_HERSHEY_PLAIN, 1.2, (0, 255, 0), 1)
        frame[0:panel_h, panel_w * i:panel_w * (i + 1)] = thumb
        # vertical separator to the right of this panel
        cv2.line(frame, (panel_w * (i + 1), 0), (panel_w * (i + 1), panel_h), (255, 255, 0), 2)
    cv2.rectangle(frame, (0, 0), (width, panel_h), (255, 255, 0), 2)


def run_system():
    """Run the speed-detection pipeline on VIDEO_SOURCE.

    Per frame: mask to the region of interest, subtract the background, turn
    large motion blobs into candidate boxes, suppress overlaps (NMS), verify
    each survivor with the classifier, update the Kalman tracks, then draw the
    debug dashboard, per-vehicle speed labels and the violation counter.

    Side effects:
        * writes the annotated video to OUTPUT_FILE (at the source frame rate),
        * saves one snapshot per speeding track to ``offenders/``,
        * shows a preview window; pressing Esc stops early.

    The first frame is only used to learn the frame size and is not processed.
    All frames are assumed to share that size.
    """
    cap = cv2.VideoCapture(VIDEO_SOURCE)
    out = None
    window_shown = False
    try:
        if not cap.isOpened():
            print("CRITICAL failed to read video in")
            return

        ret, frame = cap.read()
        if not ret:
            print("CRITICAL failed to read video in")
            return
        height, width = frame.shape[:2]

        # Some containers report 0 (or NaN) FPS; fall back so neither the writer
        # nor the speed computation ends up with an unusable rate.
        source_fps = cap.get(cv2.CAP_PROP_FPS)
        fps = source_fps if source_fps > 0 else 30.0

        birds_eye_view = BirdsEyeViewTransformation(width, height)
        tracker = CarTracker(fps)
        classifier = VehicleClassifier(CASCADE_FILE)  # initialize classifier

        # remove background
        object_detector = cv2.createBackgroundSubtractorMOG2(
            history=MOG2_BG_REMOVER_HISTORY, varThreshold=MOG2_BG_REMOVER_THRESHOLD)

        # output: make sure the target folders exist, otherwise VideoWriter and
        # imwrite fail without raising.
        _ensure_parent_dir(OUTPUT_FILE)
        os.makedirs("offenders", exist_ok=True)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        # Write at the source frame rate so the output plays at real-time speed.
        out = cv2.VideoWriter(OUTPUT_FILE, fourcc, fps, (width, height))
        if not out.isOpened():
            print(f"WARNING: could not open {OUTPUT_FILE} for writing; no video will be saved")

        total_violations = 0
        frame_count = 0

        # Loop-invariant helpers: the ROI mask and the morphology kernels.
        mask_roi = np.zeros((height, width), dtype=np.uint8)
        cv2.rectangle(mask_roi, (0, int(height * ROI_START_PCT)), (width, height), 255, -1)
        open_kernel = np.ones((3, 3), np.uint8)
        close_kernel = np.ones((20, 20), np.uint8)
        roi_top = height * ROI_START_PCT
        font = cv2.FONT_HERSHEY_PLAIN
        titles = ["1. EDGES", "2. BACKGROUND MASK + ROI", "3. RAW",
                  "4. NMS + BRISK", "5. PATH", "6. KALMAN TRACKING"]

        print(f"Beginning procesing pipeline, saving output to {OUTPUT_FILE}")

        while True:
            ret, frame = cap.read()
            if not ret:
                print("No more frames to read (end of video or read error); stopping.")
                break
            frame_count += 1

            # region of interest mask
            roi_frame = cv2.bitwise_and(frame, frame, mask=mask_roi)

            # background mask: keep only confident foreground, remove specks,
            # then close gaps so one vehicle becomes one blob
            mask_motion = object_detector.apply(roi_frame)
            _, mask_motion = cv2.threshold(mask_motion, 250, 255, cv2.THRESH_BINARY)
            mask_motion = cv2.morphologyEx(mask_motion, cv2.MORPH_OPEN, open_kernel)
            mask_motion = cv2.morphologyEx(mask_motion, cv2.MORPH_CLOSE, close_kernel)

            # grab blob outlines
            contours, _ = cv2.findContours(mask_motion, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

            # prep the overlay views
            edges_color = cv2.cvtColor(cv2.Canny(roi_frame, 50, 150), cv2.COLOR_GRAY2BGR)
            mask_color = cv2.cvtColor(mask_motion, cv2.COLOR_GRAY2BGR)
            stage3_frame = roi_frame.copy()
            stage4_frame = roi_frame.copy()
            kalman_frame = roi_frame.copy()

            # initial candidates: blobs above the minimum area whose centre
            # row lies inside the ROI
            boxes_for_nms = []
            confidences = []
            for cnt in contours:
                area = cv2.contourArea(cnt)
                if area <= MIN_AREA_THRESHOLD:
                    continue
                x, y, w, h = cv2.boundingRect(cnt)
                if (y + h // 2) < roi_top:
                    continue
                boxes_for_nms.append([x, y, w, h])
                confidences.append(float(area))
                cv2.rectangle(stage3_frame, (x, y), (x + w, y + h), (255, 0, 0), 2)

            # non-maximum suppression (blob area doubles as the score), then
            # verify each surviving box with the classifier
            confirmed_cars = []
            if boxes_for_nms:
                indices = cv2.dnn.NMSBoxes(boxes_for_nms, confidences,
                                           score_threshold=MIN_AREA_THRESHOLD, nms_threshold=0.3)
                # The return shape differs between OpenCV versions; flatten handles all of them.
                for i in np.array(indices, dtype=int).flatten():
                    box = boxes_for_nms[i]
                    if classifier.classify(frame, box) == "car":
                        confirmed_cars.append(box)
                        cv2.rectangle(stage4_frame, (box[0], box[1]),
                                      (box[0] + box[2], box[1] + box[3]), (0, 255, 0), 2)

            # update tracking
            active_trackers = tracker.update(confirmed_cars, birds_eye_view, frame_count)

            stage5_frame = np.zeros_like(roi_frame)
            for trk in active_trackers:
                confirmed = trk.hits >= MIN_HITS
                if confirmed:
                    # draw the path travelled so far
                    points = trk.history
                    for j in range(1, len(points)):
                        pt1 = (int(points[j - 1][0]), int(points[j - 1][1]))
                        pt2 = (int(points[j][0]), int(points[j][1]))
                        cv2.line(stage5_frame, pt1, pt2, (0, 255, 255), 2)

                # draw Kalman filter boxes: red once confirmed, yellow while tentative
                cx, cy = trk.get_state()
                kx, ky = int(cx - trk.box_w / 2), int(cy - trk.box_h / 2)
                color = (0, 0, 255) if confirmed else (0, 255, 255)
                cv2.rectangle(kalman_frame, (kx, ky), (kx + trk.box_w, ky + trk.box_h), color, 2)

            # dashboard overlay composition
            panels = [edges_color, mask_color, stage3_frame, stage4_frame, stage5_frame, kalman_frame]
            _draw_dashboard(frame, panels, titles)

            # add speed labels / violation overlay (RED SPEEDING box)
            for trk in active_trackers:
                if trk.hits < MIN_HITS:
                    # track must exist for a certain duration (protect against noisy detections)
                    continue
                speed = tracker.get_speed(trk)
                cx, cy = trk.get_state()
                x, y = int(cx - trk.box_w / 2), int(cy - trk.box_h / 2)

                is_speeding = speed > SPEED_LIMIT
                # each track is counted (and photographed) at most once
                new_violation = is_speeding and not trk.violation_recorded
                if new_violation:
                    total_violations += 1

                color = (0, 0, 255) if is_speeding else (0, 255, 0)
                label = f"{int(speed)} MPH"
                if is_speeding:
                    label += " SPEEDING!"

                cv2.rectangle(frame, (x, y), (x + trk.box_w, y + trk.box_h), color, 2)
                cv2.putText(frame, label, (x, y - 10), font, 1.5, color, 2)

                # save a snapshot of the violation (taken after the label is drawn)
                if new_violation:
                    filename = f"offenders/violation_{total_violations}.jpg"
                    if not cv2.imwrite(filename, frame):
                        print(f"WARNING: could not save {filename}")
                    trk.violation_recorded = True

            # update global stats that need to persist
            cv2.rectangle(frame, (20, height - 60), (400, height - 10), (0, 0, 0), -1)
            cv2.putText(frame, f"TOTAL VIOLATIONS: {total_violations}", (25, height - 25),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 2)

            frame = classifier.draw_speed_limit_overlay(frame)

            cv2.imshow("Speed Detection", frame)
            window_shown = True
            out.write(frame)

            # Esc quits; mask to the low byte because some platforms set higher bits
            if (cv2.waitKey(30) & 0xFF) == 27:
                break
    finally:
        # Release resources even if a frame raised part-way through.
        cap.release()
        if out is not None:
            out.release()
        if window_shown:
            cv2.destroyAllWindows()

In [None]:
# Run the whole pipeline on VIDEO_SOURCE; press Esc in the preview window to stop early.
run_system()

Beginning procesing pipeline, saving output to ./output/final_clip1.mp4
