## Kalman Filter 

In [None]:
import pickle
import open3d as o3d
from filterpy.kalman import KalmanFilter
import numpy as np
import math
import random
import json
import yaml
import uuid
from scipy.optimize import linear_sum_assignment  # Hungarian algorithm

#=================== Kalman Filter Tracker for 3D Box ===================
class BoxTracker:
    def __init__(self, bbox_3d):
        bbox_3d = np.array(bbox_3d)
        # Kalman Filter for 3D position and velocity
        self.kf = KalmanFilter(dim_x=6, dim_z=3)
        
        dt = 1.0  # time step
        # State transition model
        self.kf.F = np.array([[1, 0, 0, dt, 0, 0], 
                              [0, 1, 0, 0, dt, 0], 
                              [0, 0, 1, 0, 0, dt], 
                              [0, 0, 0, 1, 0, 0], 
                              [0, 0, 0, 0, 1, 0], 
                              [0, 0, 0, 0, 0, 1]])

        # Measurement model
        self.kf.H = np.array([[1, 0, 0, 0, 0, 0],
                              [0, 1, 0, 0, 0, 0],
                              [0, 0, 1, 0, 0, 0]])

        self.kf.R = np.eye(3) * 0.1  # Measurement noise
        self.kf.Q = np.eye(6) * 0.1  # Process noise

        # Initial state: x, y, z
        self.kf.x[:3, 0] = bbox_3d[:3]
        self.kf.P = np.eye(6) * 10  # Initial uncertainty

    def predict(self):
        self.kf.predict()

    def update(self, bbox_3d):
        self.kf.update(bbox_3d[:3])

    def get_state(self):
        return self.kf.x[:3, 0]

#=================== Main Tracker Class ===================
class bb_traking():
    def __init__(self, config) -> None:
        # Tracker configuration
        self.lidar_frequency = config.get('lidar_frequency', 10)
        self.threshold = config.get('threshold', 5.0)
        self.countdown = config.get('countdown', 1)
        self.velocity_threshold = config.get('velocity_threshold', 1.0)
        self.ignore_velocity = config.get('ignore_velocity', False)
        self.box_colors = config.get('box_colors', [[1,0,0],[0,1,0],[0,0,1]])
        self.enable_visualization = config.get('enable_visualization', False)

        # Thresholds for x/y matching in Hungarian
        self.x_threshold = config.get('x_threshold', 1.0)
        self.y_threshold = config.get('y_threshold', 5.0)

    def create_rotation_matrix(self, yaw):
        # Create 3D rotation matrix from yaw angle
        cos_yaw = np.cos(yaw)
        sin_yaw = np.sin(yaw)
        return np.array([
            [cos_yaw, -sin_yaw, 0],
            [sin_yaw,  cos_yaw, 0],
            [0,        0,       1]
        ])

    def calculate_velocity(self, P1, P2):
        # Estimate velocity using two positions
        t = 1 / self.lidar_frequency
        P1 = np.array(P1[:2])
        P2 = np.array(P2[:2])
        return (P2 - P1) / t

    def generate_tracking_uuid(self, vehicle_db, box, velocity):
        # Create a new tracker entry with a UUID
        new_uuid = str(uuid.uuid4())
        tracker = BoxTracker(box)
        tracker.predict()
        predicted_bbox_center = tracker.get_state().tolist()
        vehicle_db[new_uuid] = [
            *predicted_bbox_center,
            0,                  # mapped_count
            float('inf'),       # best_distance
            box,
            tracker,
            self.countdown,
            velocity,
            True                # new_velocity flag
        ]
        return vehicle_db, new_uuid

    def count_points_in_bbox(self, point_cloud, bbox):
        # Count number of points inside 3D box using Open3D
        cropped_pc = point_cloud.crop(bbox)
        return len(np.asarray(cropped_pc.points))

    def create_bb_box(self, box, uuid_key, point_cloud=None):
        # Build 3D box geometry and result dictionary
        center = np.array(box[:3])
        extent = np.array(box[3:6])
        yaw = box[6]
        rotation = self.create_rotation_matrix(yaw)
        bbox_3d = o3d.geometry.OrientedBoundingBox(center, rotation, extent)
        color_idx = abs(hash(uuid_key)) % len(self.box_colors)
        color = self.box_colors[color_idx]
        bbox_3d.color = color

        res_box = {
            'uuid': uuid_key,
            'label': 'Car',
            'position': {'x': float(center[0]), 'y': float(center[1]), 'z': float(center[2])},
            'dimensions': {'x': float(extent[1]), 'y': float(extent[0]), 'z': float(extent[2])},
            'yaw': float(yaw),
            'stationary': True,
            'camera_used': None,
            'attributes': {'state': 'Parked'},
            'points_count': 0,
            'key': uuid_key,
            'color': color
        }
        if point_cloud is not None:
            res_box['points_count'] = self.count_points_in_bbox(point_cloud, bbox_3d)
        return bbox_3d, res_box

