In [1]:
import pandas as pd
import numpy as np
import os
import traceback

# Define the file path
labels_file_path = r'C:\Users\User\Documents\GitHub\pfas_finalproject\kalman_3d\depth_fixed_occlusion_02.txt'

# Read the labels file with the updated structure
headers = [
    "frame", "track id", "type",
    "bbox_left", "bbox_top", "bbox_right", "bbox_bottom",
    "height_3d", "width_3d", "length_3d",
    "location_x", "location_y", "location_z",
    "rotation_y"
]
df = pd.read_csv(labels_file_path, sep=' ', names=headers)

# Define constants
dt = 0.1036  # Time difference between frames (adjust based on your data)
max_distance = 5.0  # Maximum allowed distance for matching (adjust as needed)
max_missing_frames = 10  # Maximum allowed frames to keep predicting without updates

# Initialize variables
missing_ids = []
tracked_ids = []
new_ids = []
previous_frame_ids = []
frame_counter = 1
reassociation_map = {}
kalman_filters = {}
missing_counts = {}
object_types_dict = {}    # Stores object types for each track_id

# For storing tracking data over time
tracking_data = []

# Get sorted list of frames
unique_frames = sorted(df["frame"].unique())

# Kalman Filter Class Definition with Acceleration in All Dimensions
class KalmanFilter3D:
    def __init__(self, initial_state, dt):
        self.state_dim = 9  # x, y, z, vx, vy, vz, ax, ay, az
        self.meas_dim = 3   # x, y, z
        self.dt = dt

        # Initial state vector
        self.x = np.zeros((self.state_dim, 1))
        self.x[0:3, 0] = initial_state[0:3]  # Initialize position (x, y, z)

        # State Transition Matrix
        dt = self.dt
        self.F = np.array([
            [1, 0, 0, dt, 0,  0,  0.5 * dt**2, 0,           0],
            [0, 1, 0, 0,  dt, 0,  0,           0.5 * dt**2, 0],
            [0, 0, 1, 0,  0,  dt, 0,           0,           0.5 * dt**2],
            [0, 0, 0, 1,  0,  0,  dt,          0,           0],
            [0, 0, 0, 0,  1,  0,  0,           dt,          0],
            [0, 0, 0, 0,  0,  1,  0,           0,           dt],
            [0, 0, 0, 0,  0,  0,  1,           0,           0],
            [0, 0, 0, 0,  0,  0,  0,           1,           0],
            [0, 0, 0, 0,  0,  0,  0,           0,           1],
        ])

        # Observation Matrix
        self.H = np.zeros((self.meas_dim, self.state_dim))
        self.H[0, 0] = 1  # x position
        self.H[1, 1] = 1  # y position
        self.H[2, 2] = 1  # z position

        # Process Noise Covariance
        q = 1.0  # Process noise magnitude (tune as needed)
        dt4 = (dt ** 4) / 4
        dt3 = (dt ** 3) / 2
        dt2 = dt ** 2
        q_block = np.array([
            [dt4,    0,    0, dt3,    0,    0, dt2/2,    0,    0],
            [0,    dt4,    0,    0, dt3,    0,    0, dt2/2,    0],
            [0,       0, dt4,    0,    0, dt3,    0,    0, dt2/2],
            [dt3,    0,    0, dt2,    0,    0, dt,       0,    0],
            [0,    dt3,    0,    0, dt2,    0,    0,     dt,    0],
            [0,       0, dt3,    0,    0, dt2,    0,    0,    dt],
            [dt2/2,  0,    0, dt,     0,    0, 1,        0,    0],
            [0,  dt2/2,    0,    0, dt,     0,    0,    1,     0],
            [0,       0, dt2/2, 0,    0,  dt,    0,     0,     1],
        ])
        self.Q = q * q_block

        # Measurement Noise Covariance
        r = 1.0  # Measurement noise variance for position (tune as needed)
        self.R = np.eye(self.meas_dim) * r

        # Initial covariance matrix
        self.P = np.eye(self.state_dim) * 1000

        # Flag to indicate if the filter was updated with a measurement
        self.updated_with_measurement = False

    def predict(self):
        self.x = np.dot(self.F, self.x)
        self.P = np.dot(self.F, np.dot(self.P, self.F.T)) + self.Q
        self.updated_with_measurement = False

    def update(self, measurement):
        z = np.array(measurement).reshape(-1, 1)  # measurement includes x, y, z
        # Measurement residual
        y = z - np.dot(self.H, self.x)
        # Residual covariance
        S = np.dot(self.H, np.dot(self.P, self.H.T)) + self.R
        # Kalman gain
        K = np.dot(self.P, np.dot(self.H.T, np.linalg.inv(S)))
        # Update state estimate
        self.x = self.x + np.dot(K, y)
        # Update covariance estimate
        self.P = self.P - np.dot(K, np.dot(self.H, self.P))
        self.updated_with_measurement = True

    def get_current_state(self):
        return self.x

    def get_predicted_location(self):
        return self.x[:3, 0]  # Return x, y, z positions

