In [1]:
import os
os.chdir("../")

import numpy as np

In [2]:
# build the environment to operate on
import gymnasium as gym
import env.register_env
from env.adversaries import Chaser, RandomMover, TrajectoryFollower

window, dt, max_speed, max_accel = 5.0, 0.1, 1, 0.5 # for obstacles
n_adversaries = 3

adversaries = [
    RandomMover(i, window, dt, max_accel, max_speed)
    for i in range(n_adversaries)
]

max_speed_agent = 2
max_accel_agent = 10
env = gym.make(
    "ReachAvoid-v0",
    adversaries=adversaries,
    window=window,
    dt=dt,
    max_speed=max_speed_agent,
    max_accel=max_accel_agent,
)

In [3]:
# initialize conformal prediction pipeline
import pickle
from conformal_region import  ConformalRegion

alpha = 0.3  # failure probability
horizon = 17
traj_path = "dataset/random_5_p1_1_p5.npy"
pred_path = "dataset/random_5_p1_1_p5_pred_20.npy"
predictor_path = "dataset/random_5_p1_1_p5_ridge_model.pkl"

# load the model
with open(predictor_path, "rb") as f:
    predictor = pickle.load(f)

CP = ConformalRegion(traj_path, pred_path, 20, 1-np.power(1-alpha, 1/n_adversaries))
print("alpha level for each agent is", 1-np.power(1-alpha, 1/n_adversaries))

alpha level for each agent is 0.11209599825739935


In [4]:
# load the MPC pipeline
from mpc import MPCSolver

# initialize dynamic as double integrator
A = np.array(
    [
        [1, 0, dt, 0],
        [0, 1, 0, dt],
        [0, 0, 1, 0],
        [0, 0, 0, 1],
    ]
)
B = np.array([
    [0, 0], 
    [0, 0], 
    [dt, 0], 
    [0, dt]]
)

Q = np.eye(4) * 0.01
R = np.eye(2) * 0.01

mpc = MPCSolver(A, B, Q, R, horizon, window, max_speed_agent, max_accel_agent, dt)

In [5]:
len_memory = 5
initial_obs = env.reset(options={
    "adv_pos": np.array([[2, 4], [4,1], [3,3]])
})[0]

# queue for keeping track of obstacles' history position, used to predict
obs_history = [list(x0[:2])*len_memory for x0 in initial_obs["adversaries"]]

def rollout_trajectories(init_history):
    """
    Autoregressively predict obstacle trajectories.
    Input: (n_adversary, len_memory, 2) obstacle position history
    Output: (n_adversary, horizon+1, 2) obstacle furture state prediction
    """
    m = init_history.shape[0]
    # container: time 0…horizon
    preds = np.zeros((m, horizon+len_memory+1, 2), dtype=float)
    # fill in the known history
    preds[:, :len_memory, :] = init_history

    for t in range(len_memory, horizon+len_memory+1):
        # grab the last 5 steps as a flat 10-dim feature
        X_batch = preds[:, t-len_memory:t, :].reshape(m, len_memory*2)
        # predict next (x,y) for all m obstacles at once
        next_xy = predictor.predict(X_batch)   # → shape (m,2)
        preds[:, t, :] = next_xy

    return preds[:, len_memory:,]

