# Kalman filter (without tracking)

In [5]:
import os
import time
import cv2
import numpy as np

from kalman_filters import KalmanFilter3D_CA, KalmanFilter3D_CV

# =============================
# SINGLE TRACKING LABEL CLASS
# =============================
class TrackingLabel:
    """One annotation row: a single object observed in a single frame.

    Every argument may be passed as text. Scalar fields are cast with
    ``int``/``float``; ``bbox``, ``dimensions`` and ``location`` are stored
    as freshly built float arrays. ``score`` is optional and stays ``None``
    when not supplied.
    """

    def __init__(self, frame, track_id, obj_type, truncated, occluded, alpha,
                 bbox, dimensions, location, rotation_y, score=None):
        def as_vector(values):
            # Always builds a new float array from the given sequence.
            return np.array(values, dtype=float)

        # Identity of the annotation.
        self.frame = int(frame)
        self.track_id = int(track_id)
        self.obj_type = obj_type  # 'Car', 'Pedestrian', 'Cyclist', etc.

        # Visibility flags and observation angle.
        self.truncated = float(truncated)  # 0..1
        self.occluded = int(occluded)      # 0,1,2,3
        self.alpha = float(alpha)          # [-pi, pi]

        # Geometry.
        self.bbox = as_vector(bbox)              # [left, top, right, bottom]
        self.dimensions = as_vector(dimensions)  # [h, w, l]
        self.location = as_vector(location)      # [x, y, z]
        self.rotation_y = float(rotation_y)      # [-pi, pi]

        # Optional confidence value.
        self.score = None if score is None else float(score)

    def __repr__(self):
        return f"TrackingLabel(frame={self.frame}, track_id={self.track_id}, type={self.obj_type})"


# =============================
# LABEL FILE LOADER CLASS
# =============================
class TrackingLabelLoader:
    """Read a whitespace-separated label file into ``TrackingLabel`` objects.

    Attributes:
        filename: path passed to ``open`` by ``load()``.
        labels: result of the most recent ``load()`` (empty before the first).
    """

    def __init__(self, filename):
        self.filename = filename
        self.labels = []

    def load(self):
        """Parse the file and return the list of labels.

        Rows with fewer than 17 fields (blank lines included) are skipped.
        An 18th field, when present, is used as the score.

        Each call re-reads the file and *replaces* ``self.labels``, so calling
        ``load()`` more than once does not accumulate duplicate entries.
        """
        loaded = []
        with open(self.filename, "r") as f:
            for line in f:
                parts = line.split()
                if len(parts) < 17:  # ignore incomplete lines
                    continue

                score = parts[17] if len(parts) > 17 else None
                loaded.append(TrackingLabel(
                    parts[0],       # frame
                    parts[1],       # track_id
                    parts[2],       # obj_type
                    parts[3],       # truncated
                    parts[4],       # occluded
                    parts[5],       # alpha
                    parts[6:10],    # bbox
                    parts[10:13],   # dimensions
                    parts[13:16],   # location
                    parts[16],      # rotation_y
                    score,
                ))
        self.labels = loaded
        return self.labels

    def get_labels_for_frame(self, frame_id):
        """Return all labels for a specific frame."""
        return [label for label in self.labels if label.frame == frame_id]

    def get_labels_for_track(self, track_id):
        """Return all labels for a specific object track."""
        return [label for label in self.labels if label.track_id == track_id]
    

# =============================
# TRACKED OBJECT CLASS
# =============================
class TrackedObject:
    """A single tracked target: a Kalman filter, a draw colour and a timestamp.

    ``last_update`` holds the wall-clock time of construction or of the most
    recent ``predict()`` call; ``update()`` does not modify it.
    """

    def __init__(self, track_id, type, color):
        # "Pedestrian" uses KalmanFilter3D_CV, every other type KalmanFilter3D_CA.
        filter_cls = KalmanFilter3D_CV if type == "Pedestrian" else KalmanFilter3D_CA
        self.track_id = track_id
        self.kf = filter_cls()
        self.color = color
        self.last_update = time.time()

    def update(self, X, Y, Z):
        """Pass the measurement to the filter as a 3x1 column vector."""
        measurement = np.array([[X], [Y], [Z]])
        self.kf.update(measurement)

    def predict(self):
        """Advance the filter by the wall-clock time elapsed since the last call."""
        previous, self.last_update = self.last_update, time.time()
        self.kf.predict(self.last_update - previous)

    def get_predicted_position(self):
        """Return the first three entries of the filter state as a tuple."""
        return tuple(self.kf.x[row, 0] for row in range(3))


