In [1]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import random

class TreasureHuntEnv(gym.Env):
    """Grid-world treasure hunt environment following the Gymnasium API.

    The player starts in the top-left cell ``[0, 0]`` and moves one cell per
    step. The grid (which is also the observation) stores one code per cell:

    * ``0`` -- empty
    * ``1`` -- treasure (removed from the grid once the player steps on it)
    * ``2`` -- trap (stays in place after being triggered)
    * ``3`` -- exit, always the bottom-right cell; reaching it ends the episode

    Args:
        grid_size: Side length of the square grid.
        max_steps: Number of steps after which the episode is ended.
        n_treasures: Number of treasure placement attempts per reset. Attempts
            may land on the same cell, so fewer treasures can result.
        n_traps: Number of trap placement attempts per reset. An attempt is
            skipped when the chosen cell is not empty.
    """

    # Cell codes stored in the grid
    EMPTY, TREASURE, TRAP, EXIT = 0, 1, 2, 3

    def __init__(self, grid_size=10, max_steps=100, n_treasures=10, n_traps=10):
        super().__init__()

        # Grid configuration
        self.grid_size = grid_size
        self.max_steps = max_steps
        self.n_treasures = n_treasures
        self.n_traps = n_traps

        # Action space: 4 discrete actions (0=up, 1=down, 2=left, 3=right)
        self.action_space = spaces.Discrete(4)

        # Observation space: the full 2-D grid of cell codes (not flattened)
        self.observation_space = spaces.Box(
            low=0, high=3, shape=(grid_size, grid_size), dtype=np.int32
        )

        # Rewards (the step penalty is applied on every step, the others are
        # added on top of it when the corresponding cell is entered)
        self.reward_treasure = 10
        self.reward_trap = -5
        self.reward_exit = 50
        self.step_penalty = -1

        # Build an initial layout so the instance is usable right away
        self.reset()

    def reset(self, seed=None, options=None):
        """Start a new episode with a freshly generated grid.

        Args:
            seed: Optional seed. It is forwarded to ``gym.Env.reset`` so that
                ``self.np_random`` is re-seeded and the layout is reproducible.
            options: Accepted for API compatibility; not used.

        Returns:
            Tuple ``(observation, info)`` where ``info`` is an empty dict.
        """
        # Required by the Gymnasium API: (re)seeds self.np_random when a seed
        # is supplied. All layout randomness below draws from that generator.
        super().reset(seed=seed)

        self.steps = 0
        self.grid = np.zeros((self.grid_size, self.grid_size), dtype=np.int32)

        # Place treasures; repeated draws of the same cell simply collapse
        for _ in range(self.n_treasures):
            row, col = self.np_random.integers(0, self.grid_size, size=2)
            self.grid[row, col] = self.TREASURE

        # Place traps, but never on top of a treasure or another trap
        for _ in range(self.n_traps):
            row, col = self.np_random.integers(0, self.grid_size, size=2)
            if self.grid[row, col] == self.EMPTY:
                self.grid[row, col] = self.TRAP

        # The exit overrides whatever was drawn for the bottom-right cell
        self.grid[self.grid_size - 1, self.grid_size - 1] = self.EXIT

        # Player's starting position (the start cell itself is not cleared)
        self.player_pos = [0, 0]

        return self._get_observation(), {}

    def step(self, action):
        """Apply one move and return the Gymnasium 5-tuple.

        A move that would leave the grid is ignored (the player stays put) but
        still counts as a step and still receives the step penalty.

        Args:
            action: 0=up, 1=down, 2=left, 3=right.

        Returns:
            Tuple ``(observation, reward, terminated, truncated, info)``.
            ``truncated`` is always ``False`` and ``info`` is always empty.
        """
        self.steps += 1

        # Move the player, clamping at the grid borders
        last_index = self.grid_size - 1
        if action == 0 and self.player_pos[0] > 0:  # Up
            self.player_pos[0] -= 1
        elif action == 1 and self.player_pos[0] < last_index:  # Down
            self.player_pos[0] += 1
        elif action == 2 and self.player_pos[1] > 0:  # Left
            self.player_pos[1] -= 1
        elif action == 3 and self.player_pos[1] < last_index:  # Right
            self.player_pos[1] += 1

        row, col = self.player_pos
        current_cell = self.grid[row, col]
        reward = self.step_penalty  # Default step penalty

        # Plain Python bools are used for the flag on purpose: environment
        # checkers expect `bool`, not `numpy.bool_`.
        done = False
        if current_cell == self.TREASURE:
            reward += self.reward_treasure
            self.grid[row, col] = self.EMPTY  # Treasure can be collected once
        elif current_cell == self.TRAP:
            reward += self.reward_trap
        elif current_cell == self.EXIT:
            reward += self.reward_exit
            done = True

        # NOTE(review): the step limit is reported through the `terminated`
        # flag (third return value) rather than `truncated`. Callers that loop
        # on the third value alone rely on this, so it is kept as is.
        if self.steps >= self.max_steps:
            done = True

        return self._get_observation(), reward, done, False, {}

    def render(self):
        """Print the grid as text: P=player, T=treasure, X=trap, E=exit, 0=empty."""
        symbols = {self.TREASURE: "T ", self.TRAP: "X ", self.EXIT: "E "}
        print("\nGrid:")
        for row in range(self.grid_size):
            line = ""
            for col in range(self.grid_size):
                if self.player_pos == [row, col]:
                    line += "P "  # The player hides whatever is underneath
                else:
                    line += symbols.get(int(self.grid[row, col]), "0 ")
            print(line)

    def _get_observation(self):
        """Return a copy of the grid so callers cannot mutate internal state.

        NOTE(review): the player position is not written into the observation,
        so it cannot be read from the returned array. Encoding it would need a
        matching change to ``observation_space`` (values above 3).
        """
        return self.grid.copy()

