In [1]:
import os
import json
from functools import partial
from collections import defaultdict
from typing import Tuple, List, Optional
import numpy as np
import torch

from torch.utils import data
from trajdata import UnifiedDataset, SceneBatch, AgentBatch
from trajdata.data_structures.batch_element import SceneBatchElement
from trajdata.data_structures.agent import AgentMetadata, AgentType
from trajdata.data_structures.state import StateArray
from trajdata.augmentation import NoiseHistories

In [2]:
def all_current_states(
    batch_elem: SceneBatchElement,
    max_agent_num: int
) -> np.ndarray:
    agents: List[AgentMetadata] = batch_elem.agents
    curr_pos = []
    for agent in agents:
        raw_state: StateArray = batch_elem.cache.get_raw_state(
            agent.name, batch_elem.scene_ts
        )
        state = np.asarray(raw_state)
        curr_pos.append(state)
    stacked_pos = np.stack(curr_pos, axis=0)
    num_agents = stacked_pos.shape[0]
    pad_shape = (max_agent_num - num_agents, *stacked_pos.shape[1:])
    padding = np.zeros(pad_shape, dtype=stacked_pos.dtype)
    return np.concatenate([stacked_pos, padding], axis=0)

def get_neighs(
    batch_elem: SceneBatchElement,
    interaction_radius: float,
    max_agent_num: int,
):
    curr_states = batch_elem.extras["curr_states"]
    num_agents = len(curr_states)
    is_neigh = []
    for state in curr_states:
        distances = [
            np.linalg.norm(state[:2] - agent_st[:2])
            for agent_st in curr_states
        ]
        is_neigh.append([dist <= interaction_radius for dist in distances])
    is_neigh_arr = np.stack(is_neigh, axis=0)
    padded = np.zeros((max_agent_num, max_agent_num), dtype=bool)
    padded[:num_agents, :num_agents] = is_neigh_arr
    return padded

In [3]:
log_dir = '../../../data/trained_models/trajectory_prediction'
model_dir = os.path.join(log_dir, "eth-28_May_2025_10_28_45")

with open(os.path.join(model_dir, 'config.json'), 'r', encoding="utf-8") as config_json:
    hyperparams = json.load(config_json)
# device
hyperparams["device"] = "cpu"
hyperparams["trajdata_cache_dir"] = "../../../data/pedestrian_datasets/.unified_data_cache"

desired_data=[
    "eupeds_eth-train",
]
max_agent_num=20
data_dirs = {
    "eupeds_eth": "../../../data/pedestrian_datasets/eth_ucy_peds",
}

attention_radius = defaultdict(
    lambda: 20.0
)  # Default range is 20m unless otherwise specified.
# attention_radius[(AgentType.PEDESTRIAN, AgentType.PEDESTRIAN)] = 5.0

input_noise = 0.0
augmentations = list()
if input_noise > 0.0:
    augmentations.append(NoiseHistories(stddev=input_noise))

batch_size = 4

dataset = UnifiedDataset(
    desired_data=desired_data,
    centric="scene",
    # centric="agent",
    max_agent_num=max_agent_num,
    history_sec=(0.1, hyperparams["history_sec"]),
    future_sec=(0.1, hyperparams["prediction_sec"]),
    agent_interaction_distances=attention_radius,
    incl_robot_future=hyperparams["incl_robot_node"],
    incl_raster_map=hyperparams["map_encoding"],
    only_predict=[AgentType.PEDESTRIAN],
    no_types=[AgentType.UNKNOWN],
    augmentations=augmentations if len(augmentations) > 0 else None,
    standardize_data=False,
    num_workers=hyperparams["preprocess_workers"],
    cache_location=hyperparams["trajdata_cache_dir"],
    data_dirs=data_dirs,
    verbose=True,
    extras={
            "curr_states": partial(all_current_states, max_agent_num=max_agent_num)
            # "is_neigh": partial(get_neighs, interaction_radius=5.0),
        }
)

print(f"# Data Samples: {len(dataset):,}")