def get_frame_data(frame_id):
    frame_data = df[df["frame"] == frame_id]
    frame_ids = frame_data["track id"].unique().tolist()
    frame_positions = {}
    frame_object_types = {}
    for _, row in frame_data.iterrows():
        track_id = row["track id"]
        x = row["location_x"]
        y = row["location_y"]
        z = row["location_z"]
        position = [x, y, z]
        object_type = row["type"]
        frame_positions[track_id] = position
        frame_object_types[track_id] = object_type
    return frame_ids, frame_positions, frame_object_types

def track_states(current_frame_ids, previous_frame_ids):
    missing_ids = [id_ for id_ in previous_frame_ids if id_ not in current_frame_ids]
    tracked_ids = [id_ for id_ in current_frame_ids if id_ in previous_frame_ids]
    new_ids = [id_ for id_ in current_frame_ids if id_ not in previous_frame_ids]
    return missing_ids, tracked_ids, new_ids

def compute_distance_3d(pos1, pos2):
    # Compute Euclidean distance between two 3D points
    return np.linalg.norm(np.array(pos1) - np.array(pos2))

def match_new_ids_to_missing_predictions(new_ids, missing_ids_prediction, current_frame_positions, max_distance):
    matched_ids = []
    reassociation_map = {}
    unmatched_new_ids = []

    for new_id in new_ids:
        min_distance = float('inf')
        best_match_id = None
        new_pos = current_frame_positions[new_id]

        for missing_id, predicted_state in missing_ids_prediction.items():
            predicted_pos = predicted_state[:3, 0]
            distance = compute_distance_3d(new_pos, predicted_pos)
            if distance < min_distance:
                min_distance = distance
                best_match_id = missing_id

        if min_distance < max_distance:
            matched_ids.append((best_match_id, new_id))
            reassociation_map[new_id] = best_match_id
            # Remove matched missing_id from missing_ids_prediction
            missing_ids_prediction.pop(best_match_id)
        else:
            unmatched_new_ids.append(new_id)

    return matched_ids, reassociation_map, unmatched_new_ids