#=================== Main Tracking Method ===================
    def visualize_bounding_boxes(self, frames, point_clouds=None):
        """
        Apply Hungarian algorithm to track boxes across frames.
        Optionally count 3D points inside boxes using point clouds.
        """
        vehicle_db = {}  # uuid: [x, y, z, mapped_count, best_dist, box, tracker, countdown, velocity, new_velocity]
        o3d_boxes = []
        frame_results = []
    
        for f, frame_boxes in enumerate(frames):
            current_point_cloud = point_clouds[f] if point_clouds is not None else None

            for key in vehicle_db:
                vehicle_db[key][3] = 0  # Reset mapped_count
    
            old_keys = list(vehicle_db.keys())
            new_boxes = list(frame_boxes)
            N = len(old_keys)
            M = len(new_boxes)
    
            if N == 0 and M == 0:
                frame_results.append({'cuboids': []})
                continue
            elif N == 0:
                box_results = []
                for box in new_boxes:
                    vehicle_db, new_uuid = self.generate_tracking_uuid(vehicle_db, box, [0, 0])
                    bb, bb_res = self.create_bb_box(box, new_uuid, point_cloud=current_point_cloud)
                    box_results.append(bb_res)
                frame_results.append({'cuboids': box_results})
                continue
            elif M == 0:
                box_results = []
                for k in list(vehicle_db.keys()):
                    vehicle_db[k][7] -= 1
                    if vehicle_db[k][7] <= -1:
                        del vehicle_db[k]
                frame_results.append({'cuboids': box_results})
                continue

            #=================== Cost Matrix for Hungarian ===================
            cost_matrix = np.zeros((N, M), dtype=np.float32)
            for i, key in enumerate(old_keys):
                value = vehicle_db[key]
                old_center = value[:3]
                for j, box in enumerate(new_boxes):
                    center = box[:3]
                    dx = abs(center[0] - old_center[0])
                    dy = abs(center[1] - old_center[1])
                    if dx <= self.x_threshold and dy <= self.y_threshold:
                        cost_matrix[i, j] = math.sqrt(dx*dx + dy*dy)
                    else:
                        cost_matrix[i, j] = 99999.0

            # Hungarian algorithm
            row_ind, col_ind = linear_sum_assignment(cost_matrix)
            matched_old_indices = set()
            matched_new_indices = set()
            box_results = []

            for i, j in zip(row_ind, col_ind):
                cost = cost_matrix[i, j]
                if cost < 99999.0:
                    matched_old_indices.add(i)
                    matched_new_indices.add(j)
                    best_match_key = old_keys[i]
                    box = new_boxes[j]
                    tracker = vehicle_db[best_match_key][6]
                    tracker.update(box)
                    tracker.predict()
                    predicted_bbox_center = tracker.get_state().tolist()
                    old_center = vehicle_db[best_match_key][5][:3]
                    vehicle_db[best_match_key][:3] = predicted_bbox_center
                    vehicle_db[best_match_key][3] += 1
                    vehicle_db[best_match_key][4] = cost
                    vehicle_db[best_match_key][5] = box
                    if not self.ignore_velocity:
                        new_velocity = self.calculate_velocity(old_center, box[:3])
                        vehicle_db[best_match_key][8] = new_velocity
                    bb, bb_res = self.create_bb_box(box, best_match_key, point_cloud=current_point_cloud)
                    o3d_boxes.append(bb)
                    box_results.append(bb_res)

            #=================== Handle Unmatched Old ===================
            unmatched_old = set(range(N)) - matched_old_indices
            for i in unmatched_old:
                key = old_keys[i]
                vehicle_db[key][7] -= 1
                if vehicle_db[key][7] <= -1:
                    del vehicle_db[key]

            #=================== Handle Unmatched New ===================
            unmatched_new = set(range(M)) - matched_new_indices
            for j in unmatched_new:
                box = new_boxes[j]
                vehicle_db, new_uuid = self.generate_tracking_uuid(vehicle_db, box, [0, 0])
                bb, bb_res = self.create_bb_box(box, new_uuid, point_cloud=current_point_cloud)
                o3d_boxes.append(bb)
                box_results.append(bb_res)

            frame_results.append({'cuboids': box_results})

        if self.enable_visualization:
            o3d.visualization.draw_geometries(o3d_boxes)

        return frame_results

