# Simulation

In [1]:
from stable_baselines3 import TD3,SAC
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv, VecNormalize
from stable_baselines3.common.env_util import make_vec_env

import gymnasium as gym
from gymnasium.wrappers import RescaleAction
import torch
import nav2d        # Have to import the nav2d Python script, else we can't make env
import numpy as np
import os, re, json, time
from datetime import datetime
from tqdm import tqdm

import pyautogui
import matplotlib.pyplot as plt

In [2]:
def eval_policy(env: gym.Env, 
         num_evals: int, 
         model):

        max_eps_length = env.spec.max_episode_steps
        max_env_size = np.sqrt(2* (2*env.unwrapped.size)**2)

        success_list = [0]*num_evals
        ep_len_list = [0]*num_evals
        final_dist_list = [0]*num_evals

        
        # for each episode in the num_evals:
        for ep in tqdm(range(num_evals), ncols = 100, colour = "#33FF00", desc = f"Evaluating..."):
            done = False

            # initialize episodic reward:
            ep_len = 0

            # while False:
            while not done:
                # get action and step:
                with torch.no_grad():
                    action, _ = model.predict(obs, deterministic = True)
                    nobs, reward, term, trunc, info = env.step(action)
                    done = term or trunc
                    
                    ep_len += 1

                    # advance observation, reset if not:
                    if done:
                        success_list[ep] = info.get("is_success",False)
                        ep_len_list[ep] = ep_len
                        final_dist_list[ep] = np.sqrt(nobs[0]**2+nobs[1]**2)
                        obs = env.reset()[0]
                    else:
                        obs = nobs

        return success_list, np.mean(ep_len_list)/max_eps_length, np.mean(final_dist_list)/max_env_size

Select the result and run to simulate

In [3]:
result_dir = os.getcwd()
result_num = "result_00000"
run_num = "run_100"
result_path = os.path.join(result_dir, "results", "Nav2D_TD3_SB3_optuna_results", result_num)
run_path = os.path.join(result_path, run_num)
model_load = TD3.load(run_path)

Simulation parameters

In [4]:
# testing parameters
n_test = 20
success_count = 0

# environment options
width = 400
height = 400
default_camera_config = {"azimuth" : 90.0, "elevation" : -90.0, "distance" : 3, "lookat" : [0.0, 0.0, 0.0]}
render_mode = "human" if n_test<=20 else "rgb_array"
camera_id = 2

DEFAULT_CAMERA = "overhead_camera"
ENABLE_FRAME = True                     # enable the body frames
RENDER_EVERY_FRAME = True              # similar sim speed as MuJoCo rendering when set to False, else slower

In [71]:
import stable_baselines3
test_env = gym.make("Nav2D-v0", render_mode=render_mode, 
                    width=width,height=height,
                    default_camera_config=default_camera_config,
                    camera_id=camera_id,
                    max_episode_steps=1_000,
                    is_eval=False
                    )
test_env_vec = DummyVecEnv([lambda: test_env])
test_env_norm = VecNormalize(test_env_vec, training=True, norm_obs=True, norm_reward=True)

# obs = test_env.reset()
# test_env.close()

isinstance(test_env_norm, (DummyVecEnv, SubprocVecEnv, VecNormalize))

# test_env.spec.max_episode_steps
# test_env_vec.envs[0].spec.max_episode_steps
# test_env_vec.envs[0].unwrapped.size
# success, ep_len_mean, final_dist = eval_policy(env=test_env, num_evals=n_test, model=model_load)stable_baselines3.

True

In [67]:
test_env_norm

<stable_baselines3.common.vec_env.vec_normalize.VecNormalize at 0x7119bd84e410>

Simulation

In [None]:
test_env = gym.make("Nav2D-v0", render_mode=render_mode, 
                    width=width,height=height,
                    default_camera_config=default_camera_config,
                    camera_id=camera_id,
                    max_episode_steps=1_000,
                    is_eval=False
                    )
obs, info = test_env.reset()

core_env = test_env.unwrapped
rew_goal = core_env.rew_goal_scale

agent_init_list = []
rew_head_list = []

for eps in tqdm(range(n_test), ncols = 100, colour = "#33FF00", desc = f"Evaluating..."):
    # if eps == 0:
    #     if DEFAULT_CAMERA=="overhead_camera": pyautogui.press('tab')
    #     if ENABLE_FRAME: pyautogui.press('e') 
    #     if not RENDER_EVERY_FRAME: pyautogui.press('d')
    done = False
    while not done:
        action, _ = model_load.predict(obs, deterministic=True)
        # print(f"{action}           ", end='\r')
        nobs, rew, term, trunc, info = test_env.step(action)
        # if render_mode == "human":  # visual
        #     print(f"action: {action} | rew_appr: {info.get('rew_approach',-10.0):10.6f}                      ", end="\r")
        done = term or trunc

        if not done:
            obs = nobs
        else: 
            obs, info = test_env.reset()
            agent_init_list.append(info["agent_init"])
            rew_head_list.append(info["rew_head"])
            

        # --- count the success
        if rew == rew_goal: success_count += 1  