dataloader = data.DataLoader(
    dataset,
    collate_fn=dataset.get_collate_fn(pad_format="right"),
    pin_memory=False if hyperparams["device"] == "cpu" else True,
    batch_size=batch_size,
    shuffle=True,
    num_workers=hyperparams["preprocess_workers"],
    sampler=None,
)

Loading data for matched scene tags: ['eupeds_eth-train-zurich', 'cyprus-eupeds_eth-train']


Calculating Agent Data (Serially): 100%|██████████| 1/1 [00:00<00:00, 13797.05it/s]

1 scenes in the scene index.



Creating Scene Data Index (16 CPUs): 100%|██████████| 1/1 [00:00<00:00, 143.86it/s]
Structuring Scene Data Index: 100%|██████████| 1/1 [00:00<00:00, 13066.37it/s]

# Data Samples: 674





In [4]:
batch: SceneBatch = next(iter(dataloader))

In [5]:
batch.__dict__.keys()

dict_keys(['data_idx', 'scene_ts', 'dt', 'num_agents', 'agent_type', 'centered_agent_state', 'agent_names', 'agent_hist', 'agent_hist_extent', 'agent_hist_len', 'agent_fut', 'agent_fut_extent', 'agent_fut_len', 'robot_fut', 'robot_fut_len', 'map_names', 'maps', 'maps_resolution', 'vector_maps', 'rasters_from_world_tf', 'centered_agent_from_world_tf', 'centered_world_from_agent_tf', 'scene_ids', 'history_pad_dir', 'extras'])

In [6]:
print(f"Num Agents: {batch.num_agents}")
print(f"Agent History shape: {batch.agent_hist.shape}")


Num Agents: tensor([2, 4, 5, 9])
Agent History shape: torch.Size([4, 9, 8, 8])


In [9]:
batch.agent_hist_len

tensor([[8, 1, 0, 0, 0, 0, 0, 0, 0],
        [8, 8, 8, 8, 0, 0, 0, 0, 0],
        [8, 3, 6, 8, 8, 0, 0, 0, 0],
        [8, 8, 8, 7, 7, 7, 7, 8, 8]])

In [17]:
batch.agent_hist[1,1,0,:]

StateTensorXYXdYdXddYddSC([ 0.4400,  8.7800,  2.9250, -1.2750,  0.1250,  0.0625,
                           -0.3996,  0.9167])

In [None]:
centered_agent_state = batch.agent_hist[1,0,0].cpu().detach().numpy()
agent_pos = centered_agent_state[:2]
agent_sc = centered_agent_state[-2:]
print(f"Centered Agent Pos: {agent_pos}")
print(f"Centered Agent Sin Cos: {agent_sc}")

cos_agent = agent_sc[-1]
sin_agent = agent_sc[-2]
centered_world_from_agent_tf: np.ndarray = np.array(
    [
        [cos_agent, -sin_agent, agent_pos[0]],
        [sin_agent, cos_agent, agent_pos[1]],
        [0.0, 0.0, 1.0],
    ]
)
centered_agent_from_world_tf: np.ndarray = np.linalg.inv(
    centered_world_from_agent_tf
)



Centered Agent Pos: [3.83 2.78]
Centered Agent Sin Cos: [0.3729585  0.92784804]


In [6]:
## Helper Functions
# To Calculate Number of Samples

import warnings

def bisection(N_low, N_high, sample_function):
    """
    :param N_low: Low guess for the intersection
    :param N_high: High guess for the intersection
    :param sample_function: Function for which to find the zero crossing
    :return: The zero crossing
    """
    # print(f"Running bisection to find S between {N_low} and {N_high}...", end="")

    a = N_low
    b = N_high
    f = sample_function

    TOL = 1e-9

    if a > b:
      print('Error: a > b!')
      return -1

    value_a = f(a)

    for i in range(1000):
      c = (a + b) / 2.
      value_c = f(c)

      if value_c == 0 or (b - a) / 2. < TOL:
          # print(f" Found {c:.2f}")
          return c

      if np.sign(value_c) == np.sign(value_a):
          a = c
          value_a = value_c
      else:
          b = c

    print("Bisection failed to converge!")

