In [19]:
import gymnasium as gym
import numpy as np
from gymnasium.envs.registration import register

In [20]:


env=gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=True)
# Custom Frozen Lake Environment
class CustomFrozenLake(gym.envs.toy_text.FrozenLakeEnv):
    def __init__(self, **kwargs):
        super(CustomFrozenLake, self).__init__(**kwargs)

    def step(self, action):
        state, reward, done, truncated, info = super(CustomFrozenLake, self).step(action)
        self.goal=env.unwrapped.desc.shape[0]*env.unwrapped.desc.shape[1]-1
        if state == self.goal:
            reward = 5  # Positive reward for reaching the goal
        elif done and state != self.goal:
            reward = -5  # Negative reward for falling into a hole
        else:
            reward =-1
        return state, reward, done, truncated, info

# Register the custom environment
register(
    id='CustomFrozenLake-v0',
    entry_point='__main__:CustomFrozenLake',
    kwargs={'is_slippery': True,
          }
)

# Create the custom environment
env = gym.make('CustomFrozenLake-v0', render_mode='human')
state = env.reset()

In [21]:
def policy_evaluation(env, policy, gamma=1.0, theta=0.00001):
    V = np.zeros(env.nS)
    ite = 0
    while (ite<10):
        delta = 0
        for s in range(env.nS):
            v = 0
            for a, action_prob in enumerate(policy[s]):
                for prob, next_state, reward, done in env.P[s][a]:
                    v += action_prob * prob * (reward + gamma * V[next_state])
            delta = max(delta, np.abs(v - V[s]))
            V[s] = v
        if delta < theta:
            break
        ite += 1
    return np.array(V)


In [22]:
def policy_improvement(env, policy_eval_fn=policy_evaluation, gamma=1.0):
    def one_step_lookahead(state, V):
        A = np.zeros(env.nA)
        for a in range(env.nA):
            for prob, next_state, reward, done in env.P[state][a]:
                A[a] += prob * (reward + gamma * V[next_state])
        return A

    policy = np.zeros([env.nS, env.nA]) / env.nA
    ite = 0
    while (ite<10):
        V = policy_eval_fn(env, policy, gamma=gamma)

        policy_stable = True
        for s in range(env.nS):
            chosen_a = np.argmax(policy[s])
            action_values = one_step_lookahead(s, V)
            best_a = np.argmax(action_values)
            if chosen_a != best_a:
                policy_stable = False
            policy[s] = np.eye(env.nA)[best_a]
        if policy_stable:
            return policy, V
        ite += 1
    return policy, V

In [23]:
def policy_render(env, policy, n=10):
    wins = 0
    for _ in range(n):
        state,_ = env.reset()
        done = False
        while not done:
            action = np.argmax(policy[state])
            state, reward, done,_, info = env.step(action)
            env.render()
            if done and reward == 1:
                wins += 1
    return wins / n


env.nS = 16
env.nA = 4
random_policy = np.ones([env.nS, env.nA]) / env.nA
v = policy_evaluation(env, random_policy)
policy, v = policy_improvement(env)
policy_render(env, policy)

0.0

In [24]:
a = env.observation_space

In [25]:
a

Discrete(16)

In [26]:
# observation, info = env.reset()

# episode_over = False
# while not episode_over:
#     action = env.action_space.sample()  # agent policy that uses the observation and info
#     observation, reward, terminated, truncated, info = env.step(action)

#     episode_over = terminated or truncated

# env.close()