# Imports

In [38]:
import yaml
import re
import numpy as np

# Metrics
from scipy.spatial.distance import directed_hausdorff
from scipy.spatial.distance import euclidean
from scipy.spatial.transform import Rotation

from fastdtw import fastdtw

# Visualization
import plotly.graph_objects as go

# Ros modules
import rosbag2_py
import rclpy.serialization
from geometry_msgs.msg import PoseStamped, PoseWithCovarianceStamped

# Data generation

## Build the project

In [39]:
# %cd ../..
# ! colcon build

## Run nodes and save the outputs to .txt files

In [40]:
# ! ros2 bag record -o calculated_pose /new_pose
# ! ros2 bag record -o original_pose /clean_pose

# Extract position sequence

## Change working directory for the Notebook to read the recordings

In [41]:
%cd ..
%cd recordings/

/home/pablo-portatil/GITHUB/TFG/project_ws/src
/home/pablo-portatil/GITHUB/TFG/project_ws/src/recordings


## Save the 3D paths

In [42]:
 # Initialize the data that will be published
def read_bag(base_path):
       storage_options = rosbag2_py.StorageOptions(uri=base_path, storage_id="sqlite3")
       converter_options = rosbag2_py.ConverterOptions(input_serialization_format = "cdr", output_serialization_format = "cdr")
       reader = rosbag2_py.SequentialReader()
       reader.open(storage_options, converter_options)
       return reader
        

In [43]:
original_path = []

bag_original = read_bag("original_pose/original_pose_0.db3")

while bag_original.has_next():
    
    # Read the next message in the queue
    topic, msg, t = bag_original.read_next()

    # Deserialize the msg to extract the x, y and z values
    deserialized_msg = rclpy.serialization.deserialize_message(msg, PoseWithCovarianceStamped)

    original_path.append([deserialized_msg.pose.pose.position.x,
                          deserialized_msg.pose.pose.position.y,
                          deserialized_msg.pose.pose.position.z])
    
original_path = np.array(original_path)

[INFO] [1742465709.893251913] [rosbag2_storage]: Opened database 'original_pose/original_pose_0.db3' for READ_ONLY.


In [44]:
calculated_path = []

bag_calculated = read_bag("calculated_pose/calculated_pose_0.db3")

while bag_calculated.has_next():
    
    # Read the next message in the queue
    topic, msg, t = bag_calculated.read_next()

    # Deserialize the msg to extract the x, y and z values
    deserialized_msg = rclpy.serialization.deserialize_message(msg, PoseStamped)

    calculated_path.append([deserialized_msg.pose.position.x,
                            deserialized_msg.pose.position.y,
                            deserialized_msg.pose.position.z])
    
calculated_path = np.array(calculated_path)

[INFO] [1742465710.048263870] [rosbag2_storage]: Opened database 'calculated_pose/calculated_pose_0.db3' for READ_ONLY.


# Metricas y visualización

## Metricas

### Absolut Trayectory Error (ATE)

In [45]:
def align_trajectories(traj_ref, traj_est):

    # Alings the estimate trayectory to the reference using a Sim(3) transformation

    # Centroids calculation
    centroid_ref = np.mean(traj_ref, axis=0)
    centroid_est = np.mean(traj_est, axis=0)
    
    # Center data
    traj_ref_centered = traj_ref - centroid_ref
    traj_est_centered = traj_est - centroid_est

    # Covariance matrix
    H = traj_est_centered.T @ traj_ref_centered

    # SVD decomposition for optimum rotation search
    U, _, Vt = np.linalg.svd(H)
    
    # Rotation matrix
    R = Vt.T @ U.T 

    # Valid rotation matrix check
    if np.linalg.det(R) < 0:
        Vt[-1, :] *= -1
        R = Vt.T @ U.T

    # Apply rotation and translation
    traj_est_aligned = (traj_est_centered @ R.T) + centroid_ref

    return traj_est_aligned

def compute_ate(traj_ref, traj_est):
    # Compute Absolut Trayectory error between two paths
    
    # Aling estimated trayectories
    traj_est_aligned = align_trajectories(traj_ref, traj_est)

    # Compute point to point error
    errors = np.linalg.norm(traj_ref - traj_est_aligned, axis=1)

    # Compute metrics
    ate_mean = np.mean(errors)
    ate_std = np.std(errors)

    return ate_mean, ate_std, errors

In [46]:
ate_mean, ate_std, errors = compute_ate(original_path, calculated_path)

print("ATE Mean: " + str(ate_mean)+ " meters")
print("ATE STD: " + str(ate_std) + " meters")
print(f"Max - Min error: {np.max(errors)} - {np.min(errors)}")

ATE Mean: 0.509965684942263 meters
ATE STD: 0.16456789593630336 meters
Max - Min error: 0.8020175773078724 - 0.08966554296697438


### Hausdorff

In [47]:
d1 = directed_hausdorff(original_path, calculated_path)[0]
d2 = directed_hausdorff(calculated_path, original_path)[0]

hausdorff_distance = max(d1, d2)
print("Hausdorff Distance:", hausdorff_distance)

Hausdorff Distance: 3.0696145786333418


### Frechet-like Distance (DTW)

In [48]:
distance, _ = fastdtw(calculated_path, original_path, dist=euclidean)
print("Frechet-like Distance (DTW):", distance)

Frechet-like Distance (DTW): 1293.6578913755993


## Visualización

In [49]:
# Create 3D scatter lines
fig = go.Figure()

# Adding Original Path to figure
fig.add_trace(go.Scatter3d(
    x=original_path[:,0], y=original_path[:,1], z=original_path[:,2],
    mode='lines+markers', marker=dict(size=4, color='red'),
    name="Original"
))

# Adding Calculated Path to figure
fig.add_trace(go.Scatter3d(
    x=calculated_path[:,0], y=calculated_path[:,1], z=calculated_path[:,2],
    mode='lines+markers', marker=dict(size=4, color='blue'),
    name="Calculated"
))

# Labels & Layout
fig.update_layout(
    title="Interactive 3D Path Visualization",
    scene=dict(
        xaxis_title="X",
        yaxis_title="Y",
        zaxis_title="Z"
    ),
    margin=dict(l=0, r=0, b=0, t=40)
)

# Show the figure
fig.show()