In [1]:
# Simulator packages
from environment import CarEnv
from train_mlp import reward_fn, social_reward_fn
from stable_baselines3 import SAC
import pygame

# Data analysis packages
import numpy as np
import pandas as pd

# OS packages
import os
import glob
from shutil import copyfile

pygame 2.0.1 (SDL 2.0.14, Python 3.7.9)
Hello from the pygame community. https://www.pygame.org/contribute.html


### Specify the location (folder) containing the models to be evaluated and initialize data container

In [2]:
# Evaluation configuration
NUM_EVAL_EPISODES = 100
MAX_LEN = 300   # Store pedestrian and car position for a maximum of MAX_LEN steps

training_instance = "TRAINING_20000"
folder_name = os.path.join("./logs/", training_instance)

# Scan the training folder exactly once so that `model_names` and the keys of
# `data` can never disagree, and sort the result because glob.glob() returns
# matches in arbitrary (filesystem-dependent) order.
model_paths = sorted(glob.glob(os.path.join(folder_name, "svo_*")))
model_names = [os.path.basename(el) for el in model_paths]

print(model_names)

# Scalar, per-episode outcome fields (one row per evaluation episode)
dtypes = np.dtype([
    ("goal_reached", bool),
    ("collisions", bool),
    ("is_crossing", bool),
    ("is_aware", bool),
    ("ep_length", np.uint32),
    ("time_to_reach_goal", np.float32),
    ("collision_speed", np.float32)
])

# One container per model: where it lives on disk, how many episodes are
# evaluated, and a zero-initialised results table with the fields above.
data = {os.path.basename(el): {"path": el,
                            "num_episodes": NUM_EVAL_EPISODES,
                            "episodes": pd.DataFrame(np.zeros(shape=(NUM_EVAL_EPISODES,), dtype=dtypes))}
        for el in model_paths}

# Per-step traces are stored as one fixed-size array per episode (object
# columns); steps beyond the end of an episode keep their initial zeros.
for el in data:
    data[el]["episodes"]["acceleration"] = [np.zeros(shape=(MAX_LEN), dtype=np.float32) for _ in range(NUM_EVAL_EPISODES)]
    data[el]["episodes"]["ped_pos"] = [np.zeros(shape=(MAX_LEN, 2), dtype=np.float32) for _ in range(NUM_EVAL_EPISODES)]
    data[el]["episodes"]["car_pos"] = [np.zeros(shape=(MAX_LEN, 2), dtype=np.float32) for _ in range(NUM_EVAL_EPISODES)]

['svo_00', 'svo_10', 'svo_20', 'svo_30', 'svo_40', 'svo_50', 'svo_60', 'svo_70', 'svo_80']


### Specify destination folder

In [3]:
# Destination of this evaluation run: ./results/<training_instance>_eval2
dst_folder = os.path.join("./results/", training_instance + "_eval2")
print(dst_folder)

# makedirs creates "./results/" as well when it is missing, and exist_ok=True
# makes the cell safe to re-run without a separate (racy) isdir() check.
os.makedirs(dst_folder, exist_ok=True)

# Keep a copy of the training metadata next to the evaluation results.
# The trailing ';' suppresses the notebook echo of copyfile's return value.
copyfile(os.path.join(folder_name, "metadata.json"), os.path.join(dst_folder, "metadata.json"));

./results/TRAINING_20000_eval2


# Evaluate each model

### Pre-spawn pedestrians

In [4]:
# Seed NumPy's global generator so the pre-spawned scenarios are the same on
# every Restart & Run All.
# NOTE(review): this only covers randomness drawn through np.random; if CarEnv
# draws spawn locations from another generator, seed that one as well.
SPAWN_SEED = 42
np.random.seed(SPAWN_SEED)

# Create a dummy environment to get spawn locations with CarEnv methods
helper_env = CarEnv(window_size = (800, 400),
                     record_video=False,
                     render=False,
                     reward_fn=social_reward_fn,
                     pedestrian_model='SGSFM')

percentage_crossing = 0.9      # Percentage of pedestrians that will cross the road
awareness_probability = 0.9    # Probability that a pedestrian is aware of the vehicle
p_bottom = 0.5                 # Probability that the pedestrian spawns in the bottom pavement

# One row per evaluation episode: pedestrian behaviour flags plus spawn/goal coordinates
ped_dtypes = np.dtype([
    ("is_crossing", bool),
    ("is_aware", bool),
    ("is_bottom", bool),
    ("spawn_x", np.float32),
    ("spawn_y", np.float32),
    ("goal_x", np.float32),
    ("goal_y", np.float32)
])

ped = pd.DataFrame(np.zeros(shape=(NUM_EVAL_EPISODES,), dtype=ped_dtypes))

# Draw every Bernoulli flag in one vectorized call per column
ped["is_crossing"] = np.random.binomial(1, percentage_crossing, size=NUM_EVAL_EPISODES) == 1
ped["is_aware"] = np.random.binomial(1, awareness_probability, size=NUM_EVAL_EPISODES) == 1
ped["is_bottom"] = np.random.binomial(1, p_bottom, size=NUM_EVAL_EPISODES) == 1

