# Exploration of 1.5 Million Trajectories in Robotic Path Planning

This notebook explores the distribution of the 1.5 million trajectories contained in the dataset by dividing the robotic workspace into 27 equal cubes and clustering the trajectories according to their starting and target cubes.

## Finding Paths Through Specific Constellations

In this section, we aim to find paths from different datasets that traverse through specific constellations.

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import os
import sys

root_dir = os.path.abspath(os.path.join(os.getcwd(), ".."))
if root_dir not in sys.path:
    sys.path.insert(0, root_dir)

In [3]:
import pickle

import numpy as np
from rokin import robots, vis
from justin_arm.helper import interpolate_trajectories
from justin_arm.visualize import plot_multiple_trajectories
from sklearn.cluster import KMeans
import pandas as pd

# Now plot me the start coords:
import matplotlib

import matplotlib.pyplot as plt

from mpl_toolkits.mplot3d import Axes3D

# Ensure we are using an interactive backend

In [4]:
# Full dataset:
paths_4123 = np.load(
    "/Users/magic-rabbit/Documents/ADLR/tum-adlr-ss24-18/justin_arm/data/q_paths_train_dataset.npy"
)
print(f"Dataset Size: {paths_4123.shape}")
image_4123 = np.load(
    "/Users/magic-rabbit/Documents/ADLR/tum-adlr-ss24-18/justin_arm/data/image_2115.npy"
)
# Load the robot
robot = robots.JustinArm07()
# # alternative: three_pv - pyvista; mc meshcat

Dataset Size: (1566029, 20, 7)


In [8]:
# Interpolate and visualize:
# interpol_path = interpolate_trajectories(paths_4123, 32)
# print(interpol_path.shape)

In [5]:
# From the Q-Values get the corresponding frames
frames = robot.get_frames(paths_4123)
print(frames.shape)
# Extract the coordinates directly from the frames array
start_frames = frames[:, 0, 7, :, :]
end_frames = frames[:, -1, 7, :, :]

# Extract the X, Y, Z coordinates
start_coords = start_frames[:, :3, 3]
end_coords = end_frames[:, :3, 3]


print(f"Shape of the start and end coords: {start_coords.shape}, {end_coords.shape}")

(1566029, 20, 8, 4, 4)
Shape of the start and end coords: (1566029, 3), (1566029, 3)


In [6]:
# Initialize the trajectories list
trajectories = []
# Define the boundaries for 3D
x_boundaries = [-1.0, -0.33333333333333337, 0.33333333333333326, 1.0]
y_boundaries = [-1.0, -0.33333333333333337, 0.33333333333333326, 1.0]
z_boundaries = [-1.0, -0.33333333333333337, 0.33333333333333326, 1.0]
# Transform the lists into the desired structure
for start, end in zip(start_coords, end_coords):
    trajectories.append({"start": start, "end": end})

# Define regions
regions = {}
index = 1
for i in range(len(x_boundaries) - 1):
    for j in range(len(y_boundaries) - 1):
        for k in range(len(z_boundaries) - 1):
            region_name = f"R_{i+1}{j+1}{k+1}"
            regions[region_name] = {
                "x": (x_boundaries[i], x_boundaries[i + 1]),
                "y": (y_boundaries[j], y_boundaries[j + 1]),
                "z": (z_boundaries[k], z_boundaries[k + 1]),
            }
            index += 1


# Function to determine the region of a point
def find_region(point):
    for region, bounds in regions.items():
        # print(f"Bounds: {bounds}")
        # print(f"Point:{point}")
        if (
            bounds["x"][0] <= point[0] < bounds["x"][1]
            and bounds["y"][0] <= point[1] < bounds["y"][1]
            and bounds["z"][0] <= point[2] < bounds["z"][1]
        ):
            return region

    print(f"No region found for point: {point}")
    print(f"Bound: {bounds}")
    return None


# Cluster trajectories
clusters = {
    region: {other_region: {"count": 0, "indices": []} for other_region in regions}
    for region in regions
}


