### Part 1

Alpha-Beta filter

In [1]:
import json
import cv2 as cv
import numpy as np

def load_obj_each_frame(data_file):
    with open(data_file, 'r') as file:
        frame_dict = json.load(file)
    return frame_dict

def alpha_beta_filter(initial_position, initial_velocity, alpha, beta, observations, dt=1):
    estimated_position = initial_position
    estimated_velocity = initial_velocity
    estimates = []

    for observation in observations:
        # Prediction step
        predicted_position = estimated_position + estimated_velocity * dt
        predicted_velocity = estimated_velocity

        # Update step (if observation is available)
        if observation != [-1, -1]:
            residual = observation - predicted_position
            estimated_position = predicted_position + alpha * residual
            estimated_velocity = predicted_velocity + (beta * residual) / dt
        else:
            # use the prediction if observation is missing
            estimated_position = predicted_position
            estimated_velocity = predicted_velocity

        estimates.append([estimated_position, estimated_velocity])

    return estimates

def draw_target_object_center(video_file, obj_centers):
    count = 0
    cap = cv.VideoCapture(video_file)
    ok, image = cap.read()

    # Initialize Alpha-Beta filter parameters
    initial_position = np.array([313, 229])     # Initial guess for position
    initial_velocity = np.array([-0.47328952, -0.3911483])  # Initial guess for velocity
    alpha = 0.4     # Position update factor
    beta = 0.0005   # Velocity update factor
    
    filtered_estimates = alpha_beta_filter(initial_position, initial_velocity, alpha, beta, obj_centers)

    # Save in a JSON file
    estimated_positions = [x[0] for x in filtered_estimates]
    part_1_object_tracking = [[int(round(x)), int(round(y))] for x, y in estimated_positions]
    output_data = {"obj": part_1_object_tracking}
    with open('part_1_object_tracking.json', 'w', encoding='utf-8') as file:
        json.dump(output_data, file, ensure_ascii=False, indent=None)
    print("Successfully saved in part_1_object_tracking.json!")

    # Visualize the smoothed track
    vidwrite = cv.VideoWriter("part_1_demo.mp4", cv.VideoWriter_fourcc(*'MP4V'), 30, (700,500))
    while ok:
        if count < len(filtered_estimates):
            pos, _ = filtered_estimates[count]
            pos_x, pos_y = pos
        else:
            break

        count += 1
        ###### !!! #######
        # Make sure the video is resized. Otherwise the coords in the data file won't work.
        image = cv.resize(image, (700, 500)) 
        ###### !!! #######
        # Draw the circle at the estimated position
        # if pos_x != -1 and pos_y != -1:
        #     image = cv.circle(image, (int(pos_x), int(pos_y)), 1, (0, 0, 255), 2)
        # Draw the Line
        for i in range(count):
            pos, _ = filtered_estimates[i]
            pos_x, pos_y = pos
            image = cv.circle(image, (int(pos_x), int(pos_y)), 1, (0, 0, 255), 2)
        vidwrite.write(image)
        
        ok, image = cap.read()

    vidwrite.release()

In [2]:
frame_dict = load_obj_each_frame("object_to_track.json")
video_file = "commonwealth.mp4"
draw_target_object_center(video_file,frame_dict['obj'])

Successfully saved in part_1_object_tracking.json!


OpenCV: FFMPEG: tag 0x5634504d/'MP4V' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'


Kalman filter

In [None]:

def kalman_filter(initial_state, initial_covariance, transition_matrix, observation_matrix, process_noise, measurement_noise, observations):
    state_estimate = initial_state
    covariance_estimate = initial_covariance
    estimates = []

    for observation in observations:
        # Prediction step
        predicted_state = transition_matrix @ state_estimate
        predicted_covariance = transition_matrix @ covariance_estimate @ transition_matrix.T + process_noise

        # Update step
        if np.array(observation).all() != -1: 
            observation = np.array(observation)
            innovation = observation - (observation_matrix @ predicted_state)
            innovation_covariance = observation_matrix @ predicted_covariance @ observation_matrix.T + measurement_noise

            kalman_gain = predicted_covariance @ observation_matrix.T @ np.linalg.inv(innovation_covariance)

            state_estimate = predicted_state + kalman_gain @ innovation
            covariance_estimate = (np.eye(len(initial_state)) - kalman_gain @ observation_matrix) @ predicted_covariance
        else:
            # skip this step if observation is missing
            state_estimate = predicted_state
            covariance_estimate = predicted_covariance

        estimates.append(state_estimate.tolist())

    return estimates

In [254]:
import json
import cv2 as cv
import numpy as np

