In [109]:
# Our custom module
from nn import DeepQNetwork
from custom_parking_env import ParkingWithObstacles

# python module
import gymnasium as gym
import torch
import numpy as np

# Select the compute device: prefer CUDA, then Apple MPS, and fall back to CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
    print("Using CUDA")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
    print("Using MPS")
else:
    device = torch.device("cpu")
    print("Using CPU")

# map_location remaps tensors that were saved on a different device (e.g. a
# checkpoint written on a CUDA machine) onto the device selected above, so the
# load does not fail on machines without that hardware.
# NOTE(security): weights_only=False unpickles arbitrary Python objects; only
# load checkpoints from a trusted source. If 'policy_net.pth' is a plain
# state_dict, weights_only=True should be sufficient -- TODO confirm and switch.
policy_net_weights = torch.load('policy_net.pth', map_location=device, weights_only=False)

Using CPU


In [110]:
# Discrete action set: every [acceleration, steering] pair taken from a grid of
# 11 steering values and 4 acceleration values. The list is ordered
# steering-major, i.e. all accelerations for the first steering value come first.
candidate_actions = [
    torch.tensor([accel, steer], dtype=torch.float32)
    for steer in np.linspace(-0.5, 0.5, 11)
    for accel in np.linspace(0.8, 0.4, 4)
]

In [111]:
def process_observation(observation):
    """Extract the network input from an environment observation.

    Only the ``"observation"`` entry of the mapping is returned; the
    ``"achieved_goal"`` and ``"desired_goal"`` entries are not used.

    Args:
        observation: Mapping that contains an ``"observation"`` key.

    Returns:
        The value stored under ``observation["observation"]``, unchanged.

    Raises:
        KeyError: If the ``"observation"`` key is missing.
    """
    state_vector = observation["observation"]
    return state_vector

# Build the base parking environment (rendered in a window via render_mode="human")
# and wrap it in the project's ParkingWithObstacles environment.
env_origin = gym.make("parking-v0", render_mode="human")
env = ParkingWithObstacles(env_origin)
# NOTE(review): presumably (re)builds the observation/action spaces of the
# wrapped environment -- confirm against custom_parking_env.
env.define_spaces()

# Show the effective environment configuration for reference.
print(env.config)

# Reset once to obtain a sample observation used to size the network input.
state, info = env.reset()

# The number of discrete actions comes from candidate_actions,
# not from the gym action space.
n_actions = len(candidate_actions)
n_observations = len(process_observation(state))  # length of the "observation" vector only

print(env.observation_space)

{'observation': {'type': 'KinematicsGoal', 'features': ['x', 'y', 'vx', 'vy', 'cos_h', 'sin_h'], 'scales': [100, 100, 5, 5, 1, 1], 'normalize': True}, 'action': {'type': 'ContinuousAction'}, 'simulation_frequency': 15, 'policy_frequency': 5, 'other_vehicles_type': 'highway_env.vehicle.behavior.IDMVehicle', 'screen_width': 1200, 'screen_height': 600, 'centering_position': [0.5, 0.5], 'scaling': 9, 'show_trajectories': False, 'render_agent': True, 'offscreen_rendering': False, 'manual_control': False, 'real_time_rendering': False, 'reward_weights': [0.5, 0.5, 0, 0, 0.02, 0.02], 'success_goal_reward': 0.15, 'collision_reward': -5, 'steering_range': 0.7853981633974483, 'duration': 50, 'controlled_vehicles': 1, 'vehicles_count': 0, 'add_walls': False, 'add_obstacles': True, 'obstacles_count': 3}
Dict('achieved_goal': Box(-inf, inf, (6,), float64), 'desired_goal': Box(-inf, inf, (6,), float64), 'observation': Box(-inf, inf, (6,), float64))


In [112]:
# Build the Q-network sized to the processed observation length and the number
# of candidate actions, then move it to the selected device.
model = DeepQNetwork(n_observations,n_actions).to(device)
# Restore the trained policy weights that were loaded from 'policy_net.pth'.
# As the last expression of the cell, the key-matching report is displayed.
model.load_state_dict(policy_net_weights)

<All keys matched successfully>