# Main processing loop
for frame in unique_frames:
    try:
        print(f"Processing frame {frame}")

        # Step 1: Retrieve Frame Data
        current_frame_ids, current_frame_positions, current_frame_object_types = get_frame_data(frame)
        print(f"Current frame IDs: {current_frame_ids}")

        # Update object_types_dict with current frame's object types
        object_types_dict.update(current_frame_object_types)

        # Step 2: Determine States (track_states)
        missing_ids, tracked_ids, new_ids = track_states(current_frame_ids, previous_frame_ids)
        print(f"Missing IDs: {missing_ids}")
        print(f"Tracked IDs: {tracked_ids}")
        print(f"New IDs: {new_ids}")

        # Update missing_counts for missing_ids
        for id_ in missing_ids:
            missing_counts[id_] = missing_counts.get(id_, 0) + 1

        # Step 3: Process missing_ids
        missing_ids_prediction = {}
        for missing_id in missing_ids.copy():
            if missing_id in kalman_filters:
                # Predict the position
                kalman_filter = kalman_filters[missing_id]
                kalman_filter.predict()
                predicted_state = kalman_filter.get_current_state()
                missing_ids_prediction[missing_id] = predicted_state  # Store predicted state
                # Store tracking data
                tracking_data.append({
                    'Frame': frame,
                    'TrackID': missing_id,
                    'X': predicted_state[0, 0],
                    'Y': predicted_state[1, 0],
                    'Z': predicted_state[2, 0],
                    'Type': object_types_dict.get(missing_id, 'misc'),
                    'Status': 'Predicted'  # Prediction only
                })
            else:
                print(f"Kalman filter not found for missing ID {missing_id}")
                missing_ids.remove(missing_id)

        # Step 4: Match new_ids to missing_ids_prediction and Initialize Kalman Filters for Unmatched new_ids
        matched_ids, reassociation_map, unmatched_new_ids = match_new_ids_to_missing_predictions(
            new_ids, missing_ids_prediction.copy(), current_frame_positions, max_distance
        )
        print(f"Matched IDs: {matched_ids}")
        print(f"Unmatched New IDs: {unmatched_new_ids}")

        # Initialize Kalman filters for unmatched new_ids
        for new_id in unmatched_new_ids:
            position = current_frame_positions[new_id]
            measurement = position
            kalman_filter = KalmanFilter3D(initial_state=measurement, dt=dt)
            kalman_filters[new_id] = kalman_filter
            if new_id not in tracked_ids:
                tracked_ids.append(new_id)
            # Store tracking data
            tracking_data.append({
                'Frame': frame,
                'TrackID': new_id,
                'X': measurement[0],
                'Y': measurement[1],
                'Z': measurement[2],
                'Type': object_types_dict.get(new_id, 'misc'),
                'Status': 'Initialized'  # Newly initialized
            })
            # Update Kalman filter with measurement
            kalman_filter.update(measurement)

        # Remove matched missing_ids from missing_ids and missing_counts
        for missing_id, new_id in matched_ids:
            if missing_id in missing_ids:
                missing_ids.remove(missing_id)
            if missing_id in missing_counts:
                del missing_counts[missing_id]
            # Update Kalman filter mapping if necessary
            if missing_id in kalman_filters:
                pass
            else:
                kalman_filters[missing_id] = kalman_filters.pop(new_id)
            # Update object_types_dict
            object_types_dict[missing_id] = object_types_dict.get(new_id, "misc")
            if new_id in object_types_dict:
                del object_types_dict[new_id]
            # Update Kalman filter with new measurement
            kalman_filter = kalman_filters[missing_id]
            kalman_filter.predict()
            measurement = current_frame_positions[new_id]
            kalman_filter.update(measurement)
            # Store tracking data
            tracking_data.append({
                'Frame': frame,
                'TrackID': missing_id,
                'X': kalman_filter.x[0, 0],
                'Y': kalman_filter.x[1, 0],
                'Z': kalman_filter.x[2, 0],
                'Type': object_types_dict.get(missing_id, 'misc'),
                'Status': 'Updated'  # Measurement update
            })

        # Step 5: Process tracked_ids
        matched_missing_ids = [m[0] for m in matched_ids]
        for tracked_id in tracked_ids.copy():
            if tracked_id in matched_missing_ids:
                continue  # Already updated in Step 4

            if tracked_id not in kalman_filters:
                position = current_frame_positions.get(tracked_id, [0, 0, 0])
                measurement = position
                kalman_filter = KalmanFilter3D(initial_state=measurement, dt=dt)
                kalman_filters[tracked_id] = kalman_filter
                print(f"Initialized Kalman filter for tracked ID {tracked_id}")
                # Update Kalman filter with measurement
                kalman_filter.update(measurement)
                # Store tracking data
                tracking_data.append({
                    'Frame': frame,
                    'TrackID': tracked_id,
                    'X': measurement[0],
                    'Y': measurement[1],
                    'Z': measurement[2],
                    'Type': object_types_dict.get(tracked_id, 'misc'),
                    'Status': 'Initialized'  # Newly initialized
                })
                continue

            kalman_filter = kalman_filters[tracked_id]
            # Predict step
            kalman_filter.predict()

            if tracked_id in current_frame_positions:
                # Update step with measurement
                measurement = current_frame_positions[tracked_id]
                kalman_filter.update(measurement)
                # Store tracking data
                tracking_data.append({
                    'Frame': frame,
                    'TrackID': tracked_id,
                    'X': kalman_filter.x[0, 0],
                    'Y': kalman_filter.x[1, 0],
                    'Z': kalman_filter.x[2, 0],
                    'Type': object_types_dict.get(tracked_id, 'misc'),
                    'Status': 'Updated'  # Measurement update
                })
            else:
                # Handle predictions for tracked IDs not in current frame
                predicted_state = kalman_filter.get_current_state()
                tracking_data.append({
                    'Frame': frame,
                    'TrackID': tracked_id,
                    'X': predicted_state[0, 0],
                    'Y': predicted_state[1, 0],
                    'Z': predicted_state[2, 0],
                    'Type': object_types_dict.get(tracked_id, 'misc'),
                    'Status': 'Predicted'  # Prediction only
                })

        # Step 6: Update Tracking States for Next Frame
        previous_frame_ids = tracked_ids + missing_ids

        # Remove stale missing_ids
        for missing_id in missing_ids.copy():
            if missing_counts[missing_id] > max_missing_frames:
                # Remove object from tracking
                print(f"Removed Missing ID {missing_id} after exceeding max missing frames")
                missing_ids.remove(missing_id)
                del kalman_filters[missing_id]
                del missing_counts[missing_id]
                object_types_dict.pop(missing_id, None)

        # Increment frame_counter
        frame_counter += 1

    except Exception as e:
        print(f"Exception occurred at frame {frame}: {e}")
        traceback.print_exc()
        break