class KalmanFilter2D:
    def __init__(self, dt=1, process_noise=1e-2, measurement_noise=1e-1, initial_state=np.array([[0], [0], [0], [0]])):
        """
        Initialize the Kalman Filter for 2D object tracking.
        :param dt: Time step
        :param process_noise: Variance of the process noise
        :param measurement_noise: Variance of the measurement noise
        :param initial_state: Initial state vector [x, vx, y, vy].T
        """
        self.dt = dt
        self.state = initial_state
        self.state_cov = np.eye(4) * 1000  # Initial state covariance
        
        # Define the state transition matrix
        self.F = np.array([[1, dt, 0, 0],
                           [0, 1, 0, 0],
                           [0, 0, 1, dt],
                           [0, 0, 0, 1]])
        
        # Measurement matrix
        self.H = np.array([[1, 0, 0, 0],
                           [0, 0, 1, 0]])
        
        # Process noise covariance
        q = process_noise
        self.Q = q * np.array([[dt**4/4, dt**3/2, 0, 0],
                               [dt**3/2, dt**2, 0, 0],
                               [0, 0, dt**4/4, dt**3/2],
                               [0, 0, dt**3/2, dt**2]])
        
        # Measurement noise covariance
        self.R = np.eye(2) * measurement_noise
        
        # Identity matrix
        self.I = np.eye(4)
    
    def predict(self):
        self.state = np.dot(self.F, self.state)
        self.state_cov = np.dot(self.F, np.dot(self.state_cov, self.F.T)) + self.Q
    
    def update(self, measurement):
        Z = np.array(measurement).reshape(2, 1)
        y = Z - np.dot(self.H, self.state)  # Measurement pre-fit residual
        S = np.dot(self.H, np.dot(self.state_cov, self.H.T)) + self.R  # Residual covariance
        K = np.dot(np.dot(self.state_cov, self.H.T), np.linalg.inv(S))  # Optimal Kalman gain
        self.state = self.state + np.dot(K, y)
        self.state_cov = self.state_cov - np.dot(K, np.dot(self.H, self.state_cov))
    
    def get_current_estimate(self):
        return self.state[:2].flatten().tolist()  # Return position part of the state vector

def draw_target_object_center_with_kalman(video_file, obj_centers):
    kf = KalmanFilter2D(dt=1, process_noise=1e-2, measurement_noise=1e-1)
    
    cap = cv.VideoCapture(video_file)
    ok, image = cap.read()
    vidwrite = cv.VideoWriter("part_1_demo_kalman.mp4", cv.VideoWriter_fourcc(*'MP4V'), 30, (700,500))
    
    for measurement in obj_centers:
        kf.predict()
        if measurement != [-1, -1]:  # Valid measurement
            kf.update(measurement)
        estimated_position = kf.get_current_estimate()
        
        if ok:
            image = cv.resize(image, (700, 500))
            pos_x, pos_y = estimated_position
            image = cv.circle(image, (int(pos_x), int(pos_y)), 1, (0, 0, 255), 2)
            vidwrite.write(image)
            ok, image = cap.read()
    
    vidwrite.release()
    print("Video with Kalman filter tracking has been saved.")




In [255]:
# Load object tracking data
frame_dict = load_obj_each_frame("object_to_track.json")
video_file = "commonwealth.mp4"
draw_target_object_center_with_kalman(video_file, frame_dict['obj'])

OpenCV: FFMPEG: tag 0x5634504d/'MP4V' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'


Video with Kalman filter tracking has been saved.


In [250]:
final_state = kf.process_measurement(frame_dict['obj'])
print(f"Final State: {final_state}")

Final State: [165.9275572600513, 124.44063029498497, -6.883585433770101e-06, 0.5358969805122851]


# Part 2

In [403]:
import json
import cv2 as cv
import numpy as np
from scipy.optimize import linear_sum_assignment

# Alpha-Beta Filter
class AlphaBetaFilter:
    def __init__(self, alpha, beta, dt):
        self.alpha = alpha
        self.beta = beta
        self.dt = dt

    def predict(self, position, velocity):
        predicted_position = position + velocity * self.dt
        return predicted_position

    def update(self, predicted_position, current_measurement):
        residual = current_measurement - predicted_position
        position = predicted_position + self.alpha * residual
        velocity = (self.beta * residual) / self.dt
        return position, velocity

# Calculate centroid of a bounding box
def calculate_centroid(x_min, y_min, width, height):
    return np.array([x_min + width / 2, y_min + height / 2])

# Load frame data from JSON file
def load_obj_each_frame(data_file):
    with open(data_file, 'r') as file:
        frame_dict = json.load(file)
    return frame_dict

