In [34]:
# evaluate time
import time

from os import path

import numpy as np
import torch

from trajectory_prediction.utils import (
    compute_batch_ade_ret,
    compute_batch_fde_ret,
    get_dataloader,
)

# fix random seed
np.random.seed(42)
torch.manual_seed(42)



<torch._C.Generator at 0x74af3adda110>

## Here: put original predict_trajectory function



In [None]:
import numpy as np
import torch


def predict_trajectory(trajectory: np.ndarray, sample=True) -> np.ndarray:
    """Generate 20 diverse possible future trajectories using constant velocity model with stochastic sampling.

    Args:
        trajectory (np.ndarray): Input trajectory of shape [num_agents, traj_length, 2], where traj_length is 8.

    Returns:
        np.ndarray: An array of 20 diverse trajectories of shape [20, num_agents, 12, 2].
    """
    # Convert numpy array to torch tensor for compatibility with reference code
    trajectory_torch = torch.tensor(trajectory, dtype=torch.float32)
    num_agents = trajectory.shape[0]

    # Parameters from the reference code
    num_samples = 20
    sample_angle_std = 25  # Standard deviation for angle sampling in degrees

    all_trajectories = []

    # For each sample
    for _ in range(num_samples):
        # Sample one angle for all agents in this trajectory sample
        if sample:
            sampled_angle = np.random.normal(0, sample_angle_std, 1)[0]
        else:
            sampled_angle = 0

        theta = (sampled_angle * np.pi) / 180.0
        c, s = np.cos(theta), np.sin(theta)
        rotation_mat = torch.tensor([[c, s], [-s, c]], dtype=torch.float32)

        predictions = []

        for agent_idx in range(num_agents):
            # Get observed trajectory for current agent
            observed = trajectory_torch[agent_idx].unsqueeze(0)  # [1, 8, 2]

            # Calculate relative movement (similar to obs_rel in the reference)
            obs_rel = observed[:, 1:] - observed[:, :-1]  # [1, 7, 2]

            # Use the last velocity as constant velocity
            last_velocity = obs_rel[:, -1].unsqueeze(1)  # [1, 1, 2]

            # Apply rotation to the velocity (using the same rotation for all agents)
            vel = last_velocity.squeeze(dim=0).squeeze(dim=0)  # [2]
            rotated_vel = torch.matmul(rotation_mat, vel)  # [2]
            rotated_vel = rotated_vel.unsqueeze(0).unsqueeze(0)  # [1, 1, 2]

            # Create 12 predictions with the constant (rotated) velocity
            pred_rel = rotated_vel.repeat(1, 12, 1)  # [1, 12, 2]

            # Convert to absolute positions (cumulative sum + start position)
            displacement = torch.cumsum(pred_rel, dim=1)  # [1, 12, 2]
            start_pos = observed[:, -1].unsqueeze(1)  # [1, 1, 2]
            pred_abs = displacement + start_pos  # [1, 12, 2]

            predictions.append(pred_abs.squeeze(0).numpy())  # [12, 2]

        # Stack all agent predictions for this sample
        sample_prediction = np.stack(predictions, axis=0)  # [num_agents, 12, 2]
        all_trajectories.append(sample_prediction)

    # Stack all samples
    result = np.stack(all_trajectories, axis=0)  # [20, num_agents, 12, 2]
    return result

## Put new trajectory prediction function here (optimized by AI afterwards)

In [48]:
import numpy as np


def predict_trajectory_optimized(trajectory: np.ndarray, sample=True) -> np.ndarray:
    """Generate 20 diverse possible future trajectories using constant velocity model with stochastic sampling.

    Args:
        trajectory (np.ndarray): Input trajectory of shape [num_agents, traj_length, 2], where traj_length is 8.
        sample (bool): Whether to apply random rotation sampling.

    Returns:
        np.ndarray: An array of 20 diverse trajectories of shape [20, num_agents, 12, 2].
    """
    num_agents = trajectory.shape[0]
    num_samples = 20
    sample_angle_std = 25  # Standard deviation for angle sampling in degrees

    # Calculate relative movement for all agents at once
    obs_rel = trajectory[:, 1:] - trajectory[:, :-1]  # [num_agents, 7, 2]

    # Get last velocity for all agents
    last_velocity = obs_rel[:, -1]  # [num_agents, 2]

    # Starting positions for all agents
    start_pos = trajectory[:, -1]  # [num_agents, 2]

    # Pre-allocate result array
    result = np.zeros((num_samples, num_agents, 12, 2))

    # Generate all samples
    for sample_idx in range(num_samples):
        # Sample one angle for this trajectory sample
        if sample:
            sampled_angle = np.random.normal(0, sample_angle_std, 1)[0]
        else:
            sampled_angle = 0

        theta = (sampled_angle * np.pi) / 180.0
        c, s = np.cos(theta), np.sin(theta)
        rotation_mat = np.array([[c, s], [-s, c]])

        # For each agent
        for agent_idx in range(num_agents):
            vel = last_velocity[agent_idx]  # [2]

            # Apply rotation
            rotated_vel = np.dot(rotation_mat, vel)  # [2]

            # Create predictions with constant rotated velocity
            pred_rel = np.tile(rotated_vel, (12, 1))  # [12, 2]

            # Convert to absolute positions (cumulative sum + start position)
            displacement = np.cumsum(pred_rel, axis=0)  # [12, 2]
            result[sample_idx, agent_idx] = displacement + start_pos[agent_idx]

    return result

