In [1]:
%pip install gymnasium

Note: you may need to restart the kernel to use updated packages.




In [67]:
import gym
import numpy as np

# Custom map with treasure (T) and other tiles (F - Frozen, H - Hole, G - Goal)
custom_map = [
    "SFFHT",  # Start position (S), Frozen Tiles (F)
    "FHFFF",  # Hole (H), Frozen (F), Treasure (T)
    "FFFTF",  # Frozen (F), Hole (H), Frozen (F)
    "TFHFF",  # Hole (H), Treasure (T), Frozen (F)
    "FFFFG",  # Frozen (F), Frozen (F), Hole (H), Goal (G)
]

class CustomFrozenLakeEnv(gym.Env):
    def __init__(self, desc=None, is_slippery=True):
        super().__init__()
        self.desc = np.array(desc or custom_map, dtype='c')
        self.is_slippery = is_slippery
        self.reward_range = (-float('inf'), float('inf'))

        # Custom action and state space
        self.nrow, self.ncol = self.desc.shape
        self.nS = self.nrow * self.ncol  # Total number of states (5x5 grid)
        self.nA = 4  # 4 actions (up, down, left, right)

        # Initialize state to start position 'S'
        self.state = 0  # The state represents the index of 'S' in the grid
        self.done = False
        self.treasures_collected = set()  # Track collected treasures
        
        # Action space: 4 actions (Up, Down, Left, Right)
        self.action_space = gym.spaces.Discrete(self.nA)
        
        # Observation space: 5x5 grid (flattened to 25 states)
        self.observation_space = gym.spaces.Discrete(self.nS)
        
        # Transition model: store transition probabilities
        self.P = {s: {a: [] for a in range(self.nA)} for s in range(self.nS)}

        self._build_transition_model()

    def step(self, action):
        if self.done:
            return self.state, 0, True, {}

        row, col = divmod(self.state, self.ncol)

        if action == 0:  # Up
            new_row, new_col = max(row - 1, 0), col
        elif action == 1:  # Down
            new_row, new_col = min(row + 1, self.nrow - 1), col
        elif action == 2:  # Left
            new_row, new_col = row, max(col - 1, 0)
        elif action == 3:  # Right
            new_row, new_col = row, min(col + 1, self.ncol - 1)
        
        # Get the new state index
        new_state = new_row * self.ncol + new_col
        tile = self.desc[new_row, new_col].decode("utf-8")

        reward = 0
        done = False
        if tile == 'H':  # If the agent falls into a hole, game over
            done = True
            reward = -1
        elif tile == 'G':  # If the agent reaches the goal, game over with a success reward
            done = True
            reward = 1
        elif tile == 'T':  # If the agent steps on a treasure, give a +5 reward
            reward = 5
            # Mark treasure as collected
            self.treasures_collected.add(new_state)
            # After stepping on a treasure, it becomes frozen (F)
            self.desc[new_row, new_col] = b'F'

        # Update the state
        self.state = new_state
        return self.state, reward, done, {}

    def reset(self):
        self.state = 0  # Reset to the start position (state 0)
        self.done = False
        self.treasures_collected.clear()  # Reset collected treasures
        return self.state

    def render(self):
        row, col = divmod(self.state, self.ncol)
        print(f"Current state: Position ({row}, {col})")
        print(f"Treasures collected: {len(self.treasures_collected)}")
        print(f"Current grid:")
        print(self.desc)

    def _build_transition_model(self):
        for row in range(self.nrow):
            for col in range(self.ncol):
                state = row * self.ncol + col  # Position index
                for action in range(self.nA):
                    next_state, reward, done = self._transition(row, col, action)
                    self.P[state][action].append((0.25, next_state, reward, done))

    def _transition(self, row, col, action):
        if action == 0:  # Up
            new_row, new_col = max(row - 1, 0), col
        elif action == 1:  # Down
            new_row, new_col = min(row + 1, self.nrow - 1), col
        elif action == 2:  # Left
            new_row, new_col = row, max(col - 1, 0)
        elif action == 3:  # Right
            new_row, new_col = row, min(col + 1, self.ncol - 1)

        new_state = new_row * self.ncol + new_col
        tile = self.desc[new_row, new_col].decode("utf-8")

        if tile == 'H':  # Hole
            reward = -10
            done = True
        elif tile == 'G':  # Goal
            reward = 10
            done = True
        elif tile == 'T':  # Treasure
            reward = 5
            done = False
        else:  # Frozen Tile
            reward = 0
            done = False

        return new_state, reward, done


# Create the custom FrozenLake environment
env = CustomFrozenLakeEnv(desc=custom_map, is_slippery=True)

# Check the environment properties
print(f"Number of states: {env.observation_space.n}")  # 25 states for a 5x5 grid
print(f"Number of actions: {env.action_space.n}")  # 4 actions (up, down, left, right)

