# Exercise 1.2

## Install Necessary Packages

In [7]:
%pip install opencv-python numpy tabulate

Note: you may need to restart the kernel to use updated packages.




## Import Packages

In [8]:
import time
import cv2
import numpy as np
import tabulate

## Main Code

In [9]:
# Minimum contour area (as measured by cv2.contourArea) a foreground blob must exceed
# to be kept as a car candidate; anything at or below this size is filtered out as too small.
MIN_CONTOUR_AREA = 4000

In [10]:
class Tracker:
    """Track objects across video frames by matching contours to known centroids.

    Each call to :meth:`update` receives the contours detected in one frame.
    A contour whose centroid lies within ``proximity_threshold`` of an existing
    object updates that object; unmatched contours that do not already cover a
    tracked centroid spawn new objects. Objects that are covered by no contour
    lose one unit of TTL per frame and are removed (and reported through
    ``onObjectRemoved``) once their TTL reaches zero.
    """

    class TrackingObject:
        """One tracked object: its ID, current centroid, past centroids and TTL."""

        # number of recorded positions required before the direction estimate
        # and the Kalman-only prediction are considered usable
        MIN_HISTORY = 5

        def __init__(self, initial_contour, tracking_id, ttl) -> None:
            """Create a tracked object from its first contour.

            Args:
                initial_contour: contour used to compute the starting centroid.
                tracking_id: unique identifier assigned by the tracker.
                ttl: number of uncovered frames the object survives before removal.
            """
            self.tracking_id = tracking_id
            self.ttl = ttl
            self.location_history = []
            self.centroid = Tracker.calc_centroid(initial_contour)
            self.updated = True

            # Kalman filter with a 4-value state and 2-value measurement. The
            # transition matrix adds state[2]/state[3] to state[0]/state[1] on
            # every step (constant-velocity model); only the first two state
            # values (the position) are measured. It is used to carry the
            # centroid forward on frames where no contour matched the object.
            kalman_2d = cv2.KalmanFilter(4, 2)
            kalman_2d.measurementMatrix = np.array(
                [[1, 0, 0, 0], [0, 1, 0, 0]], np.float32
            )
            kalman_2d.transitionMatrix = np.array(
                [[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 0], [0, 0, 0, 1]], np.float32
            )
            kalman_2d.processNoiseCov = np.eye(4, dtype=np.float32) * 1e-4
            self.kalman_2d = kalman_2d

        def reset_updated(self):
            """Clear the flag that records whether a contour matched this frame."""
            self.updated = False

        def update_contour(self, contour):
            """Move the object to the centroid of ``contour`` and feed the filter.

            The previous centroid is pushed onto ``location_history`` before the
            new one is computed, and the object is marked as updated.
            """
            self.updated = True
            self.location_history.append(self.centroid)
            self.centroid = Tracker.calc_centroid(contour)

            # advance the filter one step, then correct it with the measured centroid
            self.kalman_2d.predict()
            measurement = np.array(
                [[np.float32(self.centroid[0])], [np.float32(self.centroid[1])]]
            )
            self.kalman_2d.correct(measurement)

        def update_kalman(self):
            """Replace the centroid with the Kalman prediction for this frame.

            Does nothing until at least ``MIN_HISTORY`` positions were recorded.
            """
            if len(self.location_history) < self.MIN_HISTORY:
                return
            prediction = self.kalman_2d.predict()
            # predict() returns a column vector; index the scalar elements
            # explicitly, because int() on a 1-element array is deprecated
            # since NumPy 1.25.
            self.centroid = (int(prediction[0, 0]), int(prediction[1, 0]))
            self.location_history.append(self.centroid)

        def x_direction(self) -> float:
            """Return the slope of x over the recorded positions (least squares).

            A positive slope means the x coordinate grows over time, a negative
            slope means it shrinks, and 0 is returned when fewer than
            ``MIN_HISTORY`` positions are available.
            """
            if len(self.location_history) < self.MIN_HISTORY:
                return 0
            xs = np.array(self.location_history)[:, 0]
            steps = np.arange(len(xs))

            # degree-1 polynomial fit: first coefficient is the slope
            slope, _ = np.polyfit(steps, xs, 1)
            return slope

    def __init__(self, proximity_threshold = 20, object_ttl = 10) -> None:
        """Create an empty tracker.

        Args:
            proximity_threshold: maximum centroid distance for a contour to be
                treated as the same object.
            object_ttl: TTL handed to every newly created object.
        """
        self.objects = []
        self.last_tracking_id = 0
        self.proximity_threshold = proximity_threshold
        self.object_ttl = object_ttl
        # optional callback invoked with each object right after it is removed
        self.onObjectRemoved = None

    @classmethod
    def calc_centroid(cls, contour):
        """Return the ``(x, y)`` integer centroid of ``contour`` from its moments.

        Returns ``None`` when the zeroth moment is 0 and no centroid is defined.
        """
        M = cv2.moments(contour)
        if M["m00"] == 0:
            return None
        cX = int(M["m10"] / M["m00"])
        cY = int(M["m01"] / M["m00"])
        return (cX, cY)

    def update(self, contours):
        """Update the tracker with the contours detected in one frame.

        Args:
            contours: sequence (list or tuple) of contours for the current frame.
        """
        # list() accepts both lists and the tuples returned by cv2.findContours
        original_contours = list(contours)
        unmatched = list(original_contours)
        for obj in self.objects:
            obj.reset_updated()

        # match every contour to the nearest tracked object
        for i, contour in enumerate(original_contours):
            centroid = Tracker.calc_centroid(contour)
            if centroid is None:
                # a contour without a centroid cannot be tracked; dropping it
                # here keeps it from becoming an object with centroid None
                unmatched[i] = None
                continue

            closest_object = None
            closest_distance = float('inf')
            for obj in self.objects:
                distance = np.linalg.norm(np.array(obj.centroid) - np.array(centroid))
                if distance < closest_distance:
                    closest_object = obj
                    closest_distance = distance

            # updates the object if the contour is sufficiently close
            if closest_object is not None and closest_distance < self.proximity_threshold:
                closest_object.update_contour(contour)
                unmatched[i] = None

        # adds remaining contours as new objects, unless the contour already
        # contains the centroid of a tracked object
        for contour in unmatched:
            if contour is None:
                continue
            already_tracked = any(
                cv2.pointPolygonTest(contour, obj.centroid, False) > 0
                for obj in self.objects
            )
            if not already_tracked:
                # NOTE(review): the original comment states that the first
                # object (ID 1) represents the entire frame - confirm.
                self.last_tracking_id = self.last_tracking_id + 1
                self.objects.append(
                    Tracker.TrackingObject(contour, self.last_tracking_id, self.object_ttl)
                )

        # objects that matched no contour are carried forward by their Kalman filter
        for obj in self.objects:
            if not obj.updated:
                obj.update_kalman()

        # ages objects that no contour covers; iterate over a snapshot so that
        # removing an object does not make the loop skip its neighbour
        for obj in tuple(self.objects):
            covered = any(
                cv2.pointPolygonTest(contour, obj.centroid, False) > 0
                for contour in original_contours
            )
            if covered:
                continue

            # decreases the Time To Live (TTL) if the object is not within any contour
            obj.ttl -= 1

            # removes the object from the list once its TTL is used up
            if obj.ttl <= 0:
                self.objects.remove(obj)
                if self.onObjectRemoved is not None:
                    self.onObjectRemoved(obj)