# =============================
# MULTI-OBJECT TRACKER CLASS
# =============================
class Tracker:
    """Keep one ``TrackedObject`` per track id, with an optional filter.

    Filter rules (see ``is_tracked``):
      * ``tracked_id`` set        -> only that id is accepted;
      * else ``tracked_type`` set -> only that object type is accepted
        (e.g. "Pedestrian" or "Cyclist");
      * neither set               -> everything is accepted.

    Args:
        colors: optional list of draw colours; defaults to ``[(255, 0, 0)]``.
        tracked_id: optional single id to follow.
        tracked_type: optional object type to follow.
    """

    def __init__(self, colors=None, tracked_id=None, tracked_type=None):
        self.tracked_id = tracked_id
        self.tracked_type = tracked_type
        self.objects = {}
        self.colors = colors if colors else [(255, 0, 0)]

    def update(self, tid, type, X, Y, Z):
        """Feed a measurement to track ``tid``, creating the track on first use.

        Measurements rejected by ``is_tracked`` are ignored.
        """
        if not self.is_tracked(tid, type):
            return
        if tid not in self.objects:
            # Colour is picked from the palette by the current object count.
            color = self.colors[len(self.objects) % len(self.colors)]
            self.objects[tid] = TrackedObject(tid, type, color)
        self.objects[tid].update(X, Y, Z)

    def is_tracked(self, tid, type):
        """Return True when (tid, type) passes the configured filter."""
        if self.tracked_id is not None:
            return bool(tid == self.tracked_id)
        if self.tracked_type is not None:
            return bool(type == self.tracked_type)
        return True

    def predict(self):
        """Run the prediction step of every tracked object."""
        for obj in self.objects.values():
            obj.predict()

    def get_objects(self):
        """Return a live view of the tracked objects."""
        return self.objects.values()

    def remove_object(self, track_id):
        """Drop the track if it exists; unknown ids are ignored."""
        self.objects.pop(track_id, None)
    

# =============================
# CAMERA
# =============================
class Camera:
    """Project 3D points to integer pixel coordinates using a matrix ``K``."""

    def __init__(self, K_rect):
        self.K = K_rect

    def project_point(self, P):
        """Return ``(u, v)`` for ``P = (X, Y, Z)``, or ``None`` when ``Z <= 0``.

        ``u = K[0,0] * X / Z + K[0,2]`` and ``v = K[1,1] * Y / Z + K[1,2]``;
        both are truncated with ``int``.
        """
        x_cam, y_cam, depth = P
        if depth <= 0:
            return None
        scale_u, offset_u = self.K[0, 0], self.K[0, 2]
        scale_v, offset_v = self.K[1, 1], self.K[1, 2]
        pixel = (scale_u * x_cam / depth + offset_u,
                 scale_v * y_cam / depth + offset_v)
        return int(pixel[0]), int(pixel[1])


# =============================
# OCCLUSION RECTANGLE
# =============================
def is_in_rectangle(uv, rect):
    """Tell whether pixel ``uv = (u, v)`` lies in ``rect = (x1, y1, x2, y2)``.

    Edges count as inside. A missing point (``None``) is never inside.
    """
    if uv is None:
        return False
    col, row = uv
    left, top, right, bottom = rect
    if not (left <= col <= right):
        return False
    return top <= row <= bottom