# Spawn and goal coordinates are requested from the environment, given the
# crossing / bottom-pavement flags drawn above for that episode.
for i in range(NUM_EVAL_EPISODES):
    s_x, s_y, g_x, g_y = helper_env.get_random_pedestrian_spawn_info(ped.loc[i, "is_crossing"],
                                                                      ped.loc[i, "is_bottom"])
    ped.loc[i, "spawn_x"] = s_x
    ped.loc[i, "spawn_y"] = s_y
    ped.loc[i, "goal_x"] = g_x
    ped.loc[i, "goal_y"] = g_y

Pedestrian awareness probability set to: 1


### Pre-spawn vehicle

In [5]:
# Schema of one pre-generated vehicle spawn: start position and initial velocity
veh_dtypes = np.dtype([
    ("spawn_x", np.float32),
    ("spawn_y", np.float32),
    ("initial_velocity", np.float32),
])

# One zero-initialised row per evaluation episode, filled in below
veh = pd.DataFrame(np.zeros(shape=(NUM_EVAL_EPISODES,), dtype=veh_dtypes))

for i in range(NUM_EVAL_EPISODES):
    x_0, y_0, v_0 = helper_env.get_random_vehicle_spawn_info()
    # The returned triple is stored in the same order as the schema fields
    for field_name, field_value in zip(veh_dtypes.names, (x_0, y_0, v_0)):
        veh.loc[i, field_name] = field_value

### Evaluation


In [6]:
for model_name in model_names:
    print(f"Evaluating {model_name}")
    model = data[model_name]
    episodes = model["episodes"]

    # A fresh, non-rendering environment is built for every model
    env = CarEnv(
        window_size=(800, 400),
        record_video=False,
        render=False,
        reward_fn=social_reward_fn,
        training=False,
        pedestrian_model='SGSFM',
        p_aware=awareness_probability,
    )

    # Load the checkpoint stored in this model's folder
    nn_path = os.path.join(model["path"], "best_model.zip")
    nn = SAC.load(nn_path)

    for i in range(NUM_EVAL_EPISODES):
        # Every model is reset with the same pre-generated scenario for episode i
        veh_info, ped_info = veh.loc[i], ped.loc[i]
        obs = env.reset(pedestrian_info=ped_info, vehicle_info=veh_info)

        # Trace buffers of this episode; they are written in place below
        acceleration_trace = episodes.loc[i, "acceleration"]
        car_trace = episodes.loc[i, "car_pos"]
        ped_trace = episodes.loc[i, "ped_pos"]

        counter = 0
        done = False
        while not done:
            env.render()
            action, _ = nn.predict(obs, deterministic=True)
            obs, reward, done, _ = env.step(action)

            # Only the first MAX_LEN steps fit in the trace buffers
            if counter >= MAX_LEN:
                continue

            acceleration_trace[counter] = env.vehicle.get_acceleration()[0]
            car_trace[counter] = env.vehicle.position
            ped_trace[counter] = env.pedestrian.position
            counter += 1

        # The loop body runs at least once, so the episode has finished here:
        # record its scalar outcome.
        episodes.loc[i, "collisions"] = env.collision_occurred
        episodes.loc[i, "goal_reached"] = env.vehicle.goal_reached
        episodes.loc[i, "ep_length"] = env.timestep

        # Left at its initial 0 when the goal was not reached
        if env.vehicle.goal_reached:
            episodes.loc[i, "time_to_reach_goal"] = env.timestep * env.dt

        # Left at its initial 0 when no collision occurred; otherwise the
        # magnitude of the car velocity relative to the pedestrian.
        if env.collision_occurred:
            car_speed = env.vehicle.velocity
            ped_speed = env.pedestrian.velocity
            rel_speed = car_speed - ped_speed
            episodes.loc[i, "collision_speed"] = np.linalg.norm(rel_speed)

        # Copy the scenario flags so each results table is self-contained
        for flag in ("is_aware", "is_crossing"):
            episodes.loc[i, flag] = ped_info[flag]

Evaluating svo_00
Pedestrian awareness probability set to: 0.9
Evaluating svo_10
Pedestrian awareness probability set to: 0.9
Evaluating svo_20
Pedestrian awareness probability set to: 0.9
Evaluating svo_30
Pedestrian awareness probability set to: 0.9
Evaluating svo_40
Pedestrian awareness probability set to: 0.9
Evaluating svo_50
Pedestrian awareness probability set to: 0.9
Evaluating svo_60
Pedestrian awareness probability set to: 0.9
Evaluating svo_70
Pedestrian awareness probability set to: 0.9
Evaluating svo_80
Pedestrian awareness probability set to: 0.9


### Save Results

In [7]:
# One pickle per evaluated model, named after the model
for model_name in model_names:
    pickle_path = os.path.join(dst_folder, f"{model_name}.pkl")
    data[model_name]["episodes"].to_pickle(pickle_path)

# The shared pedestrian / vehicle scenarios are stored alongside the results
for frame, file_name in ((ped, "pedestrian_info.pkl"), (veh, "vehicle_info.pkl")):
    frame.to_pickle(os.path.join(dst_folder, file_name))

In [8]:
# Report the folder that holds the evaluation results
print("Saving results to {}".format(dst_folder))

Saving results to ./results/TRAINING_20000_eval2
