In [6]:
import os, sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
from matplotlib.colors import to_rgb
from scipy.stats import multivariate_normal
import seaborn as sns
import subprocess
import glob
import pdb
from multiprocessing import Pool
from tqdm import tqdm
from utils.preprocessing import load_SDD
from utils.plotting_utils import add_robot
from utils.metrics import get_distances
from plan_trajectory import setup_params
import yaml

  from .autonotebook import tqdm as notebook_tqdm


In [7]:
# Setup: scene configuration, one reference video frame, and the per-method plan data.
scene = 'nexus_4'
forecaster = 'darts'
json_name = './params/' + scene + '.json'
# Read the parameter file inside a context manager so the handle is closed
# deterministically (the file is parsed with the YAML safe loader).
with open(json_name) as params_file:
    params = yaml.safe_load(params_file)
mode = 'train' if scene in os.listdir('./data') else 'test'
df = pd.read_csv('./data/' + forecaster + '/' + scene + '.csv')
cmap = plt.get_cmap('terrain')
frames, _, _ = load_SDD(scene, mode=mode, load_frames=[500])
frame = frames[0]
H, W, _  = frame.shape
# Crop window as [[row_start, row_end], [col_start, col_end]]: rows from 45% to 65%
# of the frame height, full width.
crop = [[int(0.45*H), int(0.65*H)-1], [int(0*W), int(1*W)-1]]
params = setup_params(params, H, W, None, None)
methods = ['ACI', 'Conformal Controller', 'Aggressive', 'Conservative']
colors = {"ACI": '#6C1587', "Conformal Controller": '#FA8128', "Aggressive": "#BC0000", "Conservative": "#1F456E"}

# Per method: (tag used in the plan file name, learning rate, extra column to keep).
# 'Aggressive' reads the conformal-controller plan generated with a learning rate of 0.0.
method_settings = {
    'ACI': ('_aci_', 0.1, 'alphat'),
    'Conformal Controller': ('_conformal_controller_', 500.0, 'lambda'),
    'Conservative': ('_conservative_', 0.0, None),
    'Aggressive': ('_conformal_controller_', 0.0, None),
}

sequences = {}
distances = {}
plan_folder = './plans/' + scene + '/'
for method in methods:
    file_tag, lr, extra_column = method_settings[method]
    # The learning rate is embedded in the file name with '.' replaced by '_' (e.g. 0.1 -> '0_1').
    str_append = forecaster + file_tag + str(lr).replace('.', '_')
    plan_filename = os.path.join(plan_folder, 'plan_df_' + str_append + '.csv')
    plan_df = pd.read_csv(plan_filename)
    # Distances are computed on the unfiltered plan, before the row selection below.
    distances[method] = get_distances(plan_df, px_per_m = params['px_per_m'])
    # Keep rows with metaId == -1 that have not reached the goal and have aheads == 1
    # (presumably the robot's own one-step-ahead rows -- confirm against the planner output).
    plan_df = plan_df[(plan_df.metaId == -1) & (plan_df.r_goal_reached == False) & (plan_df.aheads == 1)]
    # Row 0 holds plan_df.x, row 1 holds plan_df.y, and row 2 (when present) the method's extra column.
    stacked_columns = [plan_df.x.to_numpy(), plan_df.y.to_numpy()]
    if extra_column is not None:
        stacked_columns.append(plan_df[extra_column].to_numpy())
    sequences[method] = np.stack(stacked_columns, axis=0)

OpenCV: Couldn't read video stream from file "/checkpoints/aa/SDD/videos/nexus/video4/video.mov"
[ERROR:0@157.801] global /Users/xperience/actions-runner/_work/opencv-python/opencv-python/opencv/modules/videoio/src/cap.cpp (166) open VIDEOIO(CV_IMAGES): raised OpenCV exception:

OpenCV(4.6.0) /Users/xperience/actions-runner/_work/opencv-python/opencv-python/opencv/modules/videoio/src/cap_images.cpp:253: error: (-5:Bad argument) CAP_IMAGES: can't find starting number (in the name of file): /checkpoints/aa/SDD/videos/nexus/video4/video.mov in function 'icvExtractPattern'




Parsing scene: nexus_4


100%|██████████████████████████████████████████| 1/1 [00:00<00:00, 26379.27it/s]

Processing annotations




In [None]:
def calculate_dwell_times(x, y, tolerance=20):
    """
    Count, for each run of nearby points, how many samples stay close to the run's first point.

    Starting from an anchor point, every immediately following point whose Euclidean
    distance to the anchor is strictly below `tolerance` adds one to the anchor's count.
    The first point that falls outside the tolerance becomes the next anchor. Points
    absorbed into a run keep their initial count of 1.

    Parameters:
    x (array-like): First coordinate of each point.
    y (array-like): Second coordinate of each point, same length as x.
    tolerance (float): Distance below which a point counts as still dwelling at the anchor.

    Returns:
    numpy.ndarray: Array with the shape and dtype of `x` (built with np.ones_like) holding the counts.
    """
    n_points = len(x)
    dwell_times = np.ones_like(x)

    anchor = 0
    while anchor < n_points - 1:
        probe = anchor + 1
        while probe < n_points:
            offset = [x[probe] - x[anchor], y[probe] - y[anchor]]
            if not np.linalg.norm(offset) < tolerance:
                break
            dwell_times[anchor] += 1
            probe += 1
        # Resume from the first point that left the anchor's neighbourhood.
        anchor = probe

    return dwell_times

# Dwell counts per method. The two coordinate rows are passed in swapped order, which
# does not matter here because only Euclidean distances between points are used.
dwell_times = {
    k : calculate_dwell_times(sequences[k][1], sequences[k][0]) for k in methods
}

frame_cropped = frame[crop[0][0]:crop[0][1], crop[1][0]:crop[1][1]]

# A collision is any step whose distance drops below the sum of the two configured sizes.
collision_threshold = params['robot_size'] + params['human_size']

# Index of the first colliding step per method, or None when the method never collides.
collision_indexes = {}
for k in methods:
    colliding_steps = np.where(distances[k] < collision_threshold)[0]
    collision_indexes[k] = colliding_steps[0] if len(colliding_steps) > 0 else None

# Trajectories are cut just before the first collision. A None index slices the whole
# sequence, so no sentinel is needed. (The previous np.nan_to_num(idx, -1) call passed -1
# as the `copy` argument and simply returned idx unchanged, i.e. it behaved like this.)
# Row 1 of `sequences` is shifted by the column offset of the crop, row 0 by the row offset.
x = {
    k : sequences[k][1][:collision_indexes[k]] - crop[1][0] for k in methods
}

y = {
    k : sequences[k][0][:collision_indexes[k]] - crop[0][0] for k in methods
}

# Third row of the stacked sequence: 'alphat' for ACI, 'lambda' for the Conformal Controller.
lambdas = {
    k : sequences[k][2][:collision_indexes[k]] for k in ['ACI', 'Conformal Controller']
}

# Untruncated number of stacked rows (2 or 3) per method -- sequences[k][0] is the first
# coordinate row, so .shape[0] is its length.
lengths = {
    k : sequences[k][0].shape[0] for k in methods
}

max_length = max(lengths.values())

# Relative sequence length in (0, 1]; used to scale how far each line's colour gradient runs.
darkening_factor = {
    k : lengths[k]/max_length for k in methods
}

zorders = {
    "ACI": 0,
    "Conformal Controller": 2,
    "Aggressive": 2,
    "Conservative": 0
}