#=================== Load Pickle Box List ===================
def load_bounding_boxes(file_path):
    with open(file_path, 'rb') as f:
        box_list = pickle.load(f)
    return box_list

#=================== Convert Numpy to Python Types ===================
def convert_to_python_types(data):
    if isinstance(data, dict):
        return {key: convert_to_python_types(value) for key, value in data.items()}
    elif isinstance(data, list):
        return [convert_to_python_types(item) for item in data]
    elif isinstance(data, np.ndarray):
        return data.tolist()
    elif isinstance(data, (np.float32, np.float64)):
        return float(data)
    elif isinstance(data, (np.int32, np.int64)):
        return int(data)
    else:
        return data

#=================== Main Execution ===================
if __name__ == "__main__":
    # Load 3D boxes from pickle
    box_list = load_bounding_boxes("box_list(kopia).pkl")

    # Load config from YAML or override manually
    config = yaml.safe_load(open("settings.yaml", 'r'))
    config = {
       'x_threshold': 20,
       'y_threshold': 3,
       'lidar_frequency': 10,
       'enable_visualization': False
    }

    # Create tracker instance
    tracker = bb_traking(config)

    # Run tracking (no point cloud used in this example)
    frame_results = tracker.visualize_bounding_boxes(box_list, point_clouds=None)

    # Convert results to JSON-safe format
    converted_results = convert_to_python_types(frame_results)
    output_path = "KF52_ann_final.json"
    with open(output_path, 'w') as json_file:
        json.dump(converted_results, json_file, indent=4)

    print(f"Done! Results saved in {output_path}")


## EKF

In [None]:
import pickle
import open3d as o3d
from filterpy.kalman import KalmanFilter
from filterpy.kalman import ExtendedKalmanFilter

import numpy as np
import math
import random
import json
import yaml
import uuid
from scipy.optimize import linear_sum_assignment  # Hungarian algorithm

#=================== Extended Kalman Filter Tracker ===================
class BoxTracker:
    def __init__(self, bbox_3d):
        # Initialize EKF with state [x, y, z, vx, vy, vz]
        self.ekf = ExtendedKalmanFilter(dim_x=6, dim_z=3)

        # State transition model (assumes constant velocity)
        self.ekf.F = np.eye(6)
        self.ekf.F[0, 3] = 1
        self.ekf.F[1, 4] = 1
        self.ekf.F[2, 5] = 1

        # Measurement function: maps state to observation space
        self.hx = lambda x: np.array([x[0], x[1], x[2]])

        # Jacobian of the measurement function
        self.HJacobian = lambda x: np.array([[1, 0, 0, 0, 0, 0],
                                             [0, 1, 0, 0, 0, 0],
                                             [0, 0, 1, 0, 0, 0]])

        # Initial state: position only, velocity = 0
        self.ekf.x = np.array([bbox_3d[0], bbox_3d[1], bbox_3d[2], 0, 0, 0])

        self.ekf.P *= 1000           # Initial uncertainty
        self.ekf.R = np.eye(3) * 10  # Measurement noise
        self.ekf.Q = np.eye(6) * 0.1 # Process noise

    def update(self, bbox_3d):
        z = np.array([bbox_3d[0], bbox_3d[1], bbox_3d[2]])
        self.ekf.update(z, self.HJacobian, self.hx)

    def predict(self, dt=1.0):
        # Predict next state
        self.ekf.F = self.jacobian_F(self.ekf.x, dt)
        self.ekf.x = self.f(self.ekf.x, dt)
        self.ekf.P = self.ekf.F @ self.ekf.P @ self.ekf.F.T + self.ekf.Q

    def get_state(self):
        return self.ekf.x[:3]

    def calculate_velocity(self, prev_pos, curr_pos, dt=1.0):
        # Compute velocity between two positions
        velocity = (np.array(curr_pos) - np.array(prev_pos)) / dt
        return velocity

    def jacobian_F(self, x, dt):
        # State transition Jacobian
        return np.array([
            [1, 0, 0, dt, 0, 0],
            [0, 1, 0, 0, dt, 0],
            [0, 0, 1, 0, 0, dt],
            [0, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 1, 0],
            [0, 0, 0, 0, 0, 1]
        ])

    def f(self, x, dt):
        # State transition function
        F = self.jacobian_F(x, dt)
        return F @ x