# =============================
# PREDICTION EVALUATOR CLASS
# =============================
class Evaluator:
    """Collect (prediction, ground truth) pairs and report distance errors."""

    def __init__(self):
        # {track_id: [(frame, predicted_pos, gt_pos), ...]}
        self.data = {}

    def log(self, frame_idx, track_id, predicted_pos, gt_pos):
        """Record one prediction and its ground truth for a track in a frame."""
        record = (frame_idx, predicted_pos, gt_pos)
        self.data.setdefault(track_id, []).append(record)

    @staticmethod
    def compute_error(predicted_pos, gt_pos):
        """Euclidean distance between the two positions."""
        delta = np.array(predicted_pos) - np.array(gt_pos)
        return np.linalg.norm(delta)

    def get_frame_errors(self, frame_idx):
        """List ``(track_id, error)`` for every record logged at ``frame_idx``."""
        return [
            (tid, self.compute_error(pred, gt))
            for tid, history in self.data.items()
            for frame, pred, gt in history
            if frame == frame_idx
        ]

    def get_track_errors(self, track_id):
        """List ``(frame, error)`` for one track; empty for an unknown track."""
        return [
            (frame, self.compute_error(pred, gt))
            for frame, pred, gt in self.data.get(track_id, [])
        ]

    def get_all_errors(self):
        """Flat list of errors over every track and frame."""
        return [
            self.compute_error(pred, gt)
            for history in self.data.values()
            for _, pred, gt in history
        ]

    def get_summary(self):
        """Mean, max and min of all errors; all ``None`` when nothing was logged."""
        errors = self.get_all_errors()
        if not errors:
            return {'mean': None, 'max': None, 'min': None}
        return {
            'mean': np.mean(errors),
            'max': np.max(errors),
            'min': np.min(errors)
        }


# =============================
# MAIN FUNCTION
# =============================
def main():
    """Play back the sequence with one Kalman filter per labelled track id.

    For every frame that has labels and an image on disk:
      1. advance all filters (``tracker.predict``);
      2. log each prediction against the labelled location of the same id;
      3. feed labels that are neither heavily occluded nor inside the
         occlusion rectangle back into the tracker, and drop tracks whose
         label projects outside the bounding box;
      4. draw predictions and labels and show the frame.

    Pressing Esc stops playback. The error summary is printed at the end and
    the display window is closed even if an error interrupts the loop.
    """
    image_folder = "./seq_02/image_02/data/"
    label_file   = "./seq_02/labels.txt"
    occlusion_threshold = 3                # labels with occluded >= this are not used as measurements
    occlusion_rect = (660, 100, 910, 350)  # (x1, y1, x2, y2): labels projecting here are withheld
    bounding_box = (0, 0, 1224, 370)       # (x1, y1, x2, y2): tracks are dropped outside this area

    font                   = cv2.FONT_HERSHEY_SIMPLEX
    fontScale              = 0.7
    fontColor              = (255,255,255)
    thickness              = 3
    lineType               = 2

    # Camera
    camera = Camera(np.array([
        [7.070493e+02, 0.000000e+00, 6.040814e+02, 0.000000e+00],
        [0.000000e+00, 7.070493e+02, 1.805066e+02, 0.000000e+00],
        [0.000000e+00, 0.000000e+00, 1.000000e+00, 0.000000e+00]
    ]))

    # Load labels and group them by frame once, instead of rescanning the
    # whole list for every frame.
    loader = TrackingLabelLoader(label_file)
    labels_by_frame = {}
    for label in loader.load():
        labels_by_frame.setdefault(label.frame, []).append(label)

    evaluator = Evaluator()
    tracker = Tracker()

    try:
        for frame_idx in sorted(labels_by_frame):
            frame_labels = labels_by_frame[frame_idx]
            img_file = os.path.join(image_folder, f"{int(frame_idx):010d}.png")
            frame = cv2.imread(img_file)
            if frame is None:
                print("Missing frame:", img_file)
                continue

            tracker.predict()

            predicted_positions = {obj.track_id: obj.get_predicted_position()
                                   for obj in tracker.get_objects()}

            # Log errors for evaluation (prediction made before this frame's update)
            for label in frame_labels:
                tid = label.track_id
                if tid in predicted_positions:
                    evaluator.log(frame_idx, tid, predicted_positions[tid], label.location)

            # Update tracker with all labels in this frame
            for label in frame_labels:
                X, Y, Z = label.location
                real_uv = camera.project_point((X, Y, Z))
                if label.occluded < occlusion_threshold and not is_in_rectangle(real_uv, occlusion_rect):
                    tracker.update(label.track_id, label.obj_type, X, Y, Z)
                # A label that projects outside the image area (or cannot be
                # projected at all) ends its track.
                if not is_in_rectangle(real_uv, bounding_box):
                    tracker.remove_object(label.track_id)

            # Draw predicted points in each object's colour, with its id
            for obj in tracker.get_objects():
                pred_pos = obj.get_predicted_position()
                pred_uv = camera.project_point(pred_pos)
                if pred_uv is not None:
                    cv2.circle(frame, pred_uv, 6, obj.color, -1)
                    cv2.putText(frame, str(obj.track_id), pred_uv, font, fontScale, fontColor, thickness, lineType)

            # Draw real (green) points, except inside the occlusion rectangle
            for label in frame_labels:
                real_uv = camera.project_point(label.location)
                if real_uv is not None and tracker.is_tracked(label.track_id, label.obj_type) and not is_in_rectangle(real_uv, occlusion_rect):
                    cv2.circle(frame, real_uv, 6, (0,255,0), -1)  # green for real

            cv2.putText(frame, str(frame_idx), (1000,20), font, fontScale, fontColor, thickness, lineType)

            cv2.imshow("Tracking", frame)
            if cv2.waitKey(1) == 27:  # Esc
                break
            time.sleep(0.05)

        summary = evaluator.get_summary()
        print("Prediction Error Summary:", summary)
    finally:
        # Always release the display window, even when the loop raised.
        cv2.destroyAllWindows()