def nchoosek_rooted(n, k, root):
    """
    To avoid numerical errors, an implementation of N choose K where the root is resolved in between
    :return: (n choose k)^(1/root)
    """
    y = 1.0
    for i in range(k):
        y = y * ((n - (k - i - 1.)) / (k - i))**(1. / root)
    return y

def compute_risk(S, confidence, support):
  with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    return 1. - ((confidence / S) ** (1. / (S - support))) * (1. / nchoosek_rooted(S, support, S - support))

def find_sample_size(support, risk, confidence):
  risk_function = lambda S, support: compute_risk(S, confidence, support)      # Scenario Optimization guarantee for any support
  max_risk_function = lambda S: risk - risk_function(S, support)               # "" for the specified support limit

  S_low = support+1
  S_high = 500000

  S_double = bisection(S_low, S_high, max_risk_function)
  S = np.floor(S_double) + 1

  return S

In [7]:
## Helper functions
# To ensure humans do not collide with the robot during initialization
# and are within field of view of the robot

def is_within_distance(pos1, pos2, distance):
    return np.linalg.norm(np.array(pos1) - np.array(pos2)) <= distance

def angle_between(v1, v2):
    """Calculate the angle between two vectors."""
    v1_u = v1 / np.linalg.norm(v1)
    v2_u = v2 / np.linalg.norm(v2)
    angle = np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0))
    cross = np.cross(np.append(v1_u, 0), np.append(v2_u, 0))
    if cross[2] < 0:
        angle = -angle
    return angle

def is_within_fov(robot_pos, robot_orientation, target_pos, fov=90):
    """Check if target_pos is within the field of view of the robot."""
    direction_vector = np.array([np.cos(np.deg2rad(robot_orientation)), np.sin(np.deg2rad(robot_orientation))])
    target_vector = np.array(target_pos) - np.array(robot_pos)
    angle = np.rad2deg(angle_between(direction_vector, target_vector))
    return -fov/2 < angle < fov/2

# To generate constraint coefficients given human and robot positions
def get_constraint_coeff(robot_pos, human_pos, r):
    u = human_pos - robot_pos
    A = u / np.linalg.norm(u)
    a = A[0]
    b = A[1]
    c = r - A[0]*human_pos[0] - A[1]*human_pos[1]
    return np.array([a, b, c]) # Linear inequality: ax + by + c <= 0

In [8]:
def check_symmetric(matrix):
    """
    Checks if the given matrix is symmetric (A == A^T).
    Raises a ValueError if it is not symmetric.
    """
    if not np.allclose(matrix, matrix.T):  # Checks if A == A^T (within numerical tolerance)
        raise ValueError("Matrix is not symmetric!")

In [10]:
from pathlib import Path

from traj_pred.modules import ModelRegistrar
from traj_pred import TrajectoryPredictor

# Function to load the trajectron model
def load_model(epoch: int):
    while epoch > 0:
        save_path = Path(model_dir) / f'model_registrar-{epoch}.pt'
        if save_path.is_file():
            break
        epoch -= 1

    model_registrar = ModelRegistrar(model_dir, hyperparams["device"])

    model = TrajectoryPredictor(
        model_registrar=model_registrar,
        hyperparams=hyperparams,
        log_writer=None,
        device=hyperparams["device"])
    model.set_environment()

    checkpoint = torch.load(save_path, map_location=hyperparams["device"])
    model.load_state_dict(checkpoint["model_state_dict"], strict=False)

    return model

epoch = 15

traj_predictor = load_model(epoch)

In [None]:
## Helper Function
## To get predictions for a scene
## and calculate the constraint coefficients

col_radius = 1.0  # Collision radius for agents
buffer_dist = 0.5 # Buufer distance considered when choosing robot position
vision_radius = 12.0  # Vision radius for robots
ph = 12