In [113]:
def test(env, model, num_episodes=100):
    """Run the greedy policy of ``model`` in ``env`` and print summary statistics.

    Each episode resets the environment and then repeatedly picks the action
    with the highest model output, looks it up in the module-level
    ``candidate_actions`` list and passes it to ``env.step`` until the episode
    terminates or is truncated. Observations are reduced with the module-level
    ``process_observation`` and moved to the module-level ``device``.

    Args:
        env: Environment whose ``reset()`` returns ``(observation, info)`` and
            whose ``step(action)`` returns
            ``(observation, reward, terminated, truncated, info)``.
        model: Network called on a batch of one processed observation; the
            index of its largest output selects the candidate action.
        num_episodes: Number of episodes to run.

    Returns:
        None. Progress and the final statistics are printed.

    Side effects:
        Puts ``model`` in evaluation mode and closes ``env`` before returning,
        including when an exception is raised mid-episode.
    """
    model.eval()  # Set the model to evaluation mode
    total_test_rewards = []
    episode_durations = []

    def to_tensor(vector):
        # Add a leading batch dimension of size 1 for the network.
        return torch.tensor(vector, dtype=torch.float32, device=device).unsqueeze(0)

    try:
        for i_episode in range(num_episodes):
            observation, info = env.reset()
            observation_tensor = to_tensor(process_observation(observation))

            total_reward = 0
            steps_taken = 0
            print(f"Demo Episode {i_episode + 1} started...")

            while True:
                # Greedy action selection; no gradients are needed for evaluation.
                with torch.no_grad():
                    action_index = model(observation_tensor).max(1).indices.item()

                # Guard against a model with more outputs than candidate actions.
                if not 0 <= action_index < len(candidate_actions):
                    print(f"Invalid action index: {action_index}")
                    break

                action = candidate_actions[action_index]
                observation, reward, terminated, truncated, info = env.step(action.numpy())
                total_reward += reward
                # Count only steps that were actually executed, so an aborted
                # episode is not credited with an extra step.
                steps_taken += 1

                if terminated or truncated:
                    print(f"Done: {terminated}, Truncated: {truncated}")
                    print(info)
                    print(f"Episode {i_episode + 1} ended. Total Reward: {total_reward}")
                    break

                observation_tensor = to_tensor(process_observation(observation))

            episode_durations.append(steps_taken)
            total_test_rewards.append(total_reward)
            print(f"Progress: {i_episode + 1}/{num_episodes}")
    finally:
        # Always release the environment (e.g. the render window), even if a
        # step raised part-way through the evaluation.
        env.close()

    # np.max / np.min raise on empty input, so bail out when nothing was run.
    if not total_test_rewards:
        print("No episodes were run; nothing to summarize.")
        return

    avg_reward = np.mean(total_test_rewards)
    avg_duration = np.mean(episode_durations)
    print(f"Test completed over {num_episodes} episodes.")
    print(f"Average reward: {avg_reward:.2f}, Average duration: {avg_duration:.2f} steps.")
    print(f"Max reward: {np.max(total_test_rewards):.2f}, Min reward: {np.min(total_test_rewards):.2f}")


# Evaluate the loaded policy for 10 episodes; progress and summary statistics are printed.
test(env, model, num_episodes=10)

Demo Episode 1 started...
Done: True, Truncated: False
{'speed': 22.666667342185956, 'crashed': True, 'action': array([ 0.6666667, -0.5      ], dtype=float32), 'is_success': False}
Episode 1 ended. Total Reward: -22.068151634551086
Progress: 1/10
Demo Episode 2 started...
Done: True, Truncated: False
{'speed': 6.581728591212516, 'crashed': True, 'action': array([ 0.6666667, -0.5      ], dtype=float32), 'is_success': False}
Episode 2 ended. Total Reward: -9.851301345913953
Progress: 2/10
Demo Episode 3 started...
Done: True, Truncated: False
{'speed': 20.32592653168571, 'crashed': True, 'action': array([ 0.6666667, -0.5      ], dtype=float32), 'is_success': False}
Episode 3 ended. Total Reward: -17.31671001588114
Progress: 3/10
Demo Episode 4 started...
Done: True, Truncated: False
{'speed': 6.000987833164354, 'crashed': True, 'action': array([ 0.6666667, -0.5      ], dtype=float32), 'is_success': False}
Episode 4 ended. Total Reward: -8.769365967975284
Progress: 4/10
Demo Episode 5 sta

In [114]:
# Explicitly release the environment. test() also calls env.close() when it
# finishes, so this acts as a safeguard if that cell was interrupted.
env.close()