# Run the demo only when this code is executed as a script, not when imported.
if __name__ == "__main__":
    main()


Prediction Error Summary: {'mean': np.float64(0.06621530720224773), 'max': np.float64(1.2477525223339099), 'min': np.float64(5.448131135540673e-07)}


# Kalman filter (with tracking)

In [2]:
import os
import time
import cv2
import numpy as np
from scipy.optimize import linear_sum_assignment

from kalman_filters import KalmanFilter3D_CA, KalmanFilter3D_CV

# =============================
# SINGLE TRACKING LABEL CLASS
# =============================
class TrackingLabel:
    """One annotation row: a single object observed in a single frame.

    Every argument may be passed as text. Scalar fields are cast with
    ``int``/``float``; ``bbox``, ``dimensions`` and ``location`` are stored
    as freshly built float arrays. ``score`` is optional and stays ``None``
    when not supplied.
    """

    def __init__(self, frame, track_id, obj_type, truncated, occluded, alpha,
                 bbox, dimensions, location, rotation_y, score=None):
        def as_vector(values):
            # Always builds a new float array from the given sequence.
            return np.array(values, dtype=float)

        # Identity of the annotation.
        self.frame = int(frame)
        self.track_id = int(track_id)
        self.obj_type = obj_type  # 'Car', 'Pedestrian', 'Cyclist', etc.

        # Visibility flags and observation angle.
        self.truncated = float(truncated)  # 0..1
        self.occluded = int(occluded)      # 0,1,2,3
        self.alpha = float(alpha)          # [-pi, pi]

        # Geometry.
        self.bbox = as_vector(bbox)              # [left, top, right, bottom]
        self.dimensions = as_vector(dimensions)  # [h, w, l]
        self.location = as_vector(location)      # [x, y, z]
        self.rotation_y = float(rotation_y)      # [-pi, pi]

        # Optional confidence value.
        self.score = None if score is None else float(score)

    def __repr__(self):
        return f"TrackingLabel(frame={self.frame}, track_id={self.track_id}, type={self.obj_type})"


