In [38]:
import os
import json
from collections import defaultdict

import numpy as np
import torch

from torch.utils import data
from trajdata import AgentBatch, AgentType, UnifiedDataset, SceneBatch
from trajdata.augmentation import NoiseHistories

In [39]:
log_dir = '../../../data/trained_models/trajectory_prediction'
model_dir = os.path.join(log_dir, "eth-28_May_2025_10_28_45")

with open(os.path.join(model_dir, 'config.json'), 'r', encoding="utf-8") as config_json:
    hyperparams = json.load(config_json)
# device
hyperparams["device"] = "cpu"
hyperparams["trajdata_cache_dir"] = "../../../data/pedestrian_datasets/.unified_data_cache"


In [40]:
desired_data=[
    "eupeds_eth-train_loo",
]
data_dirs = {
    "eupeds_eth": "../../../data/pedestrian_datasets/eth_ucy_peds",
}

attention_radius = defaultdict(
    lambda: 20.0
)  # Default range is 20m unless otherwise specified.
attention_radius[(AgentType.PEDESTRIAN, AgentType.PEDESTRIAN)] = 5.0

input_noise = 0.0
augmentations = list()
if input_noise > 0.0:
    augmentations.append(NoiseHistories(stddev=input_noise))

In [None]:
batch_size = 1

dataset = UnifiedDataset(
    desired_data=desired_data,
    centric="scene",
    history_sec=(0.1, hyperparams["history_sec"]),
    future_sec=(0.1, hyperparams["prediction_sec"]),
    agent_interaction_distances=attention_radius,
    incl_robot_future=hyperparams["incl_robot_node"],
    incl_raster_map=hyperparams["map_encoding"],
    only_predict=[AgentType.PEDESTRIAN],
    no_types=[AgentType.UNKNOWN],
    augmentations=augmentations if len(augmentations) > 0 else None,
    num_workers=hyperparams["preprocess_workers"],
    cache_location=hyperparams["trajdata_cache_dir"],
    data_dirs=data_dirs,
    verbose=False,
)

print(f"# Data Samples: {len(dataset):,}")

dataloader = data.DataLoader(
    dataset,
    collate_fn=dataset.get_collate_fn(pad_format="right"),
    pin_memory=False if hyperparams["device"] == "cpu" else True,
    batch_size=batch_size,
    shuffle=True,
    num_workers=hyperparams["preprocess_workers"],
    sampler=None,
)

batch: SceneBatch = next(iter(dataloader))

# Data Samples: 4,356


In [51]:
print(batch.__dict__.keys())

dict_keys(['data_idx', 'scene_ts', 'dt', 'num_agents', 'agent_type', 'centered_agent_state', 'agent_names', 'agent_hist', 'agent_hist_extent', 'agent_hist_len', 'agent_fut', 'agent_fut_extent', 'agent_fut_len', 'robot_fut', 'robot_fut_len', 'map_names', 'maps', 'maps_resolution', 'vector_maps', 'rasters_from_world_tf', 'centered_agent_from_world_tf', 'centered_world_from_agent_tf', 'scene_ids', 'history_pad_dir', 'extras'])


In [42]:
print(f"Scene ID: {batch.scene_ids}")
print(f"Scene Timestep: {batch.scene_ts}")
print(f"Data Idx: {batch.data_idx}")
print(f"Num. of Agents: {batch.num_agents}")
print(f"Agent Names: {batch.agent_names}")

Scene ID: ['crowds_zara02_train']
Scene Timestep: tensor([709], dtype=torch.int32)
Data Idx: tensor([2278], dtype=torch.int32)
Num. of Agents: tensor([10])
Agent Names: [['69', '70', '112', '111', '135', '132', '133', '134', '131', '130']]


In [43]:
print(f"Centered Agent States: {batch.centered_agent_state}")
print(f"Agent History Lengths: {batch.agent_hist_len}")
print(f"Agent Future Lengths: {batch.agent_fut_len}")


Centered Agent States: StateTensorXYXdYdXddYddH([[ 6.0103e+00,  4.9131e+00, -8.9448e-03, -5.3698e-03,
                           -5.5511e-15,  5.5511e-15, -2.6009e+00]],
                         dtype=torch.float64)
Agent History Lengths: tensor([[8, 8, 8, 8, 8, 8, 8, 8, 8, 8]])
Agent Future Lengths: tensor([[12, 12, 12, 12, 12,  7, 12, 12, 12, 12]])


In [44]:
print(f"Agent History Shape: {batch.agent_hist.shape}")
print(f"Agent Future Shape: {batch.agent_fut.shape}")

Agent History Shape: torch.Size([1, 10, 8, 8])
Agent Future Shape: torch.Size([1, 10, 12, 8])