In [2]:
from stable_baselines3.common.env_checker import check_env

# Validate the custom environment against the Gymnasium / SB3 interface
env = TreasureHuntEnv(max_steps=50, grid_size=5)
check_env(env=env, warn=True)



In [3]:
from stable_baselines3 import DQN
from stable_baselines3.common.env_checker import check_env

# Check if the environment is valid
env = TreasureHuntEnv(grid_size=5, max_steps=50)
check_env(env, warn=True)

# Initialize the DQN model.
# To record training curves, additionally pass a relative log directory,
# e.g. tensorboard_log="./dqn_treasure_hunt/".
model = DQN("MlpPolicy", env, verbose=1)

# Train the model
model.learn(total_timesteps=10000)

# Save the trained model
model.save("dqn_treasure_hunt")

# Roll out one episode with the trained, deterministic policy
obs, _ = env.reset()
done = False
truncated = False

# Stop on either flag so the loop also ends if the environment ever signals
# the end of an episode through `truncated` instead of `terminated`.
while not (done or truncated):
    env.render()
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, done, truncated, info = env.step(action)
    print(f"Action: {action}, Reward: {reward}")

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 42.5     |
|    ep_rew_mean      | -10      |
|    exploration_rate | 0.838    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 1147     |
|    time_elapsed     | 0        |
|    total_timesteps  | 170      |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 2.35     |
|    n_updates        | 17       |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 40.2     |
|    ep_rew_mean      | 1        |
|    exploration_rate | 0.694    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 1557     |
|    time_elapsed     | 0        |
|    total_timesteps  | 322      |
| train/              |        

In [4]:
import gymnasium as gym
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy
import numpy as np
import random