def get_scene_graph_data(batch: SceneBatch, scene_data):
    enc = traj_predictor.get_encoding(batch) # TODO
    preds = traj_predictor.incremental_forward(
        batch=batch,
        prediction_horizon=ph,
        num_samples=1,
        full_dist=True
    )

    # All humans in the scene
    scene_data["Human Positions"] = obs.curr_agent_state[:, :2].cpu().numpy()

    # Agent to World frame transformations
    agents_to_world_tf = obs.agents_from_world_tf[:, :2, :2].cpu().numpy()
    # Append predicted positions of humans to occupied positions
    occupied_pos = [pos for pos in scene_data["Human Positions"]]
    # Initiate adjaceny matrix
    adjacency_matrix = np.zeros((len(obs.agent_name), len(obs.agent_name)), dtype=int)

    # Loop through all humans
    for idx, human in enumerate(obs.agent_name):
        # Append predicted positions to occupied positions (after transformation to world frame)
        pred_world_frame = pred[human].reshape(prediction_horizon, 2) @ agents_to_world_tf[idx, :, :] + scene_data["Human Positions"][idx, :]
        for jdx in range(prediction_horizon):
            occupied_pos.append(pred_world_frame[jdx, :])
        # Create adjacency matrix
        current_state = scene_data["Human Positions"][idx, :2]
        agent_to_world_tf = agents_to_world_tf[idx, :, :]
        if obs.num_neigh[idx] > 0:
            for neigh_pos in obs.neigh_hist[idx, :obs.num_neigh[idx].item(), -1, :2].cpu().numpy():
                neigh_pos = neigh_pos @ agent_to_world_tf + current_state
                for jdx, agent_pos in enumerate(obs.curr_agent_state[:, :2].cpu().numpy()):
                    if np.linalg.norm(neigh_pos - agent_pos) < 1e-5:
                        adjacency_matrix[idx, jdx] = 1
        # Store human encodings
        scene_data["Encodings"].append(enc[human].cpu().numpy().flatten())

    scene_data["Adjacency Matrix"] = adjacency_matrix
    check_symmetric(scene_data["Adjacency Matrix"])

    # Assign appropriate robot position
    # print("Occupied positions:", occupied_pos)
    while True: # Generate a random position for the robot
        # Ensure the robot's position does not overlap with any occupied positions
        robot_position = (random.uniform(-vision_radius, vision_radius), random.uniform(-vision_radius, vision_radius))
        if not any(is_within_distance(robot_position, pos, col_radius+buffer_dist) for pos in occupied_pos):
            break
    scene_data["Robot Pose"] = [robot_position[0], robot_position[1]]

    return obs, scene_data


def get_constr_data(t, S, obs: AgentBatch, scene_data):

    _, preds, _, _ = trajectron.incremental_forward(
        timestep=t,
        obs=obs,
        maps=None,
        prediction_horizon=prediction_horizon,
        num_samples=int(S),
        full_dist=False
    )

    # Agent to World frame transformations
    agents_to_world_tf = obs.agents_from_world_tf[:, :2, :2].cpu().numpy()

    constraint_coeffs = []
    for idx, human in enumerate(obs.agent_name):
        # print("Pred shape:", preds[human].shape)
        constraint_pos_st = preds[human][:, :, 0, :].reshape(int(S), 2) # TODO: change the timestpe of prediction positions being considered
        constraint_pos = constraint_pos_st @ agents_to_world_tf[idx, :, :] + scene_data["Human Positions"][idx, :]
        # print("Constraint positions:", constraint_pos)
        for sidx in range(int(S)):
            constraint_coeffs.append(get_constraint_coeff(scene_data["Robot Pose"], constraint_pos[sidx, :], col_radius))
            # print("Constraint Coeffs:", constraint_coeffs[-1])
    scene_data["Constraint Coefficients"] = constraint_coeffs
  
    return scene_data

In [None]:
# Test constraint calculation
scene_data: Dict[str, List] = {
                "Robot Pose": None,
                "Human Positions": [],
                "Adjacency Matrix": None,
                "Encodings": [],
                }
test_t=287
obs, scene_data = get_scene_graph_data(test_t, scene_data)

In [None]:
scene_data["Constraint Coefficients"] = None
scene_data = get_constr_data(test_t, 10, obs, scene_data)