for idx, trajectory in enumerate(trajectories):
    start_region = find_region(trajectory["start"])
    end_region = find_region(trajectory["end"])
    if start_region and end_region:
        clusters[start_region][end_region]["count"] += 1
        clusters[start_region][end_region]["indices"].append(idx)

    else:
        print("No matching region found")

cluster_counts = {
    region: {
        other_region: clusters[region][other_region]["count"]
        for other_region in regions
    }
    for region in regions
}
clusters_df = pd.DataFrame(cluster_counts).T

In [7]:
# Show heatmap:
import seaborn as sns
import matplotlib.pyplot as plt

# Heatmap
plt.figure(figsize=(12, 8))
sns.heatmap(clusters_df, annot=True, fmt="g", cmap="viridis")
plt.title("Trajectory Distribution Heatmap")
plt.show()


# dESCRIPTIVE STATISTICS:
desc_stats = df.describe()
print(desc_stats)

NameError: name 'df' is not defined

In [None]:
# Sum all entries in the dataframe
total = clusters_df.sum().sum()
print(total)

# Example for the indices of Cluster R_111 to R_111

In [None]:
%matplotlib inline

from justin_arm.visualize import plot_q_values_per_trajectory

indices = clusters["R_222"]["R_111"]["indices"]
print((len(indices)))
frames_indices = frames[indices]
# Filter me q_paths where where isin the indices of the cluster
q_paths_r112_r212 = paths_4123[indices]
start_coords_r112_r212 = [start_coords[idx] for idx in indices]
end_coords_r112_r212 = [end_coords[idx] for idx in indices]


# # Visualize the first cluster
plot_multiple_trajectories(q_paths_r112_r212[:10])
plot_q_values_per_trajectory(q_paths_r112_r212[:10])

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

fig = plt.figure()
ax = fig.add_subplot(111, projection="3d")
for i in range(len(start_coords_r112_r212[:100])):
    ax.scatter(
        start_coords_r112_r212[i][0],
        start_coords_r112_r212[i][1],
        start_coords_r112_r212[i][2],
        c="r",
    )
    ax.scatter(
        end_coords_r112_r212[i][0],
        end_coords_r112_r212[i][1],
        end_coords_r112_r212[i][2],
        c="b",
    )

plt.show()

In [None]:
start_coords = np.array(start_coords)
end_coords = np.array(end_coords)

print(start_coords.shape)
print(end_coords.shape)

# Do some statistics min max on this data:
print("Start coords:")
print(np.min(start_coords, axis=0))
print(np.max(start_coords, axis=0))
print("End coords:")
print(np.min(end_coords, axis=0))
print(np.max(end_coords, axis=0))

In [None]:
# fig = plt.figure()
# ax = fig.add_subplot(111, projection="3d")
# ax.scatter(start_coords[:, 0], start_coords[:, 1], start_coords[:, 2], color="r")
# ax.scatter(end_coords[:, 0], end_coords[:, 1], end_coords[:, 2], color="b")
# plt.show()

In [None]:
# Combine start and end coordinates into a single feature vector for each trajectory
trajectory_features = [start + end for start, end in zip(start_coords, end_coords)]
trajectory_features = np.array(trajectory_features)

# Define the number of clusters
num_clusters = 8

# Apply k-means clustering
kmeans = KMeans(n_clusters=num_clusters, random_state=0).fit(trajectory_features)

# Get cluster labels
labels = kmeans.labels_

In [None]:
# Cound the distribution of labels
unique, counts = np.unique(labels, return_counts=True)
print(dict(zip(unique, counts)))

# Store the indices of each cluster ina dictionary
cluster_indices = {}
for i in range(num_clusters):
    cluster_indices[i] = np.where(labels == i)[0]

In [None]:
# RETRIEVE THE PATHS OF THE FIRST CLUSTER
frames_0 = [cluster_indices[1]]


# Visualize the first cluster
plot_multiple_trajectories(frames_0, 10)

In [None]:
%matplotlib inline 