# Define the TreasureHunt Environment
class TreasureHuntEnv(gym.Env):
    """Grid-world treasure hunt environment following the Gymnasium API.

    The player starts in the top-left cell ``[0, 0]`` and moves one cell per
    step. The grid (which is also the observation) stores one code per cell:

    * ``0`` -- empty
    * ``1`` -- treasure (removed from the grid once the player steps on it)
    * ``2`` -- trap (stays in place after being triggered)
    * ``3`` -- exit, always the bottom-right cell; reaching it ends the episode

    Args:
        grid_size: Side length of the square grid.
        max_steps: Number of steps after which the episode is ended.
        n_treasures: Number of treasure placement attempts per reset. Attempts
            may land on the same cell, so fewer treasures can result.
        n_traps: Number of trap placement attempts per reset. An attempt is
            skipped when the chosen cell is not empty.
    """

    # Cell codes stored in the grid
    EMPTY, TREASURE, TRAP, EXIT = 0, 1, 2, 3

    def __init__(self, grid_size=10, max_steps=100, n_treasures=10, n_traps=10):
        super().__init__()

        # Grid configuration
        self.grid_size = grid_size
        self.max_steps = max_steps
        self.n_treasures = n_treasures
        self.n_traps = n_traps

        # Action space: 4 discrete actions (0=up, 1=down, 2=left, 3=right)
        self.action_space = gym.spaces.Discrete(4)

        # Observation space: the full 2-D grid of cell codes (not flattened)
        self.observation_space = gym.spaces.Box(
            low=0, high=3, shape=(grid_size, grid_size), dtype=np.int32
        )

        # Rewards (the step penalty is applied on every step, the others are
        # added on top of it when the corresponding cell is entered)
        self.reward_treasure = 10
        self.reward_trap = -5
        self.reward_exit = 50
        self.step_penalty = -1

        # Build an initial layout so the instance is usable right away
        self.reset()

    def reset(self, seed=None, options=None):
        """Start a new episode with a freshly generated grid.

        Args:
            seed: Optional seed. It is forwarded to ``gym.Env.reset`` so that
                ``self.np_random`` is re-seeded and the layout is reproducible.
            options: Accepted for API compatibility; not used.

        Returns:
            Tuple ``(observation, info)`` where ``info`` is an empty dict.
        """
        # Required by the Gymnasium API: (re)seeds self.np_random when a seed
        # is supplied. All layout randomness below draws from that generator.
        super().reset(seed=seed)

        self.steps = 0
        self.grid = np.zeros((self.grid_size, self.grid_size), dtype=np.int32)

        # Place treasures; repeated draws of the same cell simply collapse
        for _ in range(self.n_treasures):
            row, col = self.np_random.integers(0, self.grid_size, size=2)
            self.grid[row, col] = self.TREASURE

        # Place traps, but never on top of a treasure or another trap
        for _ in range(self.n_traps):
            row, col = self.np_random.integers(0, self.grid_size, size=2)
            if self.grid[row, col] == self.EMPTY:
                self.grid[row, col] = self.TRAP

        # The exit overrides whatever was drawn for the bottom-right cell
        self.grid[self.grid_size - 1, self.grid_size - 1] = self.EXIT

        # Player's starting position (the start cell itself is not cleared)
        self.player_pos = [0, 0]

        return self._get_observation(), {}

    def step(self, action):
        """Apply one move and return the Gymnasium 5-tuple.

        A move that would leave the grid is ignored (the player stays put) but
        still counts as a step and still receives the step penalty.

        Args:
            action: 0=up, 1=down, 2=left, 3=right.

        Returns:
            Tuple ``(observation, reward, terminated, truncated, info)``.
            ``truncated`` is always ``False`` and ``info`` is always empty.
        """
        self.steps += 1

        # Move the player, clamping at the grid borders
        last_index = self.grid_size - 1
        if action == 0 and self.player_pos[0] > 0:  # Up
            self.player_pos[0] -= 1
        elif action == 1 and self.player_pos[0] < last_index:  # Down
            self.player_pos[0] += 1
        elif action == 2 and self.player_pos[1] > 0:  # Left
            self.player_pos[1] -= 1
        elif action == 3 and self.player_pos[1] < last_index:  # Right
            self.player_pos[1] += 1

        row, col = self.player_pos
        current_cell = self.grid[row, col]
        reward = self.step_penalty  # Default step penalty

        # Plain Python bools are used for the flag on purpose: environment
        # checkers expect `bool`, not `numpy.bool_`.
        done = False
        if current_cell == self.TREASURE:
            reward += self.reward_treasure
            self.grid[row, col] = self.EMPTY  # Treasure can be collected once
        elif current_cell == self.TRAP:
            reward += self.reward_trap
        elif current_cell == self.EXIT:
            reward += self.reward_exit
            done = True

        # NOTE(review): the step limit is reported through the `terminated`
        # flag (third return value) rather than `truncated`; kept as is so
        # callers that only inspect the third value keep working.
        if self.steps >= self.max_steps:
            done = True

        return self._get_observation(), reward, done, False, {}

    def _get_observation(self):
        """Return a copy of the grid so callers cannot mutate internal state.

        NOTE(review): the player position is not written into the observation,
        so it cannot be read from the returned array. Encoding it would need a
        matching change to ``observation_space`` (values above 3).
        """
        return self.grid.copy()