print(f"\rSuccess rate out of {n_test} runs is {success_count/n_test*100:.2f}%             ")
test_env.close()

# Explore the result

In [None]:
from statistics import mean, stdev

## Spawn Location Distribution

Unpack the list of x and y initial location

In [None]:
x_list, y_list = [i[0] for i in agent_init_list], [i[1] for i in agent_init_list]
theta_list = [i[2] for i in agent_init_list]

Distribution (mean,stdev) along the x axis

In [None]:
mean(x_list), stdev(x_list)

Distribution (mean,stdev) along the y axis

In [None]:
mean(y_list), stdev(y_list)

Distribution (mean,stdev) about the z axis

In [None]:
mean(theta_list), stdev(theta_list)

## Heading Reward distribution

Inspect the headding reward over the runs

In [None]:
print(f"Heading reward           μ = {mean(rew_head_list): 6.5f}, σ={stdev(rew_head_list): 6.5f}")

Explore the reward received in each state space regions (discretized)

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

size = core_env.size
n_bins = 50
grid_count = np.zeros((n_bins, n_bins), dtype=int)
grid_reward = np.zeros((n_bins, n_bins), dtype=np.float64)

for idx, agent_init in enumerate(agent_init_list):
    
    x, y, theta = agent_init
    
    ix = int((x+size)/(2*size) * (n_bins))
    iy = int((y+size)/(2*size) * (n_bins))

    # print(agent_init, ix, iy)

    if ix >= 100 or iy >= 100:
        print(x, y, ix, iy)
    grid_count[ix, iy] += 1
    count = grid_count[ix, iy]
    alpha = (count-1)/count
    grid_reward[ix, iy] = alpha * grid_reward[ix, iy] + (1-alpha) * rew_head_list[idx]
    
tick_labels = np.round(np.linspace(-size, size ,n_bins),1)
# print(grid)
fig, axes = plt.subplots(1,2, figsize=(20,8))
axes[0] = sns.heatmap(grid_count.T, ax=axes[0], cmap = 'plasma', 
                      xticklabels=tick_labels,yticklabels=tick_labels)
axes[0].invert_yaxis()
# axes[0].invert_xaxis()
axes[0].set_aspect('equal')
axes[0].set_title(f'Agent spawn frequency in {result_num}')
axes[0].set_xlabel('x')
axes[0].set_ylabel('y')

plt.show 

axes[1] = sns.heatmap(grid_reward.T, ax=axes[1], cmap = 'plasma',
                      xticklabels=tick_labels, yticklabels=tick_labels)
axes[1].invert_yaxis()
# axes[1].invert_xaxis()
axes[1].set_aspect('equal')
axes[1].set_title(f'Average reward in {result_num}')
plt.show

## Spawn Frequency distribution

In [None]:
spawn_freq_mean = grid_count.mean()
spawn_freq_stdev = grid_count.std()

print(spawn_freq_mean, spawn_freq_stdev)

# Visualize the spawn reward in a video

In [None]:
import cv2
import os

result_num = "result_00060"
video_name = os.path.join(os.getcwd(),"results","Nav2D_TD3_SB3_results",result_num,f"{result_num}.avi")
image_folder = os.path.join(os.getcwd(), "results", "Nav2D_TD3_SB3_results", result_num)
images = [img for img in os.listdir(image_folder) if img.endswith(".png")]

frame = cv2.imread(os.path.join(image_folder, images[0]))
height, width, layers = frame.shape

video = cv2.VideoWriter(video_name, cv2.VideoWriter_fourcc(*'XVID'), 5, (width,height)) 

images.sort()

for image in images:
    img_path = os.path.join(image_folder, image)
    frame = cv2.imread(img_path)
    video.write(frame)

video.release()
cv2.destroyAllWindows()


In [None]:
import optuna
import pandas

study = optuna.study.load_study(study_name="rew_scale_nov24", storage="sqlite:///results/optuna_study.db")
study_df = study.trials_dataframe()
study_df.to_csv("results/rew_scale_nov24")

In [3]:
study_df

<bound method Study.trials_dataframe of <optuna.study.study.Study object at 0x7170bc339ff0>>