# Given the labels for each trajectory, we can now plot the trajectories in different colors
fig = plt.figure()
ax = fig.add_subplot(111, projection="3d")
colors = ["r", "g", "b", "y", "c", "m", "k", "w"]
for i in range(num_clusters):
    ax.scatter(
        start_coords[labels == i, 0],
        start_coords[labels == i, 1],
        start_coords[labels == i, 2],
        color=colors[i],
    )

plt.show()
plt.savefig("start_coords.png")

In [None]:
# # Visualize:
# vis.three_mc.animate_path(
#     robot=robot,
#     q=paths_4123[10],
#     kwargs_robot=dict(color="red", alpha=0.2),
#     kwargs_world=dict(img=image_4123[0], limits=limits, color="yellow"),
# )
# input()

# # vis.three_pv.animate_path(robot=robot, q=q_paths[0],
# #                           kwargs_world=dict(img=obstacle_images[2], limits=limits))

# # move through animation with arrow keys

In [None]:
from torch import rand
from justin_arm.visualize import plot_q_values_per_trajectory

constellations = [
    # ("R_111", "R_333"),
    ("R_113", "R_331"),
    # ("R_131", "R_313"),
    # ("R_133", "R_311"),
]
for start_key, end_key in constellations:
    indices = clusters[start_key][end_key]["indices"]
    if len(indices) != 0:
        print(f"Start: {start_key}, End: {end_key}")
        print(f"Amoutn of trajectories: {len(indices)}")
        frames_indices = frames[indices]
        # Filter me q_paths where where isin the indices of the cluster
        q_paths = paths_4123[indices]
        # start_coords_ = [start_coords[idx] for idx in indices]
        # end_coords = [end_coords[idx] for idx in indices]
        # # Visualize the first cluster
        if len(indices) > 20:
            random_indices = np.random.choice(indices, size=20, replace=False)
        else:
            random_indices = indices
            # Extract the corresponding paths
        plot_multiple_trajectories(paths_4123[random_indices])
        plot_q_values_per_trajectory(paths_4123[random_indices])

In [None]:
import numpy as np


paths_2115 = np.load(
    "/Users/magic-rabbit/Documents/ADLR/tum-adlr-ss24-18/justin_arm//data/q_paths_2115.npy"
)

start_q = paths_2115[680][0]
end_q = paths_2115[680][-1]


def find_closest_paths(q_paths, start_q, end_q):
    # Calculate the Euclidean distance between each starting and ending vector in q_paths
    start_distances = np.linalg.norm(q_paths[:, 0, :] - start_q, axis=1)
    end_distances = np.linalg.norm(q_paths[:, -1, :] - end_q, axis=1)

    # Combine the start and end distances into a single distance metric
    distances = start_distances + end_distances

    # Sort the distances and get the indices of the 8 closest paths
    closest_indices = np.argsort(distances)[:8]

    # How to get highest 8:
    furthest_indices = np.argsort(distances)[-8:]

    # Return the 8 closest paths
    closest_paths = q_paths[closest_indices]
    furthest_paths = q_paths[furthest_indices]

    return closest_paths, furthest_paths


closest_paths, furthest_paths = find_closest_paths(paths_2115, start_q, end_q)
print(closest_paths.shape)


plot_q_values_per_trajectory(paths_2115[680])

plot_multiple_trajectories(closest_paths)
# plot_q_values_per_trajectory(closest_paths)

# plot_multiple_trajectories(furthest_paths)
# plot_q_values_per_trajectory(furthest_paths)

In [None]:
# Now I want to find one Path from different datasets which would be going thorugh constellations:
from justin_arm.visualize import (
    plot_multiple_trajectories,
    plot_q_values_per_trajectory,
)

paths_2115 = np.load(
    "/Users/magic-rabbit/Documents/ADLR/tum-adlr-ss24-18/justin_arm//data/q_paths_2115.npy"
)
paths_2115 = np.load(
    "/Users/magic-rabbit/Documents/ADLR/tum-adlr-ss24-18/justin_arm/data/image_4123.npy"
)