def train_dqn(env, learning_rate, gamma, batch_size, buffer_size, exploration_initial_eps,
              exploration_final_eps, exploration_fraction, target_update_interval, train_freq,
              total_timesteps, n_eval_episodes=10, seed=None, verbose=1):
    """Train a DQN agent with the given hyperparameters and evaluate it.

    Args:
        env: Environment to train and evaluate on.
        learning_rate, gamma, batch_size, buffer_size, exploration_initial_eps,
        exploration_final_eps, exploration_fraction, target_update_interval,
        train_freq: Forwarded unchanged to the ``DQN`` constructor.
        total_timesteps: Number of environment steps passed to ``model.learn``.
        n_eval_episodes: Number of episodes passed to ``evaluate_policy``.
        seed: Optional seed forwarded to ``DQN``; ``None`` keeps the run
            unseeded, which matches the previous behaviour.
        verbose: Verbosity level forwarded to ``DQN``.

    Returns:
        Tuple ``(model, mean_reward, std_reward)`` where the last two values
        are whatever ``evaluate_policy`` returns for the trained model.
    """
    model = DQN(
        "MlpPolicy",
        env,
        learning_rate=learning_rate,
        gamma=gamma,
        batch_size=batch_size,
        buffer_size=buffer_size,
        exploration_initial_eps=exploration_initial_eps,
        exploration_final_eps=exploration_final_eps,
        exploration_fraction=exploration_fraction,
        target_update_interval=target_update_interval,
        train_freq=train_freq,
        verbose=verbose,
        seed=seed,
    )

    # Train the model
    model.learn(total_timesteps=total_timesteps)

    # Evaluate the trained model on the same environment
    mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=n_eval_episodes)

    return model, mean_reward, std_reward

# Improved hyperparameters for the second training run
updated_hyperparameters = dict(
    learning_rate=0.0005,
    gamma=0.98,
    batch_size=128,
    buffer_size=100000,
    exploration_initial_eps=1.0,
    exploration_final_eps=0.02,
    exploration_fraction=0.99,
    target_update_interval=500,
    train_freq=2,
    total_timesteps=20000,
)

# Create the environment: 5x5 grid, episodes capped at 50 steps
env = TreasureHuntEnv(max_steps=50, grid_size=5)

# Train and evaluate with the improved hyperparameters
model, mean_reward, std_reward = train_dqn(env, **updated_hyperparameters)

# Report the evaluation statistics
print(
    f"\nErgebnisse nach Training mit verbesserten Hyperparametern:\n"
    f"Durchschnittliche Belohnung: {mean_reward}\n"
    f"Standardabweichung der Belohnung: {std_reward}\n"
)

# Persist the trained model
model.save("dqn_treasure_hunt_v2")


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 50       |
|    ep_rew_mean      | -67.5    |
|    exploration_rate | 0.99     |
| time/               |          |
|    episodes         | 4        |
|    fps              | 1340     |
|    time_elapsed     | 0        |
|    total_timesteps  | 200      |
| train/              |          |
|    learning_rate    | 0.0005   |
|    loss             | 2.1      |
|    n_updates        | 49       |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 48.5     |
|    ep_rew_mean      | -50.4    |
|    exploration_rate | 0.981    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 1355     |
|    time_elapsed     | 0        |
|    total_timesteps  | 388      |
| train/              |        



In [5]:
import gymnasium as gym
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy
import numpy as np
import random