#=================== Object Tracker using Hungarian Algorithm ===================
class bb_traking():
    def __init__(self, config) -> None:
        self.lidar_frequency = config.get('lidar_frequency', 10)
        self.threshold = config.get('threshold', 10)
        self.countdown = config.get('countdown', 1)
        self.velocity_threshold = config.get('velocity_threshold', 50)
        self.ignore_velocity = config.get('ignore_velocity', False)
        self.box_colors = config.get('box_colors', [[1,0,0],[0,1,0],[0,0,1]])
        self.enable_visualization = config.get('enable_visualization', False)
        self.x_threshold = config.get('x_threshold', 20)
        self.y_threshold = config.get('y_threshold', 3)

    def create_rotation_matrix(self, yaw):
        # Build rotation matrix from yaw angle
        cos_yaw = np.cos(yaw)
        sin_yaw = np.sin(yaw)
        return np.array([
            [cos_yaw, -sin_yaw, 0],
            [sin_yaw,  cos_yaw, 0],
            [0,        0,       1]
        ])

    def calculate_velocity(self, P1, P2):
        # Velocity = (pos2 - pos1) / time
        t = 1 / self.lidar_frequency
        P1 = np.array(P1[:2])
        P2 = np.array(P2[:2])
        return (P2 - P1) / t

    def generate_tracking_uuid(self, vehicle_db, box, velocity):
        # Create new track with unique ID and init tracker
        new_uuid = str(uuid.uuid4())
        tracker = BoxTracker(box)
        tracker.predict()
        predicted_bbox_center = tracker.get_state().tolist()
        vehicle_db[new_uuid] = [
            *predicted_bbox_center,  # x, y, z
            0,                       # mapped_count
            float('inf'),            # best_distance
            box,                     # last box
            tracker,                 # tracker object
            self.countdown,          # countdown
            velocity,                # velocity
            True                     # velocity updated flag
        ]
        return vehicle_db, new_uuid

    def count_points_in_bbox(self, point_cloud, bbox):
        # Count number of points inside the 3D bounding box
        cropped_pc = point_cloud.crop(bbox)
        return len(np.asarray(cropped_pc.points))

    def create_bb_box(self, box, uuid_key, point_cloud=None):
        center = np.array(box[:3])
        extent = np.array(box[3:6])
        yaw = box[6]
        rotation = self.create_rotation_matrix(yaw)
        bbox_3d = o3d.geometry.OrientedBoundingBox(center, rotation, extent)
        color_idx = abs(hash(uuid_key)) % len(self.box_colors)
        color = self.box_colors[color_idx]
        bbox_3d.color = color

        res_box = {
            'uuid': uuid_key,
            'label': 'Car',
            'position': {'x': float(center[0]), 'y': float(center[1]), 'z': float(center[2])},
            'dimensions': {'x': float(extent[1]), 'y': float(extent[0]), 'z': float(extent[2])},
            'yaw': float(yaw),
            'stationary': True,
            'camera_used': None,
            'attributes': {'state': 'Parked'},
            'points_count': 0,
            'key': uuid_key,
            'color': color
        }
        if point_cloud is not None:
            res_box['points_count'] = self.count_points_in_bbox(point_cloud, bbox_3d)
        return bbox_3d, res_box

