In [1]:
import gym
from gym import spaces
import numpy as np
import random

class CustomFrozenLakeEnv(gym.Env):
    def __init__(self, size=16, num_holes=10, num_specials=10, start_point=(0, 0), end_point=None):
        assert size >= 4, "Size of the gym should be at least 4x4"
        assert num_holes < size**2 - 2, "Number of holes should be less than total available spaces"

        self.size = size
        self.num_holes = num_holes
        self.num_specials = num_specials
        self.start_point = start_point
        self.end_point = end_point if end_point is not None else (size - 1, size - 1)

        self.observation_space = spaces.Discrete(size**2)
        self.action_space = spaces.Discrete(4)

        self.desc = self.generate_random_environment()
        self.state = self.get_state_from_point(self.start_point)

    def generate_random_environment(self):
        desc = np.full((self.size, self.size), 'F', dtype='<U1')  # 'F' represents frozen surface
        desc[self.start_point] = 'S'  # 'S' represents the starting point
        desc[self.end_point] = 'G'    # 'G' represents the goal

        # Randomly generate holes
        hole_positions = [(i, j) for i in range(self.size) for j in range(self.size)
                          if (i, j) not in [self.start_point, self.end_point]]
        hole_positions = random.sample(hole_positions,self.num_holes)
        # print(len(hole_positions))

        for hole_pos in hole_positions:
            desc[hole_pos] = 'H'  # 'H' represents a hole

        # Randomly generate holes
        special_positions = [(i, j) for i in range(self.size) for j in range(self.size)
                          if (i, j) not in [self.start_point, self.end_point] and (i, j) not in hole_positions]
        special_positions = random.sample(special_positions,self.num_specials)
        # print(len(special_positions))

        for special_pos in special_positions:
            desc[special_pos] = 'R'  # 'H' represents a hole

        return desc

    def get_state_from_point(self, point):
        return point[0] * self.size + point[1]

    def get_point_from_state(self, state):
        return divmod(state, self.size)

    def reset(self):
        self.state = self.get_state_from_point(self.start_point)
        return self.state

    def step(self, action):
        row, col = self.get_point_from_state(self.state)
        if action == 0:  # Move Up
            row = max(0, row - 1)
        elif action == 1:  # Move Down
            row = min(self.size - 1, row + 1)
        elif action == 2:  # Move Left
            col = max(0, col - 1)
        elif action == 3:  # Move Right
            col = min(self.size - 1, col + 1)

        next_state = self.get_state_from_point((row, col))
        reward = 0
        if self.desc[row, col] == 'G': 
            reward = 1  # +1 if the goal is reached
        if self.desc[row, col] == 'R':
            reward = 10  # +10 if the bonus location reached
            self.desc[row, col] = 'F'
        
        done = (self.desc[row, col] == 'H') or (self.desc[row, col] == 'G')  # Done if a hole or the goal is reached

        self.state = next_state
        return next_state, reward, done, {}

    def render(self):
        print("\n".join(["".join(row) for row in self.desc]))
        
    def preprocess_obs(self, obs):
        # Ensure class values are integers within the valid range
        obs = np.clip(obs, 0, self.observation_space.n - 1)

        # One-hot encoding without torch
        obs_one_hot = np.eye(self.observation_space.n)[obs]

        return obs_one_hot


env = CustomFrozenLakeEnv(size=16, num_holes=10, num_specials=10, start_point=(0, 0), end_point=(15, 15))
env.render()

SFFFFFFFFFFFRFFF
FFFFFFFFFHFFFFFR
FFFFFFFFFFFFRFFH
FFFFFFFFFFFFFFFR
FFHFFFFFFFFFFFFF
FFFFFFFFFFFFFFFF
FFFFFFFFFFFFFFFF
FFFFFFRFFFFFRFFF
FFFFFFHFFFFFFFFF
FFFFFFFFFFFFFFFF
FFFFFFFFFFRFFFRF
FFFFFRFFFFFFFFFF
FFHFFFFHFHFHFFFF
FFFFFFFFFFFFFFFF
FFFFFFFFFHFFFFFF
FFFFFFHFFFRFFFFG


In [2]:
custom_lake = [''.join(sublist) for sublist in env.desc]

print(custom_lake)