# Define the TreasureHunt Environment
class TreasureHuntEnv(gym.Env):
    """Grid-world treasure hunt environment following the Gymnasium API.

    The player starts in the top-left cell ``[0, 0]`` and moves one cell per
    step. The grid (which is also the observation) stores one code per cell:

    * ``0`` -- empty
    * ``1`` -- treasure (removed from the grid once the player steps on it)
    * ``2`` -- trap (stays in place after being triggered)
    * ``3`` -- exit, always the bottom-right cell; reaching it ends the episode

    Args:
        grid_size: Side length of the square grid.
        max_steps: Number of steps after which the episode is ended.
        n_treasures: Number of treasure placement attempts per reset. Attempts
            may land on the same cell, so fewer treasures can result.
        n_traps: Number of trap placement attempts per reset. An attempt is
            skipped when the chosen cell is not empty.
    """

    # Cell codes stored in the grid
    EMPTY, TREASURE, TRAP, EXIT = 0, 1, 2, 3

    def __init__(self, grid_size=10, max_steps=100, n_treasures=10, n_traps=10):
        super().__init__()

        # Grid configuration
        self.grid_size = grid_size
        self.max_steps = max_steps
        self.n_treasures = n_treasures
        self.n_traps = n_traps

        # Action space: 4 discrete actions (0=up, 1=down, 2=left, 3=right)
        self.action_space = gym.spaces.Discrete(4)

        # Observation space: the full 2-D grid of cell codes (not flattened)
        self.observation_space = gym.spaces.Box(
            low=0, high=3, shape=(grid_size, grid_size), dtype=np.int32
        )

        # Rewards (the step penalty is applied on every step, the others are
        # added on top of it when the corresponding cell is entered)
        self.reward_treasure = 10
        self.reward_trap = -5
        self.reward_exit = 50
        self.step_penalty = -1

        # Build an initial layout so the instance is usable right away
        self.reset()

    def reset(self, seed=None, options=None):
        """Start a new episode with a freshly generated grid.

        Args:
            seed: Optional seed. It is forwarded to ``gym.Env.reset`` so that
                ``self.np_random`` is re-seeded and the layout is reproducible.
            options: Accepted for API compatibility; not used.

        Returns:
            Tuple ``(observation, info)`` where ``info`` is an empty dict.
        """
        # Required by the Gymnasium API: (re)seeds self.np_random when a seed
        # is supplied. All layout randomness below draws from that generator.
        super().reset(seed=seed)

        self.steps = 0
        self.grid = np.zeros((self.grid_size, self.grid_size), dtype=np.int32)

        # Place treasures; repeated draws of the same cell simply collapse
        for _ in range(self.n_treasures):
            row, col = self.np_random.integers(0, self.grid_size, size=2)
            self.grid[row, col] = self.TREASURE

        # Place traps, but never on top of a treasure or another trap
        for _ in range(self.n_traps):
            row, col = self.np_random.integers(0, self.grid_size, size=2)
            if self.grid[row, col] == self.EMPTY:
                self.grid[row, col] = self.TRAP

        # The exit overrides whatever was drawn for the bottom-right cell
        self.grid[self.grid_size - 1, self.grid_size - 1] = self.EXIT

        # Player's starting position (the start cell itself is not cleared)
        self.player_pos = [0, 0]

        return self._get_observation(), {}

    def step(self, action):
        """Apply one move and return the Gymnasium 5-tuple.

        A move that would leave the grid is ignored (the player stays put) but
        still counts as a step and still receives the step penalty.

        Args:
            action: 0=up, 1=down, 2=left, 3=right.

        Returns:
            Tuple ``(observation, reward, terminated, truncated, info)``.
            ``truncated`` is always ``False`` and ``info`` is always empty.
        """
        self.steps += 1

        # Move the player, clamping at the grid borders
        last_index = self.grid_size - 1
        if action == 0 and self.player_pos[0] > 0:  # Up
            self.player_pos[0] -= 1
        elif action == 1 and self.player_pos[0] < last_index:  # Down
            self.player_pos[0] += 1
        elif action == 2 and self.player_pos[1] > 0:  # Left
            self.player_pos[1] -= 1
        elif action == 3 and self.player_pos[1] < last_index:  # Right
            self.player_pos[1] += 1

        row, col = self.player_pos
        current_cell = self.grid[row, col]
        reward = self.step_penalty  # Default step penalty

        # Plain Python bools are used for the flag on purpose: environment
        # checkers expect `bool`, not `numpy.bool_`.
        done = False
        if current_cell == self.TREASURE:
            reward += self.reward_treasure
            self.grid[row, col] = self.EMPTY  # Treasure can be collected once
        elif current_cell == self.TRAP:
            reward += self.reward_trap
        elif current_cell == self.EXIT:
            reward += self.reward_exit
            done = True

        # NOTE(review): the step limit is reported through the `terminated`
        # flag (third return value) rather than `truncated`; kept as is so
        # callers that only inspect the third value keep working.
        if self.steps >= self.max_steps:
            done = True

        return self._get_observation(), reward, done, False, {}

    def _get_observation(self):
        """Return a copy of the grid so callers cannot mutate internal state.

        NOTE(review): the player position is not written into the observation,
        so it cannot be read from the returned array. Encoding it would need a
        matching change to ``observation_space`` (values above 3).
        """
        return self.grid.copy()

