Bargav Jagatha
Akshat Gurbuxani
Mounika A

In [None]:
import numpy as np
import json
import cv2 as cv

def load_obj_each_frame(data_file):
    with open(data_file, 'r') as file:
        frame_dict = json.load(file)
    return frame_dict['obj']

class KalmanFilter:
    def __init__(self):
        self.dt = 1 # time step

        # Initial state [x, y, dx, dy]
        self.x = np.array([312, 318, 0, 0], dtype=np.float32)
        
        # State transition model
        self.F = np.array([[1, 0, self.dt, 0],
                           [0, 1, 0, self.dt],
                           [0, 0, 1, 0],
                           [0, 0, 0, 1]], dtype=np.float32)
        
        # Measurement model
        self.H = np.array([[1, 0, 0, 0],
                           [0, 1, 0, 0]], dtype=np.float32)
        
        # Measurement noise covariance
        self.R = np.eye(2) * 0.1
        
        # Process noise covariance
        self.Q = np.eye(4) * 0.03
        
        # Initial estimation error covariance
        self.P = np.eye(4)

    def predict(self):
        self.x = np.dot(self.F, self.x)
        self.P = np.dot(self.F, np.dot(self.P, self.F.T)) + self.Q
        return self.x[:2]

    def update(self, z):
        y = z - np.dot(self.H, self.x)  # Measurement residual
        S = np.dot(self.H, np.dot(self.P, self.H.T)) + self.R  # Residual covariance
        K = np.dot(self.P, np.dot(self.H.T, np.linalg.inv(S)))  # Kalman gain
        self.x = self.x + np.dot(K, y)
        self.P = self.P - np.dot(K, np.dot(self.H, self.P))

def draw_target_object_center(video_file, obj_centers):
    kalman = KalmanFilter()
    cap = cv.VideoCapture(video_file)
    ok, image = cap.read()
    vidwrite = cv.VideoWriter("part_1_demo.mp4", cv.VideoWriter_fourcc(*'MP4V'), 30, (700,500))
    
    valid_measurement_received = False

    frame_counter = 0  
    track_points = []  # List to store the points for drawing the track
    res_centers = []

    for pos in obj_centers:
        frame_counter += 1  # Increment the frame counter for each position
        if not ok:
            break
        image = cv.resize(image, (700, 500))

        if pos[0] != -1 and pos[1] != -1:
            kalman.update(np.array(pos, dtype=np.float32))
            valid_measurement_received = True

        if valid_measurement_received:
            predicted = kalman.predict()
            cv.circle(image, (int(predicted[0]), int(predicted[1])), 5, (0, 255, 0), 2)

            if frame_counter >= 11:
                # Add the current predicted position to the track points
                track_points.append((int(predicted[0]), int(predicted[1])))
                
                # Draw the continuous track from the 11th frame onwards
                for i in range(1, len(track_points)):
                    cv.line(image, track_points[i - 1], track_points[i], (255, 0, 0), 2)

        if pos[0] != -1 and pos[1] != -1:
            res_centers.append([pos[0], pos[1]])
        else:
            res_centers.append([int(kalman.x[0]), int(kalman.x[1])])

        vidwrite.write(image)
        ok, image = cap.read()

    vidwrite.release()
    cap.release()

    return res_centers


obj_centers = load_obj_each_frame("object_to_track.json")
video_file = "commonwealth.mp4"
res_centers = draw_target_object_center(video_file, obj_centers)


In [13]:
result = {"obj": res_centers}

with open('part_1_object_tracking.json', 'w') as f:

    json.dump(result, f)

In [15]:
from scipy.optimize import linear_sum_assignment

In [16]:
with open('frame_dict.json') as f:
    frame_dict = json.load(f)

In [17]:
def draw_object_with_id(obj, image, track_id):
    x, y, width, height = obj['x_min'], obj['y_min'], obj['width'], obj['height']
    cv.rectangle(image, (x, y), (x + width, y + height), (0, 255, 0), 2)
    cv.putText(image, str(track_id), (x, y - 10), cv.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2)




In [24]:
import cv2 as cv
import numpy as np
from scipy.optimize import linear_sum_assignment

result = []

class Track:
    def __init__(self, detection, track_id):
        self.kalman_filter = KalmanFilter()
        self.kalman_filter.update(np.array([detection['x_min'] + detection['width'] / 2, detection['y_min'] + detection['height'] / 2]))
        self.id = track_id
        self.missed_frames = 0

def draw_objects_in_video(video_file, frame_dict):
    cap = cv.VideoCapture(video_file)
    ok, image = cap.read()
    vidwrite = cv.VideoWriter("part_2_demo.mp4", cv.VideoWriter_fourcc(*'MP4V'), 30, (700, 500))
    tracks = []
    track_id = 0
    
    for frame_num in sorted(frame_dict.keys(), key=int):
        if not ok:
            break
        image = cv.resize(image, (700, 500))
        detections = frame_dict[frame_num]
        
        if not tracks:
            for det in detections:
                tracks.append(Track(det, track_id))
                track_id += 1
        else:
            predictions = [track.kalman_filter.predict()[:2] for track in tracks]
            # Construct cost matrix
            detections_pos = [np.array([det['x_min'] + det['width'] / 2, det['y_min'] + det['height'] / 2]) for det in detections]
            cost_matrix = np.linalg.norm(np.array(predictions)[:, None] - np.array(detections_pos), axis=2)
            rows, cols = linear_sum_assignment(cost_matrix)
            
            # Update step for matched tracks
            for row, col in zip(rows, cols):
                if cost_matrix[row, col] < 40:
                    tracks[row].kalman_filter.update(np.array([detections[col]['x_min'] + detections[col]['width'] / 2, detections[col]['y_min'] + detections[col]['height'] / 2]))
                    tracks[row].missed_frames = 0
                else:
                    tracks[row].missed_frames += 1
            
            # Add new tracks
            for col in set(range(len(detections))) - set(cols):
                tracks.append(Track(detections[col], track_id))
                track_id += 1
            
            # Remove lost tracks
            tracks = [track for track in tracks if track.missed_frames < 14]

        res = []
        
        for track in tracks:
            pred = track.kalman_filter.predict()
            det = {'x_min': int(pred[0] - 20), 'y_min': int(pred[1] - 20), 'width': 40, 'height': 40}
            draw_object_with_id(det, image, track.id)
            det["id"] = track.id
            res.append(det)

        result.append(res)
        
        vidwrite.write(image)
        ok, image = cap.read()

    vidwrite.release()
    cap.release()

video_file = "commonwealth.mp4" 
draw_objects_in_video(video_file, frame_dict)


OpenCV: FFMPEG: tag 0x5634504d/'MP4V' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'


In [22]:
res_dict = {}

for i, x in enumerate(result):
    res_dict[str(i)] = x

In [23]:

with open('part_2_frame_dict.json', 'w') as f:
    json.dump(res_dict, f)