# Reset the environment to start
state = env.reset()
print(f"Initial state: {state}")

# Example interaction: Take a random action
action = env.action_space.sample()  # Random action
next_state, reward, done, info = env.step(action)

print(f"Action taken: {action}")
print(f"Next state: {next_state}, Reward: {reward}, Done: {done}")


Number of states: 25
Number of actions: 4
Initial state: 0
Action taken: 2
Next state: 0, Reward: 0, Done: False


In [68]:
nb_states = env.observation_space.n
nb_actions = env.action_space.n

print(nb_states)
print(nb_actions)
# Check transition model (env.P)
for state in env.P:
    print(f"State {state}:")
    for action in env.P[state]:
        for prob, next_state, reward, done in env.P[state][action]:
            print(f"  Action {action}: Probability={prob}, Next State={next_state}, Reward={reward}, Done={done}")

25
4
State 0:
  Action 0: Probability=0.25, Next State=0, Reward=0, Done=False
  Action 1: Probability=0.25, Next State=5, Reward=0, Done=False
  Action 2: Probability=0.25, Next State=0, Reward=0, Done=False
  Action 3: Probability=0.25, Next State=1, Reward=0, Done=False
State 1:
  Action 0: Probability=0.25, Next State=1, Reward=0, Done=False
  Action 1: Probability=0.25, Next State=6, Reward=-10, Done=True
  Action 2: Probability=0.25, Next State=0, Reward=0, Done=False
  Action 3: Probability=0.25, Next State=2, Reward=0, Done=False
State 2:
  Action 0: Probability=0.25, Next State=2, Reward=0, Done=False
  Action 1: Probability=0.25, Next State=7, Reward=0, Done=False
  Action 2: Probability=0.25, Next State=1, Reward=0, Done=False
  Action 3: Probability=0.25, Next State=3, Reward=-10, Done=True
State 3:
  Action 0: Probability=0.25, Next State=3, Reward=-10, Done=True
  Action 1: Probability=0.25, Next State=8, Reward=0, Done=False
  Action 2: Probability=0.25, Next State=2, Re

In [75]:
env.render()

Current state: Position (0, 0)
Treasures collected: 0
Current grid:
[[b'S' b'F' b'F' b'H' b'T']
 [b'F' b'H' b'F' b'F' b'F']
 [b'F' b'F' b'F' b'T' b'F']
 [b'T' b'F' b'H' b'F' b'F']
 [b'F' b'F' b'F' b'F' b'G']]


In [71]:
def value_iteration(env):
  num_iterations = 100
  threshold = 1e-20 #value used to terminate if no changes observed in the new values from the previous values
  gamma = .6 #discount factor
  value_table = np.zeros(env.observation_space.n)
  print('Initial Value Table: ', value_table)
  for i in range(num_iterations):
    updated_value_table = np.copy(value_table)
    for s in range(env.observation_space.n):
      Q_values = []
      # Loop over all possible actions
      for a in range(env.action_space.n):
       action_value = 0
       for prob, s_, r, done in env.P[s][a]:
           action_value += prob * (r + gamma * updated_value_table[s_])
           Q_values.append(action_value)
       value_table[s] = max(Q_values)
      print(f'Value Table for State {s}', value_table[s])
    if (np.sum(np.fabs(updated_value_table - value_table)) <= threshold):
      break
  return value_table
optimal_value_function = value_iteration(env)
print('optimal value function', optimal_value_function)

Initial Value Table:  [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0.]
Value Table for State 0 0.0
Value Table for State 1 0.0
Value Table for State 2 0.0
Value Table for State 3 1.25
Value Table for State 4 1.25
Value Table for State 5 0.0
Value Table for State 6 0.0
Value Table for State 7 0.0
Value Table for State 8 1.25
Value Table for State 9 1.25
Value Table for State 10 1.25
Value Table for State 11 0.0
Value Table for State 12 1.25
Value Table for State 13 0.0
Value Table for State 14 1.25
Value Table for State 15 1.25
Value Table for State 16 1.25
Value Table for State 17 0.0
Value Table for State 18 1.25
Value Table for State 19 2.5
Value Table for State 20 1.25
Value Table for State 21 0.0
Value Table for State 22 0.0
Value Table for State 23 2.5
Value Table for State 24 2.5
Value Table for State 0 0.0
Value Table for State 1 0.0
Value Table for State 2 0.0
Value Table for State 3 1.4375
Value Table for State 4 1.4375
Value Table for State 5 0.18

In [74]:
def extract_policy(value_table):
  gamma = 1.0
  policy = np.zeros(env.observation_space.n)
  for s in range(env.observation_space.n):
    Q_values = [sum([prob*(r + gamma * value_table[s_])
                for prob, s_, r, _ in env.P[s][a]])
                for a in range(env.action_space.n)]
    policy[s] = np.argmax(np.array(Q_values))
  return policy
optimal_policy = extract_policy(optimal_value_function)