In [None]:
def plot_line_gradient(ax, x, y, base_color, darkening_factor, label=None, zorder=0, linewidth=1, collided=None):
    """
    Draw a polyline segment by segment, fading from white towards `base_color`.

    Each pair of consecutive points is plotted as its own line. Segment i uses the
    interpolation weight t = i / (n - 1) * darkening_factor, where t = 0 gives white and
    t = 1 gives `base_color`. Nothing is drawn when fewer than two points are supplied.

    Parameters:
    ax (matplotlib.axes._axes.Axes): The axes on which to plot the gradient line.
    x (list or array): The x coordinates of the line.
    y (list or array): The y coordinates of the line.
    base_color: Any colour accepted by matplotlib's to_rgb; the darkest colour the line can take on.
    darkening_factor (float): Scales how far along the white-to-base_color ramp the line gets.
    label (str, optional): Legend label, attached to the final segment only.
    zorder (int): Draw order passed to every segment and to the collision marker.
    linewidth (float): Line width of every segment.
    collided (bool, optional): If truthy, an "x" marker in the final segment's colour is
        drawn at the last point.
    """
    n_points = len(x)
    target_rgb = to_rgb(base_color)
    final_segment = n_points - 2
    for seg in range(n_points - 1):
        progress = seg / (n_points - 1) * darkening_factor
        # Blend every channel from 1 (white) towards the target value, clamped to [0, 1].
        shade = tuple(max(0, min(channel + (1 - progress) * (1 - channel), 1)) for channel in target_rgb)
        is_final = seg == final_segment
        ax.plot(
            x[seg:seg+2],
            y[seg:seg+2],
            color=shade,
            zorder=zorder,
            linewidth=linewidth,
            label=label if is_final else None,
        )
        if is_final and collided:
            ax.scatter(x[-1], y[-1], color=shade, marker="x", zorder=zorder)

In [5]:
# Teaser figure: planned trajectories drawn over the cropped video frame.
# Depends on frame_cropped, x, y, collision_indexes, darkening_factor, zorders and
# plot_line_gradient from the preceding cells; run those first on a fresh kernel.
linewidth=2
methods_to_plot=["Conformal Controller", "Aggressive", "Conservative"]
# The output file name lists every plotted method, e.g. teaser_<method>_<method>.pdf.
savename = './teaser/teaser' + ''.join("_" + method for method in methods_to_plot) + ".pdf"
markevery = params["fps"]

fig = plt.figure()
# Height of 1 inch, width chosen to keep the aspect ratio of the cropped frame.
fig.set_size_inches(1. * frame_cropped.shape[1] / frame_cropped.shape[0], 1, forward = False)
plt.imshow(frame_cropped)
for method in methods:
    if method not in methods_to_plot:
        continue
    collided = collision_indexes[method] is not None
    plot_line_gradient(plt.gca(), x[method], y[method], colors[method], darkening_factor[method], label=method, zorder=zorders[method], linewidth=linewidth, collided=collided)
add_robot(plt.gca(), params['r_start'][1]-crop[0][0], params['r_start'][0]-crop[1][0]+30, 7.5, 7.5, False)
# (Add the goal_rad to the x axis because otherwise it looks weird.)
plt.scatter(params['r_goal'][0][0] - crop[1][0] + params['goal_rad'], params['r_goal'][0][1] - crop[0][0], marker='*', color='gold', zorder=2)
plt.axis('off')
if len(methods_to_plot) > 1:
    handles, labels = plt.gca().get_legend_handles_labels()
    # List legend entries in methods_to_plot order. For each requested method, look up
    # where its handle sits among the drawn labels; methods that produced no labelled
    # segment (fewer than two points) are skipped instead of raising.
    order = [labels.index(name) for name in methods_to_plot if name in labels]
    leg = plt.legend([handles[idx] for idx in order],[labels[idx] for idx in order], fontsize=3, borderpad=0.6)
    leg.get_frame().set_linewidth(0.25)

os.makedirs('./teaser/', exist_ok=True)
plt.savefig(savename, dpi=frame.shape[0], bbox_inches='tight', pad_inches=0)
plt.gcf().set_dpi(frame.shape[0])
plt.show()

