In [1]:
from gymnasium.envs.registration import register
import gymnasium as gym
import time
import minigrid
from stable_baselines3 import DQN
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.evaluation import evaluate_policy
from minigrid.wrappers import FlatObsWrapper
import numpy as np
from minigrid.core.world_object import Goal
import random
from minigrid.core.constants import OBJECT_TO_IDX, IDX_TO_OBJECT, COLOR_TO_IDX, IDX_TO_COLOR, DIR_TO_VEC
import tensorflow as tf


pygame 2.1.0 (SDL 2.0.16, Python 3.10.9)
Hello from the pygame community. https://www.pygame.org/contribute.html


  from pandas.core import (


In [2]:
register(
    id='custom_empty-v0',
    entry_point='emptyrandom:RandomGoalEmptyEnv',
    kwargs={}
)

In [3]:
#gym.pprint_registry()  # to see all registered environments

if 'custom_empty-v0' in gym.envs.registry:
    print("Environment 'custom_empty-v0' is registered.")
else:
    print("Environment 'custom_empty-v0' is NOT registered.")

Environment 'custom_empty-v0' is registered.


Check what agent is actually seeing

If the environement is correctly initialized

In [4]:
# env = gym.make('custom_empty-v0',size=8,render_mode="human")

In [5]:
# def print_observation_details(self, obs):
#     # Assuming obs is a dictionary with 'image' as key holding the grid data
#     grid_obs = obs.get('image')
#     if grid_obs is not None:
#         # Print entire grid observation or specific elements
#         print("Grid Observation at current step:")
#         print(grid_obs)
#         # Optionally, check for specific elements like the goal
#         if (grid_obs == OBJECT_TO_IDX['goal']).any():
#             print("Goal is visible in the observation.")
#         else:
#             print("Goal is not visible in the observation.")

In [6]:
# initial_observation = env.reset()
# print("Initial Observation:", initial_observation)

In [7]:
# for steps in range(100000): 
#     action = env.action_space.sample()  
#     observation, reward, done,info,extra = env.step(action)  
#     #time.sleep(0.5)
#     #print_observation_details(observation)
#     #print(f"Action: {action}, Observation: {observation}, Reward: {reward}, Done: {done}")
#     # Print the action taken and the resulting observation
#     print(f"\nStep {steps + 1}")
#     print(f"Action taken: {action}")
#     print("Observation:", observation)
#     print("Reward:", reward)
#     print("Done:", done)
#     print("Info:", info)
#     print("Extra:", extra)  # Print the additional tuple element
#     env.render()  
#     if done:
#         break  
# env.close()

In [8]:
# print("Observation space:", env.observation_space)

In [9]:
# observation = env.reset()
# print("Initial observation:", observation)

# action = env.action_space.sample()  # Take a random action
# observation, _, _, _,_ = env.step(action)
# print("Observation after one action:", observation)

Understanding the Observation Structure

'image' Array: This appears to be a grid representation of the agent's surroundings, where each tile is described by three values:

OBJECT_IDX: Integer representing the type of object (e.g., wall, floor, empty space).

COLOR_IDX: Integer representing the color of the object.

STATE: Integer representing the state of the object (e.g., for doors whether they are open, closed, or locked).

'direction' Key: This indicates the direction the agent is currently facing. The direction might be encoded as integers (0-3), each representing a cardinal direction.

'mission' String: This provides a textual description of the agent's current objective, which in this case is 'get to the green goal square'.


In [6]:
env_id = "custom_empty-v0"

In [7]:
class ActionWrapper(gym.ActionWrapper):
    def __init__(self, env):
        super(ActionWrapper, self).__init__(env)
        # Define a new action space with only the relevant actions
        self.action_space = gym.spaces.Discrete(3)  # Only three actions: left, right, forward

    def action(self, action):
        # Map the new actions to the original actions
        action_mapping = {
            0: 0,  # left
            1: 1,  # right
            2: 2   # forward
        }
        return action_mapping[action]

In [8]:
class RewardShapingWrapper(gym.Wrapper):
    def __init__(self, env):
        super(RewardShapingWrapper, self).__init__(env)
        self.last_action = None
        self.spin_counter = 0  # Tracks consecutive left-right turns
        self.last_distance = None
        self.goal_pos = self.get_goal_position()  # Method to determine goal position

    def step(self, action):
        obs, reward, done, info,extra = self.env.step(action)
        #print(f"Observation: {obs}")
        current_pos = self.env.agent_pos
        
        # Calculate distance to the goal
        if self.last_distance is None:
            self.last_distance = self.calculate_distance(current_pos, self.goal_pos)
            #print("distance to goal",self.last_distance)

        new_distance = self.calculate_distance(current_pos, self.goal_pos)
        if new_distance < self.last_distance:
            # Reward for moving closer to the goal
            reward += 5 * (self.last_distance - new_distance)
        self.last_distance = new_distance

        if action == 2 and self.is_facing_wall():
            reward -= 10  # Penalty for trying to move into a wall

        # Check for spinning behavior
        if self.last_action in [0, 1] and action in [0, 1] and action != self.last_action:
            self.spin_counter += 1
        else:
            self.spin_counter = 0

        if self.spin_counter > 2:  # Threshold for considering it spinning
            reward -= 10
            self.spin_counter = 0  # Reset counter after penalty

        #dditional logic to discourage spinning or other suboptimal behaviors
        if self.last_action == 0 and action == 1 or self.last_action == 1 and action == 0:
            reward -= 10  # Increase penalty for oscillating between left and right

        # High terminal reward when reaching the goal
        if np.array_equal(current_pos, self.goal_pos):
            reward += 500  # High reward for reaching the goal
            done = True

        self.last_action = action
        return obs, reward, done, info,extra

    
    def reset(self, **kwargs):  # Accept any additional keyword arguments
        obs = self.env.reset(**kwargs)  # Reset the underlying environment
        self.goal_pos = self.env.get_goal_position()  # Retrieve the updated goal position
        self.last_distance = None
        self.last_action = None
        #self.spin_counter = 0
        return obs  # Return the observation

    def calculate_distance(self, current_pos, goal_pos):
        if current_pos is None or goal_pos is None:
            return float('inf')  # Return a large distance if any position is not initialized
        dist_to_goal = np.linalg.norm(np.array(current_pos) - np.array(goal_pos))
        #print('distance to goal:', dist_to_goal)
        return dist_to_goal

    def is_facing_wall(self):
        x, y = self.env.agent_pos
        direction_idx = self.env.agent_dir
        delta = DIR_TO_VEC[direction_idx]
        next_x, next_y = x + delta[0], y + delta[1]
        if 0 <= next_x < self.env.width and 0 <= next_y < self.env.height:
            next_cell = self.env.grid.get(next_x, next_y)
            #print("Object to Index Mapping:", OBJECT_TO_IDX)
            #print("Index for goal:", OBJECT_TO_IDX['goal'])
            #print("Index for wall:", OBJECT_TO_IDX['wall'])
            return next_cell and next_cell.type == OBJECT_TO_IDX['wall']
        return False



In [9]:
def create_env():
    env = gym.make(env_id, render_mode="rgb_array",size=19)
    env = ActionWrapper(env)
    env = RewardShapingWrapper(env)
    env = FlatObsWrapper(env)  
    return env

# Use the function in make_vec_env
env_vec = make_vec_env(create_env, n_envs=1)
env_vec2 = make_vec_env(create_env, n_envs=4)

  logger.warn(


In [10]:
# Initialize the DQN model
policy_kwargs = dict(net_arch=[32,32,32])
model = DQN("MlpPolicy", env_vec2, verbose=1,
            buffer_size=50000,
            learning_rate=1e-5,
            batch_size=32,
            exploration_fraction=0.1,
            exploration_final_eps = 0.1,
            gamma=0.999,
            policy_kwargs=policy_kwargs,
            learning_starts=2000,
            gradient_steps=1,
            target_update_interval=10000,
            tau=1.0
            

            )

# Train the model
model.learn(total_timesteps=20000)

Using cpu device
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 100      |
|    ep_rew_mean      | -240     |
|    exploration_rate | 0.82     |
| time/               |          |
|    episodes         | 4        |
|    fps              | 3762     |
|    time_elapsed     | 0        |
|    total_timesteps  | 400      |
----------------------------------


  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(


----------------------------------
| rollout/            |          |
|    ep_len_mean      | 98.1     |
|    ep_rew_mean      | -148     |
|    exploration_rate | 0.64     |
| time/               |          |
|    episodes         | 8        |
|    fps              | 3824     |
|    time_elapsed     | 0        |
|    total_timesteps  | 800      |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 98.8     |
|    ep_rew_mean      | -172     |
|    exploration_rate | 0.46     |
| time/               |          |
|    episodes         | 12       |
|    fps              | 3879     |
|    time_elapsed     | 0        |
|    total_timesteps  | 1200     |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 99.1     |
|    ep_rew_mean      | -174     |
|    exploration_rate | 0.28     |
| time/               |          |
|    episodes       

KeyboardInterrupt: 

In [19]:
model.save("dqn_minigrid_empty_random")

In [20]:
model = DQN.load("dqn_minigrid_empty",env=env_vec)

# Evaluate the model
mean_reward, std_reward = evaluate_policy(model, env=env_vec, n_eval_episodes=10)
print(f"Mean Reward: {mean_reward} +/- {std_reward}")

Mean Reward: -1148.917549 +/- 78.08387260938177


In [21]:
# Easy seed=1, medium seed=51, hard seed=8. You can set them during this way. env.reset(seed=1)
#env_vec = env_vec.reset(seed=1)
env_vec_test = make_vec_env(create_env, n_envs=1,seed=51)

num_episodes = 25
total_rewards = []  # List to store total rewards for each episode

for episode in range(num_episodes):
    obs = env_vec_test.reset()
    done = False
    total_reward = 0
    while not done:
        action, _ = model.predict(obs)
        print('action', action)
        obs, reward, done, info = env_vec_test.step(action)
        print('obs', obs)
        total_reward += reward
        env_vec_test.render('human')  # Render the environment at each step
        time.sleep(0.05)  # Adjust this to control the speed of the rendering
    total_rewards.append(total_reward)  # Store the total reward for this episode
    print(f"Episode {episode + 1}: Total Reward = {total_reward}")

# Calculate the average reward across all episodes
average_reward = sum(total_rewards) / num_episodes
print(f"Average Reward over {num_episodes} episodes: {average_reward}")

env_vec_test.close()  # Close the environment when done

action [2]
obs [[2 5 0 ... 0 0 0]]
action [2]
obs [[2 5 0 ... 0 0 0]]
action [2]
obs [[2 5 0 ... 0 0 0]]
action [2]
obs [[2 5 0 ... 0 0 0]]
action [2]
obs [[2 5 0 ... 0 0 0]]
action [2]
obs [[2 5 0 ... 0 0 0]]
action [2]
obs [[2 5 0 ... 0 0 0]]
action [2]
obs [[2 5 0 ... 0 0 0]]
action [2]
obs [[2 5 0 ... 0 0 0]]
action [2]
obs [[2 5 0 ... 0 0 0]]
action [2]
obs [[2 5 0 ... 0 0 0]]
action [2]
obs [[2 5 0 ... 0 0 0]]
action [2]
obs [[2 5 0 ... 0 0 0]]
action [0]
obs [[2 5 0 ... 0 0 0]]
action [1]
obs [[2 5 0 ... 0 0 0]]
action [0]
obs [[2 5 0 ... 0 0 0]]
action [1]
obs [[2 5 0 ... 0 0 0]]
action [0]
obs [[2 5 0 ... 0 0 0]]
action [1]
obs [[2 5 0 ... 0 0 0]]
action [0]
obs [[2 5 0 ... 0 0 0]]
action [1]
obs [[2 5 0 ... 0 0 0]]
action [0]
obs [[2 5 0 ... 0 0 0]]
action [1]
obs [[2 5 0 ... 0 0 0]]
action [0]
obs [[2 5 0 ... 0 0 0]]
action [1]
obs [[2 5 0 ... 0 0 0]]
action [0]
obs [[2 5 0 ... 0 0 0]]
action [1]
obs [[2 5 0 ... 0 0 0]]
action [0]
obs [[2 5 0 ... 0 0 0]]
action [1]
obs [[2 5