print(optimal_policy.reshape(5,5))

[[1. 2. 1. 3. 0.]
 [1. 1. 1. 1. 0.]
 [1. 1. 3. 0. 2.]
 [2. 2. 2. 0. 1.]
 [0. 0. 3. 3. 1.]]


In [31]:
import gym
import numpy as np
import random
import matplotlib.pyplot as plt

# Define the custom environment class
class CustomFrozenLakeEnv(gym.Env):
    def __init__(self, desc=None, is_slippery=False):
        # Set the custom grid for the environment
        self.desc = np.array(desc, dtype="c")
        self.nrow, self.ncol = self.desc.shape
        self.nS = self.nrow * self.ncol  # Total number of states (5x5 grid)
        self.nA = 4  # 4 actions (up, down, left, right)

        self.action_space = gym.spaces.Discrete(self.nA)
        self.observation_space = gym.spaces.Discrete(self.nS)

        # Initialize agent position
        self.state = 0  # Start at position 'S'
        self.done = False
        
    def reset(self):
        self.state = 0  # Reset to the start position
        self.done = False
        return self.state

    def step(self, action):
        if self.done:
            return self.state, 0, True, {}

        row, col = divmod(self.state, self.ncol)

        if action == 0:  # Up
            new_row, new_col = max(row - 1, 0), col
        elif action == 1:  # Down
            new_row, new_col = min(row + 1, self.nrow - 1), col
        elif action == 2:  # Left
            new_row, new_col = row, max(col - 1, 0)
        elif action == 3:  # Right
            new_row, new_col = row, min(col + 1, self.ncol - 1)
        
        new_state = new_row * self.ncol + new_col
        tile = self.desc[new_row, new_col].decode("utf-8")

        reward = 0
        if tile == 'H':  # If the agent falls into a hole, game over
            self.done = True
            reward = -10
        elif tile == 'G':  # If the agent reaches the goal, game over with a success reward
            self.done = True
            reward = 10
        elif tile == 'T':  # If the agent steps on a treasure, reward +5
            reward = 5
            # After stepping on a treasure, it becomes frozen (F)
            self.desc[new_row, new_col] = b'F'

        # Update the state
        self.state = new_state
        return self.state, reward, self.done, {}

# Custom 5x5 grid with treasures (T), holes (H), goal (G), frozen tiles (F), and start (S)
custom_map = [
    "SFFHT",  # Start position (S), Frozen Tiles (F)
    "FHFFF",  # Hole (H), Frozen (F)
    "FFFTF",  # Frozen (F), Hole (H), Frozen (F)
    "TFHFF",  # Treasure (T), Hole (H), Frozen (F)
    "FFFFG",  # Frozen (F), Goal (G)
]

# Create the custom FrozenLake environment
env = CustomFrozenLakeEnv(desc=custom_map, is_slippery=False)

# Value Iteration to calculate the optimal value function and policy
def value_iteration(env, gamma=0.9, theta=1e-6):
    V = np.zeros(env.observation_space.n)  # Initialize the value function
    policy = np.zeros(env.observation_space.n, dtype=int)  # Initialize the policy

    while True:
        delta = 0
        # Iterate through all states
        for s in range(env.observation_space.n):
            v = V[s]
            q_values = []
            # Iterate through all possible actions
            for a in range(env.action_space.n):
                # Initialize variables for calculating the expected reward
                expected_value = 0
                for prob, next_state, reward, done in env.P[s][a]:
                    expected_value += prob * (reward + gamma * V[next_state])
                q_values.append(expected_value)

            # Update the value function for the state
            V[s] = max(q_values)
            policy[s] = np.argmax(q_values)

            delta = max(delta, abs(v - V[s]))

        # If the value function has converged, break
        if delta < theta:
            break

    return V, policy

# Calculate the state-value function (V*)
V, policy = value_iteration(env)

# Print the optimal value function (V*)
print("Optimal Value Function (V*):")
print(V.reshape(env.nrow, env.ncol))

# Visualize the optimal policy (π*) for each state
policy_map = np.array([['↑', '↓', '←', '→'][a] for a in policy]).reshape(env.nrow, env.ncol)
print("Optimal Policy (π*):")
print(policy_map)

# Plot the agent’s path on the map using the learned policy
def visualize_policy(env, policy):
    policy_map = np.array([['↑', '↓', '←', '→'][a] for a in policy]).reshape(env.nrow, env.ncol)
    fig, ax = plt.subplots()
    ax.matshow(np.zeros_like(policy_map), cmap="Blues", alpha=0.3)
    
    for i in range(env.nrow):
        for j in range(env.ncol):
            ax.text(j, i, policy_map[i][j], ha='center', va='center', fontsize=16)
    
    plt.title("Learned Policy (π*)")
    plt.show()

visualize_policy(env, policy)

AttributeError: 'CustomFrozenLakeEnv' object has no attribute 'P'