# =============================
# LABEL FILE LOADER CLASS
# =============================
class TrackingLabelLoader:
    """Read a whitespace-separated label file into ``TrackingLabel`` objects.

    Attributes:
        filename: path passed to ``open`` by ``load()``.
        labels: result of the most recent ``load()`` (empty before the first).
    """

    def __init__(self, filename):
        self.filename = filename
        self.labels = []

    def load(self):
        """Parse the file and return the list of labels.

        Rows with fewer than 17 fields (blank lines included) are skipped.
        An 18th field, when present, is used as the score.

        Each call re-reads the file and *replaces* ``self.labels``, so calling
        ``load()`` more than once does not accumulate duplicate entries.
        """
        loaded = []
        with open(self.filename, "r") as f:
            for line in f:
                parts = line.split()
                if len(parts) < 17:  # ignore incomplete lines
                    continue

                score = parts[17] if len(parts) > 17 else None
                loaded.append(TrackingLabel(
                    parts[0],       # frame
                    parts[1],       # track_id
                    parts[2],       # obj_type
                    parts[3],       # truncated
                    parts[4],       # occluded
                    parts[5],       # alpha
                    parts[6:10],    # bbox
                    parts[10:13],   # dimensions
                    parts[13:16],   # location
                    parts[16],      # rotation_y
                    score,
                ))
        self.labels = loaded
        return self.labels

    def get_labels_for_frame(self, frame_id):
        """Return all labels for a specific frame."""
        return [label for label in self.labels if label.frame == frame_id]

    def get_labels_for_track(self, track_id):
        """Return all labels for a specific object track."""
        return [label for label in self.labels if label.track_id == track_id]
    

# =============================
# TRACKED OBJECT CLASS
# =============================
class TrackedObject:
    """A single tracked target: a Kalman filter, a draw colour and a timestamp.

    ``last_update`` holds the wall-clock time of construction or of the most
    recent ``predict()`` call; ``update()`` does not modify it.
    """

    def __init__(self, track_id, type, color):
        # "Pedestrian" uses KalmanFilter3D_CV, every other type KalmanFilter3D_CA.
        filter_cls = KalmanFilter3D_CV if type == "Pedestrian" else KalmanFilter3D_CA
        self.track_id = track_id
        self.kf = filter_cls()
        self.color = color
        self.last_update = time.time()

    def update(self, X, Y, Z):
        """Pass the measurement to the filter as a 3x1 column vector."""
        measurement = np.array([[X], [Y], [Z]])
        self.kf.update(measurement)

    def predict(self):
        """Advance the filter by the wall-clock time elapsed since the last call."""
        previous, self.last_update = self.last_update, time.time()
        self.kf.predict(self.last_update - previous)

    def get_predicted_position(self):
        """Return the first three entries of the filter state as a tuple."""
        return tuple(self.kf.x[row, 0] for row in range(3))


# =============================
# MULTI-OBJECT TRACKER CLASS
# =============================
class Tracker:
    """Keep one ``TrackedObject`` per track id and remember the highest id used.

    ``tracked_id`` / ``tracked_type`` configure the filter answered by
    ``is_tracked``; ``update`` itself does not apply that filter.

    Args:
        colors: optional list of draw colours; defaults to ``[(255, 0, 0)]``.
        tracked_id: optional single id to follow (default: none).
        tracked_type: object type to follow; defaults to "Pedestrian"
            (other values used with this data are e.g. "Cyclist" or "Car").
    """

    def __init__(self, colors=None, tracked_id=None, tracked_type="Pedestrian"):
        self.tracked_id = tracked_id
        self.tracked_type = tracked_type
        self.objects = {}
        self.colors = colors if colors else [(255, 0, 0)]
        # Largest id for which a track has been created so far (0 = none yet).
        self.highest_track_id = 0

    def update(self, tid, type, X, Y, Z):
        """Feed a measurement to track ``tid``, creating the track on first use.

        No ``is_tracked`` filtering happens here; callers decide what to pass.
        """
        if tid not in self.objects:
            # Colour is picked from the palette by the current object count.
            color = self.colors[len(self.objects) % len(self.colors)]
            self.objects[tid] = TrackedObject(tid, type, color)
            self.highest_track_id = max(self.highest_track_id, tid)
        self.objects[tid].update(X, Y, Z)

    def is_tracked(self, tid, type):
        """Return True when (tid, type) passes the configured filter.

        A configured ``tracked_id`` takes precedence over ``tracked_type``;
        with neither configured everything passes.
        """
        if self.tracked_id is not None:
            return bool(tid == self.tracked_id)
        if self.tracked_type is not None:
            return bool(type == self.tracked_type)
        return True

    def predict(self):
        """Run the prediction step of every tracked object."""
        for obj in self.objects.values():
            obj.predict()

    def get_objects(self):
        """Return a live view of the tracked objects."""
        return self.objects.values()

    def remove_object(self, track_id):
        """Drop the track if it exists; unknown ids are ignored."""
        self.objects.pop(track_id, None)
    