# After processing all frames, create a DataFrame from tracking_data
tracking_df = pd.DataFrame(tracking_data)


Processing frame 0
Current frame IDs: [0, 1, 2, 4, 7, 23]
Missing IDs: []
Tracked IDs: []
New IDs: [0, 1, 2, 4, 7, 23]
Matched IDs: []
Unmatched New IDs: [0, 1, 2, 4, 7, 23]
Processing frame 1
Current frame IDs: [0, 1, 2, 4, 7, 23]
Missing IDs: []
Tracked IDs: [0, 1, 2, 4, 7, 23]
New IDs: []
Matched IDs: []
Unmatched New IDs: []
Processing frame 2
Current frame IDs: [0, 1, 2, 4, 7, 23]
Missing IDs: []
Tracked IDs: [0, 1, 2, 4, 7, 23]
New IDs: []
Matched IDs: []
Unmatched New IDs: []
Processing frame 3
Current frame IDs: [0, 1, 2, 7, 23]
Missing IDs: [4]
Tracked IDs: [0, 1, 2, 7, 23]
New IDs: []
Matched IDs: []
Unmatched New IDs: []
Processing frame 4
Current frame IDs: [0, 1, 2, 7, 23]
Missing IDs: [4]
Tracked IDs: [0, 1, 2, 7, 23]
New IDs: []
Matched IDs: []
Unmatched New IDs: []
Processing frame 5
Current frame IDs: [0, 1, 2, 7, 23]
Missing IDs: [4]
Tracked IDs: [0, 1, 2, 7, 23]
New IDs: []
Matched IDs: []
Unmatched New IDs: []
Processing frame 6
Current frame IDs: [0, 1, 2, 7, 23]
M