NameError: name 'frame_cropped' is not defined

<Figure size 432x288 with 0 Axes>

In [None]:
# Two stacked panels; the figure is 1.5x as wide as the cropped frame's aspect ratio and 2.3 inches tall.
fig, axs = plt.subplots(nrows=2, ncols=1)
fig.set_size_inches(
    frame_cropped.shape[1] / frame_cropped.shape[0] * 1.5,
    2.3,
    forward = False,
)
# Time axes: sample index divided by params["fps"], one per plotted series.
n_aci_steps = lambdas["ACI"].shape[0]
n_conformal_steps = lambdas["Conformal Controller"].shape[0]
t_aci = np.arange(n_aci_steps)/params["fps"]
t_conformal = np.arange(n_conformal_steps)/params["fps"]
# Empty proxy line: gives the left axis a "CC" legend entry in the Conformal Controller
# colour, since that curve itself is drawn on the twin axis.
axs[0].plot([], [], color=colors["Conformal Controller"], label="CC")
axs[0].plot(t_aci, lambdas["ACI"], color=colors["ACI"], label="ACI")
# Second y-axis sharing the same x-axis as the top panel.
ax2 = axs[0].twinx()
def thousand_formatter(x, pos):
    """
    Tick formatter that abbreviates large magnitudes in thousands with a 'K' suffix.

    Values whose magnitude is at least 500 are divided by 1000 and printed without
    trailing zeros (500 -> '0.5K', 2000 -> '2K', -1500 -> '-1.5K'), so distinct ticks
    such as 1500 and 2500 no longer collapse onto the same rounded label. Smaller
    values are printed as whole numbers.

    Parameters:
    x (float): Tick value.
    pos: Tick position supplied by matplotlib's FuncFormatter; unused.

    Returns:
    str: The tick label.
    """
    if abs(x) >= 500:
        return f'{x/1000:g}K'
    else:
        return f'{x:.0f}'
# Conformal Controller series goes on the twin (right-hand) axis of the top panel.
ax2.plot(t_conformal, lambdas["Conformal Controller"], color=colors["Conformal Controller"])
fig.set_dpi(frame.shape[0])
# Keep the right spine because the twin axis uses it.
sns.despine(top=True, right=False)
axs[0].set_ylabel(r'$\alpha_t$')
axs[0].set_ylim([-0.1,0.2])
axs[0].set_xticks([])
axs[0].set_xticklabels([])
axs[0].legend(loc="upper right")
ax2.set_ylim([None, None])
ax2.set_ylabel(r'$\lambda_t$')
ax2.yaxis.set_major_formatter(FuncFormatter(thousand_formatter))
# The time axes follow the lambda series, which are cut at the first collision, while
# `distances` covers the whole plan. Trim the distances to the time axis so the x and y
# lengths agree when a collision was recorded (a no-op when the lengths already match).
axs[1].plot(t_aci, np.asarray(distances["ACI"])[:len(t_aci)], color=colors["ACI"])
axs[1].plot(t_conformal, np.asarray(distances["Conformal Controller"])[:len(t_conformal)], color=colors["Conformal Controller"])
axs[1].set_yticks([0, 10])
axs[1].set_xlabel('time (s)')
axs[1].set_ylabel('min dist (m)')
# Dotted reference line at params["human_rad"].
# NOTE(review): collisions elsewhere are detected with robot_size + human_size -- confirm
# that human_rad is the intended reference level here.
axs[1].axhline(y=params["human_rad"], linestyle="dotted", color="#888888")
plt.tight_layout()
# Create the output directory here too, so this cell does not rely on another cell having done it.
os.makedirs('./teaser/', exist_ok=True)
plt.savefig('./teaser/sdd_lineplots.pdf', dpi=frame.shape[0], bbox_inches='tight', pad_inches=0)