# =============================
# CAMERA
# =============================
class Camera:
    """Project 3D points to integer pixel coordinates using a matrix ``K``."""

    def __init__(self, K_rect):
        self.K = K_rect

    def project_point(self, P):
        """Return ``(u, v)`` for ``P = (X, Y, Z)``, or ``None`` when ``Z <= 0``.

        ``u = K[0,0] * X / Z + K[0,2]`` and ``v = K[1,1] * Y / Z + K[1,2]``;
        both are truncated with ``int``.
        """
        x_cam, y_cam, depth = P
        if depth <= 0:
            return None
        scale_u, offset_u = self.K[0, 0], self.K[0, 2]
        scale_v, offset_v = self.K[1, 1], self.K[1, 2]
        pixel = (scale_u * x_cam / depth + offset_u,
                 scale_v * y_cam / depth + offset_v)
        return int(pixel[0]), int(pixel[1])


# =============================
# OCCLUSION RECTANGLE
# =============================
def is_in_rectangle(uv, rect):
    """Tell whether pixel ``uv = (u, v)`` lies in ``rect = (x1, y1, x2, y2)``.

    Edges count as inside. A missing point (``None``) is never inside.
    """
    if uv is None:
        return False
    col, row = uv
    left, top, right, bottom = rect
    if not (left <= col <= right):
        return False
    return top <= row <= bottom


def associate_detections_to_tracks(detections, tracks, max_distance=1):
    """Match detections to predicted track positions by minimum total distance.

    The cost of pairing a track with a detection is their Euclidean distance.
    An optimal one-to-one assignment is computed with
    ``linear_sum_assignment``; assigned pairs whose distance exceeds
    ``max_distance`` are then discarded and both members count as unmatched.

    Args:
        detections: sequence of positions (e.g. ``(x, y, z)``).
        tracks: sequence of predicted positions, same dimensionality.
        max_distance: largest distance still accepted as a match (inclusive).

    Returns:
        matches: list of ``(track_idx, detection_idx)`` tuples of Python ints.
        unmatched_tracks: list of track indices without a match.
        unmatched_detections: list of detection indices without a match.
    """
    n_tracks, n_dets = len(tracks), len(detections)

    if n_tracks == 0:
        # No tracks yet -> every detection is unmatched (caller creates tracks).
        return [], [], list(range(n_dets))
    if n_dets == 0:
        # Nothing observed -> every track is unmatched.
        return [], list(range(n_tracks)), []

    # --- Build cost matrix (float64, one row per track, one column per detection) ---
    track_arr = np.asarray(tracks, dtype=float).reshape(n_tracks, -1)
    det_arr = np.asarray(detections, dtype=float).reshape(n_dets, -1)
    cost_matrix = np.linalg.norm(track_arr[:, None, :] - det_arr[None, :, :], axis=2)

    # --- Solve assignment ---
    track_idx, det_idx = linear_sum_assignment(cost_matrix)

    # --- Filter assignments by max distance threshold ---
    matches = [
        (int(t), int(d))
        for t, d in zip(track_idx, det_idx)
        if cost_matrix[t, d] <= max_distance
    ]

    matched_tracks = {t for t, _ in matches}
    matched_detections = {d for _, d in matches}
    unmatched_tracks = [t for t in range(n_tracks) if t not in matched_tracks]
    unmatched_detections = [d for d in range(n_dets) if d not in matched_detections]

    return matches, unmatched_tracks, unmatched_detections