def train_dqn(env, learning_rate, gamma, batch_size, buffer_size, exploration_initial_eps,
              exploration_final_eps, exploration_fraction, target_update_interval, train_freq,
              total_timesteps, n_eval_episodes=10, seed=None, verbose=1):
    """Train a DQN agent with the given hyperparameters and evaluate it.

    Args:
        env: Environment to train and evaluate on.
        learning_rate, gamma, batch_size, buffer_size, exploration_initial_eps,
        exploration_final_eps, exploration_fraction, target_update_interval,
        train_freq: Forwarded unchanged to the ``DQN`` constructor.
        total_timesteps: Number of environment steps passed to ``model.learn``.
        n_eval_episodes: Number of episodes passed to ``evaluate_policy``.
        seed: Optional seed forwarded to ``DQN``; ``None`` keeps the run
            unseeded, which matches the previous behaviour.
        verbose: Verbosity level forwarded to ``DQN``.

    Returns:
        Tuple ``(model, mean_reward, std_reward)`` where the last two values
        are whatever ``evaluate_policy`` returns for the trained model.
    """
    model = DQN(
        "MlpPolicy",
        env,
        learning_rate=learning_rate,
        gamma=gamma,
        batch_size=batch_size,
        buffer_size=buffer_size,
        exploration_initial_eps=exploration_initial_eps,
        exploration_final_eps=exploration_final_eps,
        exploration_fraction=exploration_fraction,
        target_update_interval=target_update_interval,
        train_freq=train_freq,
        verbose=verbose,
        seed=seed,
    )

    # Train the model
    model.learn(total_timesteps=total_timesteps)

    # Evaluate the trained model on the same environment
    mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=n_eval_episodes)

    return model, mean_reward, std_reward

# Optimized hyperparameters for the third training run
optimized_hyperparameters = dict(
    learning_rate=0.0003,
    gamma=0.97,
    batch_size=256,
    buffer_size=200000,
    exploration_initial_eps=1.0,
    exploration_final_eps=0.005,
    exploration_fraction=0.995,
    target_update_interval=250,
    train_freq=1,
    total_timesteps=50000,
)

# Create the environment: 5x5 grid, episodes capped at 50 steps
env = TreasureHuntEnv(max_steps=50, grid_size=5)

# Train and evaluate with the optimized hyperparameters
model, mean_reward, std_reward = train_dqn(env, **optimized_hyperparameters)

# Report the evaluation statistics
print(
    f"\nErgebnisse nach Training mit optimierten Hyperparametern:\n"
    f"Durchschnittliche Belohnung: {mean_reward}\n"
    f"Standardabweichung der Belohnung: {std_reward}\n"
)

# Persist the trained model
model.save("dqn_treasure_hunt_v3")


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 50       |
|    ep_rew_mean      | -66.2    |
|    exploration_rate | 0.996    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 831      |
|    time_elapsed     | 0        |
|    total_timesteps  | 200      |
| train/              |          |
|    learning_rate    | 0.0003   |
|    loss             | 1.95     |
|    n_updates        | 99       |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 50       |
|    ep_rew_mean      | -71.9    |
|    exploration_rate | 0.992    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 598      |
|    time_elapsed     | 0        |
|    total_timesteps  | 400      |
| train/              |        