In [45]:
from pathlib import Path

from traj_pred.modules import ModelRegistrar
from traj_pred import TrajectoryPredictor

# Function to load the trajectron model
def load_model(epoch: int):
    while epoch > 0:
        save_path = Path(model_dir) / f'model_registrar-{epoch}.pt'
        if save_path.is_file():
            break
        epoch -= 1

    model_registrar = ModelRegistrar(model_dir, hyperparams["device"])

    trained_model = TrajectoryPredictor(
        model_registrar=model_registrar,
        hyperparams=hyperparams,
        log_writer=False,
        device=hyperparams["device"]
    )
    trained_model.set_environment()

    checkpoint = torch.load(save_path, map_location=hyperparams["device"])
    trained_model.load_state_dict(checkpoint["model_state_dict"], strict=False)

    return trained_model

epoch = 15

traj_predictor: TrajectoryPredictor = load_model(epoch)

In [46]:
# print(traj_predictor)

In [47]:
## Helper Functions
# To Calculate Number of Samples

import warnings

def bisection(N_low, N_high, sample_function):
    """
    :param N_low: Low guess for the intersection
    :param N_high: High guess for the intersection
    :param sample_function: Function for which to find the zero crossing
    :return: The zero crossing
    """
    # print(f"Running bisection to find S between {N_low} and {N_high}...", end="")

    a = N_low
    b = N_high
    f = sample_function

    TOL = 1e-9

    if a > b:
      print('Error: a > b!')
      return -1

    value_a = f(a)

    for i in range(1000):
      c = (a + b) / 2.
      value_c = f(c)

      if value_c == 0 or (b - a) / 2. < TOL:
          # print(f" Found {c:.2f}")
          return c

      if np.sign(value_c) == np.sign(value_a):
          a = c
          value_a = value_c
      else:
          b = c

    print("Bisection failed to converge!")

def nchoosek_rooted(n, k, root):
    """
    To avoid numerical errors, an implementation of N choose K where the root is resolved in between
    :return: (n choose k)^(1/root)
    """
    y = 1.0
    for i in range(k):
        y = y * ((n - (k - i - 1.)) / (k - i))**(1. / root)
    return y

def compute_risk(S, confidence, support):
  with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    return 1. - ((confidence / S) ** (1. / (S - support))) * (1. / nchoosek_rooted(S, support, S - support))

def find_sample_size(support, risk, confidence):
  risk_function = lambda S, support: compute_risk(S, confidence, support)      # Scenario Optimization guarantee for any support
  max_risk_function = lambda S: risk - risk_function(S, support)               # "" for the specified support limit

  S_low = support+1
  S_high = 500000

  S_double = bisection(S_low, S_high, max_risk_function)
  S = np.floor(S_double) + 1

  return S

In [48]:
## Helper functions
# To ensure humans do not collide with the robot during initialization
# and are within field of view of the robot

def is_within_distance(pos1, pos2, distance):
    return np.linalg.norm(np.array(pos1) - np.array(pos2)) <= distance

def angle_between(v1, v2):
    """Calculate the angle between two vectors."""
    v1_u = v1 / np.linalg.norm(v1)
    v2_u = v2 / np.linalg.norm(v2)
    angle = np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0))
    cross = np.cross(np.append(v1_u, 0), np.append(v2_u, 0))
    if cross[2] < 0:
        angle = -angle
    return angle

def is_within_fov(robot_pos, robot_orientation, target_pos, fov=90):
    """Check if target_pos is within the field of view of the robot."""
    direction_vector = np.array([np.cos(np.deg2rad(robot_orientation)), np.sin(np.deg2rad(robot_orientation))])
    target_vector = np.array(target_pos) - np.array(robot_pos)
    angle = np.rad2deg(angle_between(direction_vector, target_vector))
    return -fov/2 < angle < fov/2

# To generate constraint coefficients given human and robot positions
def get_constraint_coeff(robot_pos, human_pos, r):
    u = human_pos - robot_pos
    A = u / np.linalg.norm(u)
    a = A[0]
    b = A[1]
    c = r - A[0]*human_pos[0] - A[1]*human_pos[1]
    return np.array([a, b, c]) # Linear inequality: ax + by + c <= 0

In [49]:
def check_symmetric(matrix):
    """
    Checks if the given matrix is symmetric (A == A^T).
    Raises a ValueError if it is not symmetric.
    """
    if not np.allclose(matrix, matrix.T):  # Checks if A == A^T (within numerical tolerance)
        raise ValueError("Matrix is not symmetric!")

In [None]:
## Helper Function
## To get predictions for a scene
## and calculate the constraint coefficients