# =============================
# MAIN FUNCTION
# =============================
def main():
    """Play back the sequence, associating detections to tracks by distance.

    For every frame that has labels and an image on disk:
      1. collect the current predicted position of every track and keep those
         that project inside the bounding box;
      2. take the labelled locations of ``tracker.tracked_type`` objects as
         detections and associate them with those predictions;
      3. update matched tracks, run the prediction step for all tracks, and
         start a new track for every unmatched detection;
      4. draw labels and predictions and show the frame.

    Tracks are never removed here; unmatched tracks simply keep predicting.
    Pressing Esc stops playback. The display window is closed even if an
    error interrupts the loop.
    """
    image_folder = "./seq_02/image_02/data/"
    label_file   = "./seq_02/labels.txt"
    occlusion_rect = (660, 100, 910, 350)  # (x1, y1, x2, y2): labels projecting here are not drawn
    bounding_box = (0, 0, 1224, 370)       # (x1, y1, x2, y2): only tracks predicted here are associated

    font                   = cv2.FONT_HERSHEY_SIMPLEX
    fontScale              = 0.7
    fontColor              = (255,255,255)
    thickness              = 3
    lineType               = 2

    # Camera
    camera = Camera(np.array([
        [7.070493e+02, 0.000000e+00, 6.040814e+02, 0.000000e+00],
        [0.000000e+00, 7.070493e+02, 1.805066e+02, 0.000000e+00],
        [0.000000e+00, 0.000000e+00, 1.000000e+00, 0.000000e+00]
    ]))

    # Load labels and group them by frame once, instead of rescanning the
    # whole list for every frame.
    loader = TrackingLabelLoader(label_file)
    labels_by_frame = {}
    for label in loader.load():
        labels_by_frame.setdefault(label.frame, []).append(label)

    tracker = Tracker()

    try:
        for frame_idx in sorted(labels_by_frame):
            frame_labels = labels_by_frame[frame_idx]
            img_file = os.path.join(image_folder, f"{int(frame_idx):010d}.png")
            frame = cv2.imread(img_file)
            if frame is None:
                print("Missing frame:", img_file)
                continue

            # Current predictions, in the same order as their track ids
            predicted_positions = []
            track_ids = []
            for t in tracker.get_objects():
                predicted_positions.append(t.get_predicted_position())
                track_ids.append(t.track_id)

            # Only tracks predicted inside the image area take part in association
            inside_indices = [
                i for i, pred_pos in enumerate(predicted_positions)
                if is_in_rectangle(camera.project_point(pred_pos), bounding_box)
            ]
            inside_positions = [predicted_positions[i] for i in inside_indices]

            detections = [np.array(label.location)
                          for label in frame_labels
                          if label.obj_type == tracker.tracked_type]

            # Associate detections to predictions. Unmatched tracks are not
            # acted upon: they stay in the tracker and keep predicting.
            matches, lost_tracks, new_dets = associate_detections_to_tracks(detections, inside_positions)

            # Update matched tracks; match indices refer to inside_positions,
            # so map them back to real track ids first.
            for t_idx, det_idx in matches:
                track_id = track_ids[inside_indices[t_idx]]
                tracker.update(track_id, tracker.tracked_type, *detections[det_idx])

            # Predict tracking positions
            tracker.predict()

            # Create new track for unmatched detections; the type passed is
            # the same one the detections were filtered by.
            for d_idx in new_dets:
                tracker.update(tracker.highest_track_id + 1, tracker.tracked_type, *detections[d_idx])

            # Draw real (green) points, except inside the occlusion rectangle
            for label in frame_labels:
                real_uv = camera.project_point(label.location)
                if real_uv is not None and tracker.is_tracked(label.track_id, label.obj_type) and not is_in_rectangle(real_uv, occlusion_rect):
                    cv2.circle(frame, real_uv, 6, (0,255,0), -1)  # green for real

            # Draw predicted points in each object's colour, with its id
            for obj in tracker.get_objects():
                pred_pos = obj.get_predicted_position()
                pred_uv = camera.project_point(pred_pos)
                if pred_uv is not None:
                    cv2.circle(frame, pred_uv, 6, obj.color, -1)
                    cv2.putText(frame, str(obj.track_id), pred_uv, font, fontScale, fontColor, thickness, lineType)

            cv2.putText(frame, str(frame_idx), (1000,20), font, fontScale, fontColor, thickness, lineType)

            cv2.imshow("Tracking", frame)
            if cv2.waitKey(1) == 27:  # Esc
                break
            time.sleep(0.03)
    finally:
        # Always release the display window, even when the loop raised.
        cv2.destroyAllWindows()


# Run the demo only when this code is executed as a script, not when imported.
if __name__ == "__main__":
    main()