## Check if they are the same

In [None]:
NUM_TRAJECTORIES = 20 # expected for minADE20

obs_len = 8
pred_len = 12
batch_size = 32

# datasets = ["eth", "hotel", "univ", "zara1", "zara2"]
datasets = ["eth"]
# datasets = ["hotel"]
# datasets = ["zara1"]

dataset_name = "eth"
print(f"\nProcessing dataset: {dataset_name.upper()}")

dataset_dir = path.join("datasets", dataset_name)

loader_test = get_dataloader(dataset_dir, 'test', 8, 12, batch_size=1)

original_ade = []
original_fde = []
new_ade = []
new_fde = []

with torch.inference_mode():
    for i, batch in enumerate(loader_test):
        input_traj = batch[0].numpy()  # [num_agents, obs_len, 2]
        target_traj = batch[1].numpy()  # [num_agents, pred_len, 2]

        gpt_prediction = predict_trajectory_optimized(input_traj)
        gpt_prediction_original = predict_trajectory(input_traj)
        # assert np.allclose(gpt_prediction, gpt_prediction_original), f"Prediction mismatch at batch {i}"
        # manual way , we can hust check final result too

        original_ade.append(compute_batch_ade_ret(torch.tensor(gpt_prediction_original), torch.tensor(target_traj))[0])
        original_fde.append(compute_batch_fde_ret(torch.tensor(gpt_prediction_original), torch.tensor(target_traj))[0])
        new_ade.append(compute_batch_ade_ret(torch.tensor(gpt_prediction), torch.tensor(target_traj))[0])
        new_fde.append(compute_batch_fde_ret(torch.tensor(gpt_prediction), torch.tensor(target_traj))[0])

print(f"Original ADE: {torch.stack(original_ade).mean().item():.4f}")
print(f"Original FDE: {torch.stack(original_fde).mean().item():.4f}")
print(f"New ADE: {torch.stack(new_ade).mean().item():.4f}")
print(f"New FDE: {torch.stack(new_fde).mean().item():.4f}")



Processing dataset: ETH
Original ADE: 0.9307
Original FDE: 2.0142
New ADE: 0.9210
New FDE: 2.0101


## Main testing loop (original)

In [73]:
NUM_TRAJECTORIES = 20 # expected for minADE20

dataset_name = "eth"
print(f"\nProcessing dataset: {dataset_name.upper()}")

dataset_dir = path.join("datasets", dataset_name)

loader_test = get_dataloader(dataset_dir, 'test', obs_len, pred_len, batch_size=1)


num_tests = 10

total_times = []

with torch.inference_mode():
    for _ in range(num_tests):
        start_time = time.time()
        for i, batch in enumerate(loader_test):
            input_traj = batch[0].numpy()  # [num_agents, obs_len, 2]
            target_traj = batch[1].numpy()  # [num_agents, pred_len, 2]

            # Eval optimized model time
            # gpt_prediction = predict_trajectory_optimized(input_traj)

            # Eval original model time
            gpt_prediction = predict_trajectory(input_traj)


        end_time = time.time()
        total_times.append(end_time - start_time)


# print average time
print(f"Average time: {np.sum(total_times)} seconds")
# time per test
print(f"Time per test: {np.mean(total_times)} seconds")
# time per instnace
print(f"Time per instance: {np.mean(total_times) / len(loader_test)} seconds")

# print the microseconds
print(f"Time per instance: {np.mean(total_times) / len(loader_test) * 1e6} microseconds")



Processing dataset: ETH
Average time: 2.077040910720825 seconds
Time per test: 0.20770409107208251 seconds
Time per instance: 0.00296720130102975 seconds
Time per instance: 2967.20130102975 microseconds


## Test optimized

In [74]:
NUM_TRAJECTORIES = 20 # expected for minADE20

dataset_name = "eth"
print(f"\nProcessing dataset: {dataset_name.upper()}")

dataset_dir = path.join("datasets", dataset_name)

loader_test = get_dataloader(dataset_dir, 'test', obs_len, pred_len, batch_size=1)


num_tests = 10

total_times = []

with torch.inference_mode():
    for _ in range(num_tests):
        start_time = time.time()
        for i, batch in enumerate(loader_test):
            input_traj = batch[0].numpy()  # [num_agents, obs_len, 2]
            target_traj = batch[1].numpy()  # [num_agents, pred_len, 2]

            # Eval optimized model time
            gpt_prediction = predict_trajectory_optimized(input_traj)

            # Eval original model time
            # gpt_prediction = predict_trajectory(input_traj)


        end_time = time.time()
        total_times.append(end_time - start_time)


# print average time
print(f"Average time: {np.sum(total_times)} seconds")
# time per test
print(f"Time per test: {np.mean(total_times)} seconds")
# time per instnace
print(f"Time per instance: {np.mean(total_times) / len(loader_test)} seconds")

# print the microseconds
print(f"Time per instance: {np.mean(total_times) / len(loader_test) * 1e6} microseconds")



Processing dataset: ETH
Average time: 0.6356363296508789 seconds
Time per test: 0.0635636329650879 seconds
Time per instance: 0.0009080518995012556 seconds
Time per instance: 908.0518995012555 microseconds