# sanity check
rollout_trajectories(np.array(obs_history).reshape(n_adversaries, len_memory, 2)).shape

  logger.warn(
  gym.logger.warn("Casting input x to numpy array.")
  logger.warn(f"{pre} is not within the observation space.")


(3, 18, 2)

In [6]:
# policy for the MPC agent
def policy(t, observation, log=None) -> np.ndarray:
    x_goal = observation["landmark"]
    x0 = observation["agent"]
    
    # update obstacle memory
    for i, adv in enumerate(obs_history):
        adv.pop(0)
        adv.pop(0)
        adv += list(observation["adversaries"][i][:2])
    
    obs_preds = rollout_trajectories(np.array(obs_history).reshape(n_adversaries, len_memory, 2))
    obs_radii = np.array([CP.construct_valid_prediction_regions(t, i) for i in range(horizon+1)])
    obs_radii = np.repeat(obs_radii[None, :], n_adversaries, axis=0) + 0.4
    
    if log:
        log["obs_preds"].append(obs_preds)
        log["obs_radii"].append(obs_radii)

    x, u, prob = mpc.solve(x0, x_goal, obs_preds, obs_radii)
    if u is None:
        # Damn we did not get a solution
        # let's do nothing and hope for the best...
        return np.zeros(2)
        
    return u[:, 0] # return only the first sequence 

In [7]:
# main simulation loop
import copy

action = env.action_space.sample()  # action at 0
simulation_trajectory = []
log = {
    "obs_preds": [],
    "obs_radii": []
}

for t in range(100):
    obs, reward, done, _, _ = env.step(action)
    simulation_trajectory.append(copy.deepcopy(obs))

    if done:
        print("receive reward", reward)
        break
    
    action = policy(t, obs, log)

  logger.warn(
  gym.logger.warn("Casting input x to numpy array.")
  logger.warn(f"{pre} is not within the observation space.")


receive reward 1


In [8]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import HTML

def animate_trajectory(simulation_trajectory, xlim=(0, 5), ylim=(0, 5), interval=200):
    n_steps = len(simulation_trajectory)
    agent_pos = np.array([step['agent'][:2] for step in simulation_trajectory])
    landmark = simulation_trajectory[0]['landmark'][:2]
    m = len(simulation_trajectory[0]['adversaries'])
    adv_pos = np.stack([[step['adversaries'][i][:2]
                         for step in simulation_trajectory]
                        for i in range(m)], axis=1)

    fig, ax = plt.subplots()
    ax.set_xlim(*xlim)
    ax.set_ylim(*ylim)
    ax.set_aspect('equal')
    agent_scatter = ax.scatter([], [], c='blue', s=50, label='Agent')
    adv_scatter   = ax.scatter([], [], c='red',  s=50, label='Adversaries')
    landmark_scatter = ax.scatter(*landmark, c='green',
                                  marker='*', s=100, label='Landmark')
    ax.legend(loc='upper right')

    def init():
        # <-- use (0,2) empty arrays, not []
        agent_scatter.set_offsets(np.empty((0, 2)))
        adv_scatter.set_offsets(np.empty((0, 2)))
        return agent_scatter, adv_scatter, landmark_scatter

    def update(frame):
        agent_scatter.set_offsets(agent_pos[frame])
        adv_scatter.set_offsets(adv_pos[frame])
        return agent_scatter, adv_scatter, landmark_scatter

    anim = FuncAnimation(fig, update, frames=n_steps,
                         init_func=init, blit=True, interval=interval)
    plt.close(fig)
    return HTML(anim.to_jshtml())

In [9]:
animate_trajectory(simulation_trajectory)

In [10]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib.patches import Circle
from IPython.display import HTML

def animate_with_predictions(
    simulation_trajectory,
    obs_preds_log,
    obs_radii_log,
    xlim=(0, 5),
    ylim=(0, 5),
    interval=200,
):
    """
    Animate actual trajectories along with predicted obstacle circles and connecting lines over the full prediction horizon.

    - simulation_trajectory: list of dict with keys 'agent','adversaries','landmark'
    - obs_preds_log: np.ndarray, shape (T_log, m, H, 2)
    - obs_radii_log: np.ndarray, shape (T_log, m, H)
    """
    # determine common frame count
    T_sim = len(simulation_trajectory)
    T_pred = obs_preds_log.shape[0]
    T_rad = obs_radii_log.shape[0]
    T = min(T_sim, T_pred, T_rad)
    if T < max(T_sim, T_pred, T_rad):
        print(f"Warning: clipping animation to {T} frames "
              f"(sim {T_sim}, preds {T_pred}, radii {T_rad})")

    m, H = obs_preds_log.shape[1], obs_preds_log.shape[2]
    agent_pos = np.array([step['agent'][:2]
                          for step in simulation_trajectory[:T]])
    landmark = simulation_trajectory[0]['landmark'][:2]

    fig, ax = plt.subplots()
    ax.set_xlim(*xlim)
    ax.set_ylim(*ylim)
    ax.set_aspect('equal')

    # scatter for actual entities
    agent_scatter = ax.scatter([], [], c='blue', s=50, label='Agent')
    adv_scatter  = ax.scatter([], [], c='red',  s=50, label='Adversaries')
    landmark_scatter = ax.scatter(*landmark, c='green',
                                  marker='*', s=100, label='Landmark')

    # create m*H circle patches for predicted obstacles
    circles = []
    for _ in range(m * H):
        circ = Circle((0, 0), radius=0, fill=False,
                      edgecolor='orange', linewidth=1, linestyle='--',
                      alpha=0.6)
        ax.add_patch(circ)
        circles.append(circ)

    # create one line artist per obstacle to connect predicted centers
    lines = []
    for i in range(m):
        line, = ax.plot([], [], '-', linewidth=1.5, alpha=0.7,
                        label='Prediction Path' if i == 0 else "")
        lines.append(line)

    ax.legend(loc='upper right')

    def init():
        agent_scatter.set_offsets(np.empty((0, 2)))
        adv_scatter.set_offsets(np.empty((0, 2)))
        for c in circles:
            c.set_center((0, 0))
            c.set_radius(0)
        for line in lines:
            line.set_data([], [])
        return [agent_scatter, adv_scatter, landmark_scatter] + circles + lines

    def update(frame):
        # actual positions
        agent_scatter.set_offsets(agent_pos[frame])
        adv_actual = np.vstack(
            [adv[:2] for adv in simulation_trajectory[frame]['adversaries']])
        adv_scatter.set_offsets(adv_actual)

        # predicted circles and lines
        preds = obs_preds_log[frame]   # (m, H, 2)
        rats  = obs_radii_log[frame]   # (m, H)
        idx = 0
        for i in range(m):
            # collect centers for this obstacle
            path = preds[i]  # (H,2)
            xs, ys = path[:, 0], path[:, 1]
            lines[i].set_data(xs, ys)
            for h in range(H):
                x_c, y_c = preds[i, h]
                r = rats[i, h]
                circles[idx].set_center((float(x_c), float(y_c)))
                circles[idx].set_radius(float(r))
                idx += 1

        return [agent_scatter, adv_scatter, landmark_scatter] + circles + lines

    anim = FuncAnimation(fig, update, frames=T,
                         init_func=init, blit=True, interval=interval)
    plt.close(fig)
    return HTML(anim.to_jshtml())

# Example usage in Jupyter:
# anim_html = animate_with_predictions(
#     simulation_trajectory,
#     np.array(log["obs_preds"]),
#     np.array(log["obs_radii"])
# )
# from IPython.display import display
# display(anim_html)


In [11]:
anim_html = animate_with_predictions(
    simulation_trajectory,
    np.array(log["obs_preds"]),
    np.array(log["obs_radii"]),
)
from IPython.display import display
display(anim_html)



In [12]:
np.array(log["obs_radii"]).shape

(80, 3, 18)