In [13]:
def plot_trajectories_in_3d_interactive(tracking_df, track_ids=None):
    """
    Plots interactive 3D trajectories with a single line per track ID,
    changing color based on the status ('Updated', 'Predicted', 'Initialized') dynamically.
    Y is treated as the vertical axis.
    
    Parameters:
    tracking_df (DataFrame): DataFrame containing tracking data with columns ['Frame', 'TrackID', 'X', 'Y', 'Z', 'Type', 'Status'].
    track_ids (list): List of track IDs to plot. If None, plot all track IDs.
    
    Returns:
    None
    """
    import plotly.graph_objs as go
    import numpy as np

    df = tracking_df.copy()

    # Filter track IDs if specified
    if track_ids is not None:
        df = df[df['TrackID'].isin(track_ids)]

    # Get unique track IDs
    unique_track_ids = df['TrackID'].unique()

    # Define a color map for statuses
    status_color_map = {
        'Updated': 'blue',        # Color for 'Updated'
        'Predicted': 'orange',    # Color for 'Predicted'
        'Initialized': 'orange'    # Color for 'Initialized'
    }
    default_color = 'gray'  # Default color for unknown statuses

    # Prepare the data for Plotly
    data_traces = []
    for track_id in unique_track_ids:
        track_df = df[df['TrackID'] == track_id].sort_values('Frame')
        x_coords = track_df['X']
        y_coords = track_df['Y']
        z_coords = track_df['Z']
        statuses = track_df['Status']

        # Generate color array based on status, defaulting to gray for unknown statuses
        colors = [status_color_map.get(status, default_color) for status in statuses]

        # Create the line trace with varying colors
        trace_line = go.Scatter3d(
            x=x_coords,
            y=z_coords,  # Swap axes if needed
            z=y_coords,
            mode='lines',
            name=f'Track ID {track_id}',
            line=dict(width=4, color=colors),
            hoverinfo='text',
            text=[f'Track ID: {track_id}<br>Frame: {frame}<br>Status: {status}' 
                  for frame, status in zip(track_df['Frame'], statuses)]
        )

        # Mark the starting point
        trace_start = go.Scatter3d(
            x=[x_coords.iloc[0]],
            y=[z_coords.iloc[0]],
            z=[y_coords.iloc[0]],
            mode='markers+text',
            marker=dict(color='green', size=8, symbol='circle'),
            name=f'Start of {track_id}',
            hoverinfo='text',
            text=[f"Start<br>Track ID: {track_id}<br>Frame: {track_df['Frame'].iloc[0]}"],
            textposition='top center',
            showlegend=False
        )

        # Mark the ending point
        trace_end = go.Scatter3d(
            x=[x_coords.iloc[-1]],
            y=[z_coords.iloc[-1]],
            z=[y_coords.iloc[-1]],
            mode='markers+text',
            marker=dict(color='red', size=8, symbol='x'),
            name=f'End of {track_id}',
            hoverinfo='text',
            text=[f"End<br>Track ID: {track_id}<br>Frame: {track_df['Frame'].iloc[-1]}"],
            textposition='top center',
            showlegend=False
        )

        # Add the traces to the data list
        data_traces.extend([trace_line, trace_start, trace_end])

    # Define the layout of the plot
    layout = go.Layout(
        title='Interactive 3D Trajectories with Dynamic Status Coloring',
        scene=dict(
            xaxis=dict(title='X (meters)'),
            yaxis=dict(title='Z (meters)'),
            zaxis=dict(title='Y (meters)'),
            aspectmode='data'
        ),
        legend=dict(
            x=0.8,
            y=0.9,
            bgcolor='rgba(255,255,255,0.5)',
            bordercolor='black'
        ),
        margin=dict(l=0, r=0, b=0, t=30)
    )

    # Create the figure and display it
    fig = go.Figure(data=data_traces, layout=layout)
    fig.update_layout(scene_camera=dict(eye=dict(x=1.2, y=1.2, z=0.8)))
    fig.show()


In [16]:
# Call the plotting function
plot_trajectories_in_3d_interactive(tracking_df, track_ids=[7])