#=================== Frame-by-Frame Tracking with Matching ===================
    def visualize_bounding_boxes(self, frames, point_clouds=None):
        """
        Apply Hungarian matching frame by frame.
        Tracks objects using EKF and updates using new detections.
        """
        vehicle_db = {}
        o3d_boxes = []
        frame_results = []

        for f, frame_boxes in enumerate(frames):
            current_point_cloud = point_clouds[f] if point_clouds is not None else None
            for key in vehicle_db:
                vehicle_db[key][3] = 0  # Reset match count

            old_keys = list(vehicle_db.keys())
            new_boxes = list(frame_boxes)
            N, M = len(old_keys), len(new_boxes)

            if N == 0 and M == 0:
                frame_results.append({'cuboids': []})
                continue
            elif N == 0:
                box_results = []
                for box in new_boxes:
                    vehicle_db, new_uuid = self.generate_tracking_uuid(vehicle_db, box, [0, 0])
                    bb, bb_res = self.create_bb_box(box, new_uuid, point_cloud=current_point_cloud)
                    box_results.append(bb_res)
                frame_results.append({'cuboids': box_results})
                continue
            elif M == 0:
                box_results = []
                for k in list(vehicle_db.keys()):
                    vehicle_db[k][7] -= 1
                    if vehicle_db[k][7] <= -1:
                        del vehicle_db[k]
                frame_results.append({'cuboids': box_results})
                continue

            cost_matrix = np.zeros((N, M), dtype=np.float32)
            for i, key in enumerate(old_keys):
                old_center = vehicle_db[key][:3]
                for j, box in enumerate(new_boxes):
                    center = box[:3]
                    dx = abs(center[0] - old_center[0])
                    dy = abs(center[1] - old_center[1])
                    if dx <= self.x_threshold and dy <= self.y_threshold:
                        cost_matrix[i, j] = math.sqrt(dx**2 + dy**2)
                    else:
                        cost_matrix[i, j] = 99999.0

            row_ind, col_ind = linear_sum_assignment(cost_matrix)
            matched_old_indices = set()
            matched_new_indices = set()
            box_results = []

            for i, j in zip(row_ind, col_ind):
                cost = cost_matrix[i, j]
                if cost < 99999.0:
                    matched_old_indices.add(i)
                    matched_new_indices.add(j)
                    best_match_key = old_keys[i]
                    box = new_boxes[j]
                    tracker = vehicle_db[best_match_key][6]
                    tracker.update(box)
                    tracker.predict()
                    predicted_bbox_center = tracker.get_state().tolist()
                    old_center = vehicle_db[best_match_key][5][:3]
                    vehicle_db[best_match_key][:3] = predicted_bbox_center
                    vehicle_db[best_match_key][3] += 1
                    vehicle_db[best_match_key][4] = cost
                    vehicle_db[best_match_key][5] = box
                    if not self.ignore_velocity:
                        new_velocity = self.calculate_velocity(old_center, box[:3])
                        vehicle_db[best_match_key][8] = new_velocity
                    bb, bb_res = self.create_bb_box(box, best_match_key, point_cloud=current_point_cloud)
                    o3d_boxes.append(bb)
                    box_results.append(bb_res)

            # Handle unmatched
            for i in set(range(N)) - matched_old_indices:
                key = old_keys[i]
                vehicle_db[key][7] -= 1
                if vehicle_db[key][7] <= -1:
                    del vehicle_db[key]

            for j in set(range(M)) - matched_new_indices:
                box = new_boxes[j]
                vehicle_db, new_uuid = self.generate_tracking_uuid(vehicle_db, box, [0, 0])
                bb, bb_res = self.create_bb_box(box, new_uuid, point_cloud=current_point_cloud)
                o3d_boxes.append(bb)
                box_results.append(bb_res)

            frame_results.append({'cuboids': box_results})

        if self.enable_visualization:
            o3d.visualization.draw_geometries(o3d_boxes)

        return frame_results

#=================== Load Bounding Boxes from Pickle ===================
def load_bounding_boxes(file_path):
    with open(file_path, 'rb') as f:
        box_list = pickle.load(f)
    return box_list

#=================== Convert Numpy Types to Native Python ===================
def convert_to_python_types(data):
    if isinstance(data, dict):
        return {key: convert_to_python_types(value) for key, value in data.items()}
    elif isinstance(data, list):
        return [convert_to_python_types(item) for item in data]
    elif isinstance(data, np.ndarray):
        return data.tolist()
    elif isinstance(data, (np.float32, np.float64)):
        return float(data)
    elif isinstance(data, (np.int32, np.int64)):
        return int(data)
    return data

#=================== Main Execution Block ===================
if __name__ == "__main__":
    box_list = load_bounding_boxes("box_list(kopia).pkl")

    config = yaml.safe_load(open("settings.yaml", 'r'))
    config = {
       'x_threshold': 20,
       'y_threshold': 3,
       'lidar_frequency': 10,
       'enable_visualization': False
    }

    tracker = bb_traking(config)

    # Run without point clouds
    frame_results = tracker.visualize_bounding_boxes(box_list, point_clouds=None)

    converted_results = convert_to_python_types(frame_results)
    output_path = "EKF52_ann_final.json"
    with open(output_path, 'w') as json_file:
        json.dump(converted_results, json_file, indent=4)

    print(f"Done! Results saved in {output_path}")