# Tracking and ID assignment
def track_objects(frame_dict):
    tracked_objects = []
    next_id = 0
    default_id = 0

    for frame, detections in frame_dict.items():
        centroids = [calculate_centroid(obj['x_min'], obj['y_min'], obj['width'], obj['height']) for obj in detections]
        centroids_np = np.array(centroids)  # Convert centroids list to a NumPy array

        if not tracked_objects:  # Initialize tracked objects in the first frame
            for obj in detections:
                obj['id']=default_id
                default_id+=1
            
            for centroid in centroids_np:
                tracked_objects.append({
                    'filter': AlphaBetaFilter(alpha=0.85, beta=0.005, dt=1), # Alpha, Beta
                    'position': centroid,
                    'velocity': np.array([0, 0]),
                    'id': next_id
                })
                next_id += 1

        else:
            # Convert tracked object positions to a NumPy array for vectorized operations
            predicted_positions = np.array([obj['filter'].predict(obj['position'], obj['velocity']) for obj in tracked_objects])
            
            # Calculate cost matrix as Euclidean distances between predictions and detections
            cost_matrix = np.linalg.norm(predicted_positions[:, np.newaxis] - centroids_np, axis=2)
            
            # Solve the assignment problem
            row_ind, col_ind = linear_sum_assignment(cost_matrix)
            
            # Update and assign IDs based on the assignment
            used_ids = set()
            for row, col in zip(row_ind, col_ind):
                if cost_matrix[row, col] < 50:  # Threshold for distance
                    obj = tracked_objects[row]
                    centroid = centroids_np[col]
                    position, velocity = obj['filter'].update(obj['position'], centroid)
                    obj['position'] = position
                    obj['velocity'] = velocity
                    detections[col]['id'] = obj['id']  # Assign existing ID to detection
                    used_ids.add(obj['id'])
            
            # Check for new detections and assign new IDs
            for i, detection in enumerate(detections):
                if 'id' not in detection:  # New detection
                    detection['id'] = next_id
                    centroid = centroids_np[i]
                    tracked_objects.append({
                        'filter': AlphaBetaFilter(alpha=0.85, beta=0.0005, dt=1), #Alpha, Beta
                        'position': centroid,
                        'velocity': np.array([0, 0]),
                        'id': next_id
                    })
                    next_id += 1

    return frame_dict