col_radius = 1.0  # Collision radius for agents
buffer_dist = 0.5 # Buufer distance considered when choosing robot position
vision_radius = 12.0  # Vision radius for robots
prediction_horizon = 12

def get_scene_graph_data(scene_batch: SceneBatch, scene_data):
    """
    
    """
    enc = traj_predictor.scene_encoder(scene_batch)
    pred = traj_predictor.incremental_forward(
        batch=scene_batch,
        prediction_horizon=prediction_horizon,
        num_samples=1,
        full_dist=True
    )

    # All humans in the scene
    scene_data["Human Positions"] = obs.curr_agent_state[:, :2].cpu().numpy()

    # Agent to World frame transformations
    agents_to_world_tf = obs.agents_from_world_tf[:, :2, :2].cpu().numpy()
    # Append predicted positions of humans to occupied positions
    occupied_pos = [pos for pos in scene_data["Human Positions"]]
    # Initiate adjaceny matrix
    adjacency_matrix = np.zeros((len(obs.agent_name), len(obs.agent_name)), dtype=int)

    # Loop through all humans
    for idx, human in enumerate(obs.agent_name):
        # Append predicted positions to occupied positions (after transformation to world frame)
        pred_world_frame = pred[human].reshape(prediction_horizon, 2) @ agents_to_world_tf[idx, :, :] + scene_data["Human Positions"][idx, :]
        for jdx in range(prediction_horizon):
            occupied_pos.append(pred_world_frame[jdx, :])
        # Create adjacency matrix
        current_state = scene_data["Human Positions"][idx, :2]
        agent_to_world_tf = agents_to_world_tf[idx, :, :]
        if obs.num_neigh[idx] > 0:
            for neigh_pos in obs.neigh_hist[idx, :obs.num_neigh[idx].item(), -1, :2].cpu().numpy():
                neigh_pos = neigh_pos @ agent_to_world_tf + current_state
                for jdx, agent_pos in enumerate(obs.curr_agent_state[:, :2].cpu().numpy()):
                    if np.linalg.norm(neigh_pos - agent_pos) < 1e-5:
                        adjacency_matrix[idx, jdx] = 1
        # Store human encodings
        scene_data["Encodings"].append(enc[human].cpu().numpy().flatten())

    scene_data["Adjacency Matrix"] = adjacency_matrix
    check_symmetric(scene_data["Adjacency Matrix"])

    # Assign appropriate robot position
    # print("Occupied positions:", occupied_pos)
    while True: # Generate a random position for the robot
        # Ensure the robot's position does not overlap with any occupied positions
        robot_position = (random.uniform(-vision_radius, vision_radius), random.uniform(-vision_radius, vision_radius))
        if not any(is_within_distance(robot_position, pos, col_radius+buffer_dist) for pos in occupied_pos):
            break
    scene_data["Robot Pose"] = [robot_position[0], robot_position[1]]

    return obs, scene_data


def get_constr_data(t, S, obs: AgentBatch, scene_data):

    _, preds, _, _ = trajectron.incremental_forward(
        timestep=t,
        obs=obs,
        maps=None,
        prediction_horizon=prediction_horizon,
        num_samples=int(S),
        full_dist=False
    )

    # Agent to World frame transformations
    agents_to_world_tf = obs.agents_from_world_tf[:, :2, :2].cpu().numpy()

    constraint_coeffs = []
    for idx, human in enumerate(obs.agent_name):
        # print("Pred shape:", preds[human].shape)
        constraint_pos_st = preds[human][:, :, 0, :].reshape(int(S), 2) # TODO: change the timestpe of prediction positions being considered
        constraint_pos = constraint_pos_st @ agents_to_world_tf[idx, :, :] + scene_data["Human Positions"][idx, :]
        # print("Constraint positions:", constraint_pos)
        for sidx in range(int(S)):
            constraint_coeffs.append(get_constraint_coeff(scene_data["Robot Pose"], constraint_pos[sidx, :], col_radius))
            # print("Constraint Coeffs:", constraint_coeffs[-1])
    scene_data["Constraint Coefficients"] = constraint_coeffs
  
    return scene_data

In [None]:
import support_pred.halfplane_module as hm

logger = hm.Logger("logfile.txt", True)
processor = hm.HalfplaneIntersectionProcessor(500, 10)
processor.set_logger(logger)

In [None]:

# constraints = np.array([[3,4,2],[3,8,2],[4,8,2], [4,10,5], [-1,-2,2]])
# support = processor.get_support_size(constraints)
# support

array([[ 3,  4,  2],
       [ 3,  8,  2],
       [ 4,  8,  2],
       [ 4, 10,  5],
       [-1, -2,  2]])

3