['SFFFFFFFFFFFRFFF', 'FFFFFFFFFHFFFFFR', 'FFFFFFFFFFFFRFFH', 'FFFFFFFFFFFFFFFR', 'FFHFFFFFFFFFFFFF', 'FFFFFFFFFFFFFFFF', 'FFFFFFFFFFFFFFFF', 'FFFFFFRFFFFFRFFF', 'FFFFFFHFFFFFFFFF', 'FFFFFFFFFFFFFFFF', 'FFFFFFFFFFRFFFRF', 'FFFFFRFFFFFFFFFF', 'FFHFFFFHFHFHFFFF', 'FFFFFFFFFFFFFFFF', 'FFFFFFFFFHFFFFFF', 'FFFFFFHFFFRFFFFG']


In [3]:
import gym
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
import torch



# Create and wrap the custom environment
env = CustomFrozenLakeEnv(size=16, num_holes=10, num_specials=10, start_point=(0, 0), end_point=(15, 15))
env = DummyVecEnv([lambda: env])

# Define the PPO model
model = PPO("MlpPolicy", env, verbose=1)

# Train the model
model.learn(total_timesteps=100000)

# Save the model
model.save("ppo_custom_frozenlake")

# Load the trained model (optional)
# model = PPO.load("ppo_custom_frozenlake")

# Test the trained agent
obs = env.reset()
for _ in range(1000):
    action, _ = model.predict(obs, deterministic=True)
    obs, _, done, _ = env.step(action)
    env.render()

# Close the environment
env.close()




Using cpu device
-----------------------------
| time/              |      |
|    fps             | 5305 |
|    iterations      | 1    |
|    time_elapsed    | 0    |
|    total_timesteps | 2048 |
-----------------------------
------------------------------------------
| time/                   |              |
|    fps                  | 3241         |
|    iterations           | 2            |
|    time_elapsed         | 1            |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0017375792 |
|    clip_fraction        | 9.77e-05     |
|    clip_range           | 0.2          |
|    entropy_loss         | -1.39        |
|    explained_variance   | -0.00736     |
|    learning_rate        | 0.0003       |
|    loss                 | 0.118        |
|    n_updates            | 10           |
|    policy_gradient_loss | -0.00464     |
|    value_loss           | 2.79         |
------------------------------------------



In [4]:
import gym
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from level2 import CustomFrozenLakeEnv

environment_name = 'FrozenLake-v1'
render_mode = 'human'

try:
    gym_env = gym.make(environment_name, desc=custom_lake, render_mode=render_mode)
    vec_env = DummyVecEnv([lambda: gym_env])
except gym.error.UnregisteredEnv:
    # If the Gymnasium environment is not available, use the OpenAI Gym environment
    vec_env = DummyVecEnv([lambda: gym.make(environment_name, desc=custom_lake, render_mode=render_mode)])





# Load the trained model
model = PPO.load("/Users/software/Desktop/reinforcement_learning_practise/hackathon/level_1/level3_custom_frozenlake_model.zip")

# Set the number of episodes for the trial
num_episodes = 50

# Run a trial of various episodes
for episode in range(num_episodes):
    obs = vec_env.reset()
    total_reward = 0
    done = False
    episode_path = {"observations": [], "actions": [], "rewards": []}

    while not done:
        action, _states = model.predict(obs, deterministic=True)
        obs, reward, done, info = vec_env.step(action)

        # Store observations, actions, and rewards during the episode
        episode_path["observations"].append(obs.copy())
        episode_path["actions"].append(action)
        episode_path["rewards"].append(reward)

        total_reward += reward

    print(f"Episode {episode + 1} - Total Reward: {total_reward}")

    # Check if the episode was successful
    if total_reward == 1:
        print("Episode succeeded!")
        print("Observations:", episode_path["observations"])
        print("Actions:", episode_path["actions"])
        print("Rewards:", episode_path["rewards"])

# Close the environment
vec_env.close()



Episode 1 - Total Reward: [0.]
Episode 2 - Total Reward: [0.]
Episode 3 - Total Reward: [0.]
Episode 4 - Total Reward: [0.]
Episode 5 - Total Reward: [0.]
Episode 6 - Total Reward: [0.]


KeyboardInterrupt: 