# Draw object with ID
def draw_object(object_dict, image):
    x = object_dict['x_min']
    y = object_dict['y_min']
    width = object_dict['width']
    height = object_dict['height']
    object_id = object_dict.get('id', 'N/A')
    cv.rectangle(image, (x, y), (x + width, y + height), (0, 255, 0), 2)
    cv.putText(image, f'ID: {object_id}', (x, y - 10), cv.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

# Process video and draw tracked objects
def draw_objects_in_video(video_file, frame_dict):
    cap = cv.VideoCapture(video_file)
    vidwrite = cv.VideoWriter("part_2_demo.mp4", cv.VideoWriter_fourcc(*'MP4V'), 30, (700, 500))
    frame_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv.resize(frame, (700, 500))
        if str(frame_count) in frame_dict:
            for obj in frame_dict[str(frame_count)]:
                draw_object(obj, frame)
        vidwrite.write(frame)
        frame_count += 1
    cap.release()
    vidwrite.release()




In [404]:
# Main workflow
data_file = 'frame_dict.json'
frame_dict = load_obj_each_frame(data_file)
frame_dict_with_ids = track_objects(frame_dict)  # Assign unique IDs

with open('part_2_frame_dict.json', 'w', encoding='utf-8') as file:
    json.dump(frame_dict_with_ids, file, ensure_ascii=False, indent=None)

print("Position were saved as part_2_frame_dict.json")

video_file = "commonwealth.mp4"
draw_objects_in_video(video_file, frame_dict_with_ids)

{'0': [{'x_min': 565, 'y_min': 411, 'width': 44, 'height': 32, 'id': 0}, {'x_min': 603, 'y_min': 441, 'width': 45, 'height': 35, 'id': 1}, {'x_min': 198, 'y_min': 318, 'width': 29, 'height': 30, 'id': 2}, {'x_min': 103, 'y_min': 218, 'width': 29, 'height': 20, 'id': 3}, {'x_min': 121, 'y_min': 67, 'width': 14, 'height': 13, 'id': 4}, {'x_min': 302, 'y_min': 221, 'width': 25, 'height': 21, 'id': 5}], '1': [{'x_min': 565, 'y_min': 411, 'width': 43, 'height': 32, 'id': 0}, {'x_min': 603, 'y_min': 441, 'width': 45, 'height': 34, 'id': 1}, {'x_min': 198, 'y_min': 318, 'width': 31, 'height': 30, 'id': 2}, {'x_min': 147, 'y_min': 353, 'width': 17, 'height': 17, 'id': 6}, {'x_min': 304, 'y_min': 220, 'width': 21, 'height': 22, 'id': 5}], '2': [{'x_min': 565, 'y_min': 411, 'width': 44, 'height': 33, 'id': 0}, {'x_min': 603, 'y_min': 441, 'width': 45, 'height': 34, 'id': 1}, {'x_min': 198, 'y_min': 319, 'width': 30, 'height': 30, 'id': 2}, {'x_min': 148, 'y_min': 354, 'width': 17, 'height': 17, 

OpenCV: FFMPEG: tag 0x5634504d/'MP4V' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'


## with Hungarian (unfinished)

In [345]:
def track_objects_with_hungarian(frame_dict):
    tracked_objects = []  # List to keep track of objects and their filters
    next_id = 0  # Unique ID for each object
    
    for frame, detections in sorted(frame_dict.items(), key=lambda x: int(x[0])):
        if not tracked_objects:
            # Initialize tracking for the first frame
            for detection in detections:
                centroid = calculate_centroid(detection['x_min'], detection['y_min'], detection['width'], detection['height'])
                tracked_objects.append({
                    'filter': AlphaBetaFilter(alpha=0.4, beta=0.1, dt=1),
                    'position': centroid,
                    'velocity': np.array([0, 0]),
                    'id': next_id
                })
                next_id += 1
        else:
            # Predict the next position for all tracked objects
            predictions = np.array([obj['filter'].predict(obj['position'], obj['velocity']) for obj in tracked_objects])
            # Current detections
            current_detections = np.array([calculate_centroid(det['x_min'], det['y_min'], det['width'], det['height']) for det in detections])
            
            # Create the cost matrix based on Euclidean distance
            cost_matrix = np.linalg.norm(predictions[:, None] - current_detections, axis=2)
            
            # Solve the assignment problem
            rows, cols = linear_sum_assignment(cost_matrix)
            
            assigned_detections = set()
            for row, col in zip(rows, cols):
                if cost_matrix[row, col] > 50:  # Threshold to discard unmatched
                    continue
                obj = tracked_objects[row]
                detection = detections[col]
                # Update position and velocity with the Alpha-Beta filter
                obj['position'], obj['velocity'] = obj['filter'].update(obj['position'], current_detections[col])
                detection['id'] = obj['id']  # Assign the tracked object ID to the detection
                assigned_detections.add(col)
            
            # Check for new detections and add them as new tracked objects
            for i, detection in enumerate(detections):
                if i not in assigned_detections:
                    centroid = current_detections[i]
                    tracked_objects.append({
                        'filter': AlphaBetaFilter(alpha=0.4, beta=0.1, dt=1),
                        'position': centroid,
                        'velocity': np.array([0, 0]),
                        'id': next_id
                    })
                    detection['id'] = next_id
                    next_id += 1

    return frame_dict


In [346]:
frame_dict = load_obj_each_frame("frame_dict.json")
frame_dict_with_ids = track_objects_with_hungarian(frame_dict)
video_file = "commonwealth.mp4"
draw_objects_in_video(video_file, frame_dict_with_ids)


OpenCV: FFMPEG: tag 0x5634504d/'MP4V' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'


In [347]:
print(frame_dict_with_ids)

{'0': [{'x_min': 565, 'y_min': 411, 'width': 44, 'height': 32}, {'x_min': 603, 'y_min': 441, 'width': 45, 'height': 35}, {'x_min': 198, 'y_min': 318, 'width': 29, 'height': 30}, {'x_min': 103, 'y_min': 218, 'width': 29, 'height': 20}, {'x_min': 121, 'y_min': 67, 'width': 14, 'height': 13}, {'x_min': 302, 'y_min': 221, 'width': 25, 'height': 21}], '1': [{'x_min': 565, 'y_min': 411, 'width': 43, 'height': 32, 'id': 0}, {'x_min': 603, 'y_min': 441, 'width': 45, 'height': 34, 'id': 1}, {'x_min': 198, 'y_min': 318, 'width': 31, 'height': 30, 'id': 2}, {'x_min': 147, 'y_min': 353, 'width': 17, 'height': 17, 'id': 6}, {'x_min': 304, 'y_min': 220, 'width': 21, 'height': 22, 'id': 5}], '2': [{'x_min': 565, 'y_min': 411, 'width': 44, 'height': 33, 'id': 0}, {'x_min': 603, 'y_min': 441, 'width': 45, 'height': 34, 'id': 1}, {'x_min': 198, 'y_min': 319, 'width': 30, 'height': 30, 'id': 2}, {'x_min': 148, 'y_min': 354, 'width': 17, 'height': 17, 'id': 6}, {'x_min': 301, 'y_min': 219, 'width': 24, 'h