<a href="https://colab.research.google.com/github/AronBensimhon/RL_game/blob/main/Frozen_game.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# ! pip install gym==0.26.2
# ! pip install pygame numpy matplotlib

Collecting gym==0.26.2
  Downloading gym-0.26.2.tar.gz (721 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m721.7/721.7 kB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: gym
  Building wheel for gym (pyproject.toml) ... [?25l[?25hdone
  Created wheel for gym: filename=gym-0.26.2-py3-none-any.whl size=827725 sha256=55bc4e02281a809150173bcce193f63cd4f9bb4428360f58e950eeba1ad42df0
  Stored in directory: /root/.cache/pip/wheels/1c/77/9e/9af5470201a0b0543937933ee99ba884cd237d2faefe8f4d37
Successfully built gym
Installing collected packages: gym
  Attempting uninstall: gym
    Found existing installation: gym 0.25.2
    Uninstalling gym-0.25.2:
      Successfully uninstalled gym-0.25.2
[31mERROR: pip's dependency resolver does not currently take into account al

In [25]:
# !pip install termcolor



### Step 1: Setup and Environment Creation

In [26]:
import random
import warnings

import gym
import numpy as np
from termcolor import colored

# Suppress UserWarning messages so the training output below stays readable.
# `warnings` must be imported explicitly; without it this line raises NameError
# on a fresh kernel.
warnings.filterwarnings("ignore", category=UserWarning)

Generates a random 4x4 FrozenLake map with 1–3 holes and 2 reward tiles:

'S' = Start, 'G' = Goal, 'F' = Frozen tile, 'H' = Hole, 'R' = Reward

In [27]:
def _goal_reachable(size, holes, start, goal):
    """Return True if `goal` can be reached from `start` without entering a hole.

    Performs a depth-first search over the `size` x `size` grid using the four
    cardinal moves; cells listed in `holes` are treated as impassable.
    """
    frontier = [start]
    seen = {start}
    while frontier:
        row, col = frontier.pop()
        if (row, col) == goal:
            return True
        for d_row, d_col in ((1, 0), (-1, 0), (0, 1), (0, -1)):
            nxt = (row + d_row, col + d_col)
            inside = 0 <= nxt[0] < size and 0 <= nxt[1] < size
            if inside and nxt not in holes and nxt not in seen:
                seen.add(nxt)
                frontier.append(nxt)
    return False


def generate_random_map():
    """Generate a random, solvable 4x4 map.

    The start 'S' is placed at the top-left corner and the goal 'G' at the
    bottom-right corner. Between 1 and 3 holes 'H' are placed on other cells,
    then 2 of the remaining frozen cells 'F' are marked as reward tiles 'R'.

    Hole placements that cut the start off from the goal are rejected and
    re-drawn, so every returned map has at least one hole-free path.

    Returns:
        tuple: (desc, rewards) where `desc` is a list of 4 row strings and
        `rewards` is the list of (row, col) positions of the 'R' tiles.
    """
    size = 4
    start, goal = (0, 0), (size - 1, size - 1)
    map_ = [['F' for _ in range(size)] for _ in range(size)]
    map_[start[0]][start[1]] = 'S'
    map_[goal[0]][goal[1]] = 'G'

    # Place 1-3 holes anywhere except start/goal; retry until the goal is reachable.
    candidates = [(i, j) for i in range(size) for j in range(size)
                  if (i, j) not in (start, goal)]
    while True:
        holes = random.sample(candidates, k=random.randint(1, 3))
        if _goal_reachable(size, set(holes), start, goal):
            break
    for i, j in holes:
        map_[i][j] = 'H'

    # Mark 2 of the remaining frozen cells as reward tiles.
    empty = [(i, j) for i in range(size) for j in range(size) if map_[i][j] == 'F']
    rewards = random.sample(empty, k=min(2, len(empty)))
    for i, j in rewards:
        map_[i][j] = 'R'

    return ["".join(row) for row in map_], rewards

### Step 2: Custom Reward Wrapper for Gym Environment

In [62]:
class CustomRewardWrapper(gym.Wrapper):
    """Gym wrapper that replaces the wrapped environment's reward with a shaped one.

    Shaped reward per step:
      * -1 base cost for every step,
      * +3 the first time a tile listed in `reward_positions` is entered
        (per episode),
      * +10 when the wrapped environment's own reward is 1,
      * -3 when the episode is done and the wrapped reward is 0
        (presumably the agent fell into a hole).

    The wrapped environment's reward is preserved in `info["original_reward"]`.
    """

    def __init__(self, env, reward_positions):
        """Wrap `env`; `reward_positions` is a list of (row, col) bonus tiles."""
        super().__init__(env)
        self.grid_size = 4
        # Copy so later changes to the caller's list do not affect the wrapper.
        self.reward_positions = reward_positions.copy()
        self.collected_rewards = set()

    def reset(self, **kwargs):
        """Forget collected bonus tiles, then reset the wrapped environment."""
        self.collected_rewards = set()
        reset_result = self.env.reset(**kwargs)
        return reset_result

    def step(self, action):
        """Step the wrapped environment and return the shaped reward."""
        observation, env_reward, terminated, truncated, info = self.env.step(action)

        # Convert the flat state index into a (row, col) grid coordinate.
        cell = divmod(observation, self.grid_size)

        shaped = -1
        is_new_bonus = cell in self.reward_positions and cell not in self.collected_rewards
        if is_new_bonus:
            self.collected_rewards.add(cell)
            shaped += 3

        if env_reward == 1:
            shaped += 10
        elif terminated and env_reward == 0:
            shaped -= 3

        info["original_reward"] = env_reward
        return observation, shaped, terminated, truncated, info

### Step 3: Helper Function to Display the Map Matrix with Agent Highlighted

Highlights the agent's position in bold white on a red background and shows previously visited tiles in bold, using ANSI escape codes

In [63]:
def render_colored_map(desc, agent_pos, path_positions=None):
    """Print the map with the agent and its visited tiles highlighted.

    Args:
        desc: Sequence of row strings describing the grid.
        agent_pos: Flat (row-major) index of the agent's current tile; printed
            in bold white on a red background.
        path_positions: Optional collection of flat indices already visited;
            those tiles are printed in bold. Defaults to no visited tiles.

    The grid is printed one row per line with cells separated by spaces,
    followed by a blank line.
    """
    if path_positions is None:
        path_positions = []

    for i, row in enumerate(desc):
        # Use the actual row width rather than a hard-coded 4 so the flat index
        # stays correct for maps of any width.
        row_offset = i * len(row)
        display_row = []
        for j, ch in enumerate(row):
            index = row_offset + j
            if index == agent_pos:
                display_row.append(f'\033[1;37;41m{ch}\033[0m')
            elif index in path_positions:
                display_row.append(f'\033[1m{ch}\033[0m')
            else:
                display_row.append(ch)
        print(" ".join(display_row))
    print()

### Step 4: Q-Learning Training Loop with Progress Monitoring

Trains the agent and prints a progress summary, the current map, and a sample greedy rollout every 20 episodes

In [74]:
def train_agent():
    """Train a tabular Q-learning agent on randomly generated FrozenLake maps.

    Registers a non-slippery FrozenLake variant, then runs 2000 episodes. Each
    episode draws a fresh map from `generate_random_map`, wraps the environment
    in `CustomRewardWrapper`, and follows an epsilon-greedy policy over a single
    shared 16x4 Q-table. Epsilon decays by 0.995 per episode down to 0.01.

    Every 20 episodes a summary (success count and average reward over the last
    20 episodes), the current map, and a greedy sample rollout of at most 30
    steps are printed.

    Returns:
        None. All results are reported via printed output.
    """
    env_name = "CustomFrozenLake-v0"
    gym.envs.registration.register(
        id=env_name,
        entry_point="gym.envs.toy_text:FrozenLakeEnv",
        max_episode_steps=100,
        kwargs={'desc': None, 'map_name': '4x4', 'is_slippery': False},
    )

    Q = np.zeros((16, 4))
    alpha = 0.8
    gamma = 0.95
    epsilon = 0.1
    episodes = 2000
    report_every = 20
    action_names = ['Left', 'Down', 'Right', 'Up']

    success_counter = 0
    reward_accumulator = 0

    for episode in range(1, episodes + 1):
        map_desc, reward_positions = generate_random_map()
        env = gym.make(env_name, desc=map_desc)
        env = CustomRewardWrapper(env, reward_positions)

        state = env.reset()[0]
        done = False
        total_reward = 0
        max_steps = 100

        for step in range(max_steps):
            # Epsilon-greedy action selection.
            if np.random.rand() < epsilon:
                action = env.action_space.sample()
            else:
                action = np.argmax(Q[state])

            next_state, reward, done, _, info = env.step(action)

            # Extra penalty when the move did not change the agent's position.
            if next_state == state:
                reward -= 2

            # A terminal transition has no successor, so its target is just the
            # reward. Bootstrapping from Q[next_state] would be wrong here: the
            # same cell can be a hole in this map but an ordinary tile (with a
            # learned, non-zero value) in another.
            future_value = 0.0 if done else gamma * np.max(Q[next_state])
            Q[state, action] += alpha * (reward + future_value - Q[state, action])
            total_reward += reward
            state = next_state

            if done:
                if info.get("original_reward") == 1 or reward >= 10:
                    success_counter += 1
                break

        reward_accumulator += total_reward
        epsilon = max(0.01, epsilon * 0.995)

        if episode % report_every == 0:
            print("=" * 40)
            print(f"🎯 Episode {episode} | Successes: {success_counter}/{report_every} | Avg Reward: {reward_accumulator / report_every:.2f}")
            success_counter = 0
            reward_accumulator = 0

            print("\n🗺️ Sample map (S=start, G=goal, H=hole, R=reward):")
            for row in map_desc:
                print(" ".join(row))

            print("\n👣 Sample rollout:")
            state = env.reset()[0]
            done = False
            path_history = []
            step_count = 0
            rollout_max_steps = 30

            # Greedy rollout (no exploration, no learning) on the current map.
            for step in range(rollout_max_steps):
                action = np.argmax(Q[state])
                print(f"Step {step + 1}: (Action: {action_names[action]})")
                render_colored_map(map_desc, state, path_positions=path_history)
                path_history.append(state)
                state, reward, done, _, _ = env.step(action)
                step_count += 1
                if done:
                    break

            print(f"🧭 Total rollout steps: {step_count}")

        # A new environment is created every episode; release this one.
        env.close()

In [75]:
# Run the full training loop; progress summaries and sample rollouts are printed as it goes.
train_agent()

🎯 Episode 20 | Successes: 8/20 | Avg Reward: -6.80

🗺️ Sample map (S=start, G=goal, H=hole, R=reward):
S F F R
F F F R
F F F F
F H F G

👣 Sample rollout:
Step 1: (Action: Down)
[1;37;41mS[0m F F R
F F F R
F F F F
F H F G

Step 2: (Action: Down)
[1mS[0m F F R
[1;37;41mF[0m F F R
F F F F
F H F G

Step 3: (Action: Right)
[1mS[0m F F R
[1mF[0m F F R
[1;37;41mF[0m F F F
F H F G

Step 4: (Action: Right)
[1mS[0m F F R
[1mF[0m F F R
[1mF[0m [1;37;41mF[0m F F
F H F G

Step 5: (Action: Right)
[1mS[0m F F R
[1mF[0m F F R
[1mF[0m [1mF[0m [1;37;41mF[0m F
F H F G

Step 6: (Action: Down)
[1mS[0m F F R
[1mF[0m F F R
[1mF[0m [1mF[0m [1mF[0m [1;37;41mF[0m
F H F G

🧭 Total rollout steps: 6
🎯 Episode 40 | Successes: 7/20 | Avg Reward: -1.50

🗺️ Sample map (S=start, G=goal, H=hole, R=reward):
S H F F
F F R F
F F F R
H F F G

👣 Sample rollout:
Step 1: (Action: Down)
[1;37;41mS[0m H F F
F F R F
F F F R
H F F G

Step 2: (Action: Down)
[1mS[0m H F F
[1;37;41mF[0m