robot = robots.JustinArm07()

frames = robot.get_frames(paths_4123)
# Extract the coordinates directly from the frames array
start_frames = frames[:, 0, 7, :, :]
end_frames = frames[:, -1, 7, :, :]

# Extract the X, Y, Z coordinates
start_coords = start_frames[:, :3, 3]
end_coords = end_frames[:, :3, 3]

# Convert them into a list of lists
print(len(start_coords))
print(len(end_coords))
print(f"Q_Paths shape: {paths_4123.shape}")


# Initialize the trajectories list
trajectories = []
# Transform the lists into the desired structure
for start, end in zip(start_coords, end_coords):
    trajectories.append({"start": start, "end": end})

# Define the boundaries for 3D
x_boundaries = [-1.0, -0.33333333333333337, 0.33333333333333326, 1.0]
y_boundaries = [-1.0, -0.33333333333333337, 0.33333333333333326, 1.0]
z_boundaries = [-1.0, -0.33333333333333337, 0.33333333333333326, 1.0]
# Define regions
regions = {}
index = 1
for i in range(len(x_boundaries) - 1):
    for j in range(len(y_boundaries) - 1):
        for k in range(len(z_boundaries) - 1):
            region_name = f"R_{i+1}{j+1}{k+1}"
            regions[region_name] = {
                "x": (x_boundaries[i], x_boundaries[i + 1]),
                "y": (y_boundaries[j], y_boundaries[j + 1]),
                "z": (z_boundaries[k], z_boundaries[k + 1]),
            }
            index += 1


# Function to determine the region of a point
def find_region(point):
    for region, bounds in regions.items():
        # print(f"Bounds: {bounds}")
        # print(f"Point:{point}")
        if (
            bounds["x"][0] <= point[0] < bounds["x"][1]
            and bounds["y"][0] <= point[1] < bounds["y"][1]
            and bounds["z"][0] <= point[2] < bounds["z"][1]
        ):
            return region

    print(f"No region found for point: {point}")
    print(f"Bound: {bounds}")
    return None


# Cluster trajectories
clusters = {
    region: {other_region: {"count": 0, "indices": []} for other_region in regions}
    for region in regions
}
for idx, trajectory in enumerate(trajectories):
    start_region = find_region(trajectory["start"])
    end_region = find_region(trajectory["end"])
    if start_region and end_region:
        clusters[start_region][end_region]["count"] += 1
        clusters[start_region][end_region]["indices"].append(idx)

    else:
        print("No matching region found")
        # print(trajectory["start"])
        # print(trajectory["end"])


cluster_counts = {
    region: {
        other_region: clusters[region][other_region]["count"]
        for other_region in regions
    }
    for region in regions
}
clusters_df = pd.DataFrame(cluster_counts).T

print(clusters_df)
# Sum all entries in the dataframe
total = clusters_df.sum().sum()
print(total)

constellations = [
    ("R_111", "R_333"),
    ("R_113", "R_331"),
    ("R_131", "R_313"),
    ("R_133", "R_311"),
]
for start_key, end_key in constellations:
    indices = clusters[start_key][end_key]["indices"]
    print(indices)
    if len(indices) != 0:
        print(f"Start: {start_key}, End: {end_key}")
        print(f"Amount of trajectories: {len(indices)}")
        # frames_indices = frames[indices]
        # # Filter me q_paths where where isin the indices of the cluster
        # q_paths = paths_4123[indices]
        # # start_coords_ = [start_coords[idx] for idx in indices]
        # # end_coords = [end_coords[idx] for idx in indices]
        # # # Visualize the first cluster
        # Size equal to len(indices) unless it is above 20 then 20:
        if len(indices) > 20:
            random_indices = np.random.choice(indices, size=20, replace=False)
        else:
            random_indices = indices
        # # Extract the corresponding paths
        plot_multiple_trajectories(paths_4123[random_indices])
        plot_q_values_per_trajectory(paths_4123[random_indices])