In [11]:
def count_cars_going_left(file: str, debug=False) -> tuple[int, float]:
    """Count tracked objects that moved left through the lower half of a video.

    Per the original notes, leftward movement corresponds to cars heading to
    the city centre. An object is counted when the tracker removes it, provided
    its fitted x slope is below -2 and it has more than 80 recorded positions.

    Args:
        file: path of the video file to analyse.
        debug: when True, show each frame with contours, tracked objects and
            the running count; pressing 'q' stops the analysis early.

    Returns:
        ``(number_of_cars, cars_per_minute)``. The rate is based on the frame
        count and frame rate reported by the video; it is ``nan`` when the
        video reports no usable frame count or frame rate.

    Raises:
        AssertionError: if the video file cannot be opened.
    """
    video = cv2.VideoCapture(file)
    if not video.isOpened():
        video.release()
        # raised explicitly (same type and message as the former assert) so the
        # check is not stripped when Python runs with -O
        raise AssertionError("Can't open the video file")

    window_open = False
    try:
        # creates an instance of the KNN background subtractor with shadow detection
        bg_subtractor = cv2.createBackgroundSubtractorKNN(detectShadows=True, history=10000, dist2Threshold=400)

        # creates an instance of the Tracker class
        tracker = Tracker()

        if debug:
            # initializes window to show debug video
            cv2.namedWindow("Cam")
            window_open = True

        # a set is used to track the cars that have been counted
        counted_car_ids = set()

        # callback run by the tracker whenever it removes an object
        def onObjectRemoved(obj):
            if obj.tracking_id not in counted_car_ids and obj.x_direction() < -2 and len(obj.location_history) > 80:
                counted_car_ids.add(obj.tracking_id)

        tracker.onObjectRemoved = onObjectRemoved

        # structuring element for the morphological closing; it does not depend
        # on the frame, so it is built once instead of once per frame
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))

        # analyzes the video on a frame-by-frame basis
        while True:
            ret, frame = video.read()

            # stops when no further frame could be read
            if not ret:
                break

            # applies Gaussian blur to the frame to suppress high-frequency noise
            blur = cv2.GaussianBlur(frame, (5, 5), 0)

            # applies the background subtraction algorithm to the blurred frame
            fg_mask = bg_subtractor.apply(blur)

            # binarizes the foreground mask: values above 100 become 255, the rest 0
            # NOTE(review): the original comment says this eliminates shadows, but
            # the subtractor's shadow value is presumably the OpenCV default of
            # 127, which is above 100 and therefore kept - confirm before changing.
            _, fg_mask = cv2.threshold(fg_mask, 100, 255, cv2.THRESH_BINARY)

            # applies morphological closing to fill gaps in the detected blobs
            fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel, iterations=2)

            # finds the contours of objects in the foreground mask
            contours, _ = cv2.findContours(fg_mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

            # keeps contours that are large enough and whose bounding box starts
            # below the vertical middle of the frame, then replaces each by its
            # convex hull to get a clean outline without holes
            half_height = frame.shape[0] / 2
            contours = [
                cv2.convexHull(c)
                for c in contours
                if cv2.contourArea(c) > MIN_CONTOUR_AREA and cv2.boundingRect(c)[1] > half_height
            ]

            if debug:
                # draws contours
                cv2.drawContours(frame, contours, -1, (0, 255, 0), 2)

            # updates the tracker using contour information
            tracker.update(contours)

            if debug:
                # draws tracking objects
                for obj in tracker.objects:
                    cv2.putText(frame, f"Car #{obj.tracking_id}", obj.centroid, cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                    cv2.circle(frame, obj.centroid, 2, (0, 0, 255), -1)
                cv2.putText(frame, f'Number of cars go to the city centre: {len(counted_car_ids)}', (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

                # displays resulting frame
                cv2.imshow("Cam", frame)

                # breaks out of the loop if the 'q' key is pressed
                if (cv2.waitKey(1) & 0xFF == ord('q')):
                    break

        # NOTE(review): objects still being tracked when the video ends are never
        # removed and therefore never counted - confirm this is intended.
        number_of_cars = len(counted_car_ids)

        # duration of the video in seconds; the frame rate is kept as a float so
        # that fractional rates such as 29.97 are not truncated
        frame_count = video.get(cv2.CAP_PROP_FRAME_COUNT)
        fps = video.get(cv2.CAP_PROP_FPS)
        if frame_count > 0 and fps > 0:
            duration_seconds = frame_count / fps
            cars_per_minute = number_of_cars / duration_seconds * 60
        else:
            # no usable metadata: report an undefined rate instead of dividing by zero
            cars_per_minute = float("nan")

        return (number_of_cars, cars_per_minute)
    finally:
        # always free the capture and the debug window, even after an error
        video.release()
        if window_open:
            cv2.destroyWindow("Cam")
            cv2.waitKey(1)

In [None]:
# video files to analyse; each result row is [filename, total cars going left, cars per minute]
FILES = ["Traffic_Laramie_1.mp4", "Traffic_Laramie_2.mp4"]
table_data = [
    [video_file, *stats]
    for video_file, stats in zip(FILES, map(count_cars_going_left, FILES))
]

  self.centroid = (int(prediction[0]), int(prediction[1]))


In [None]:
# renders the collected rows as an HTML table; the bare name on the last line
# makes the notebook display it
table = tabulate.tabulate(
    table_data,
    headers=["", "Total number of cars", "Cars per minute"],
    tablefmt='html',
)
table