In [1]:
import gymnasium as gym
from gymnasium.envs.toy_text.frozen_lake import generate_random_map
import numpy as np
import matplotlib.pyplot as plt
import time

## Helper Functions.


### 1. Function to test a policy

In [2]:
def test(env,policy):
    n_episodes = 5000
    avg_length=0
    avg_reward=0
    n_states=env.observation_space.n
    n_actions=env.action_space.n
    for episode in range(n_episodes):
        state = env.reset()[0]
        done = False
        while not done:
            action = policy[state]
            state, reward, done, _, _ = env.step(action)
            avg_length+=1   
            avg_reward+=reward
    
    avg_length/=n_episodes
    avg_reward/=n_episodes
    print(f"Average episode length :{avg_length}")
    print(f"Average reward per episode :{avg_reward}")


### 2. Function to get policy given the action-value function

In [3]:
def get_policy(Q):
    """Derive the greedy (deterministic) policy from an action-value table.

    Args:
        Q: NumPy array indexed as ``Q[state]`` giving the action values of
            that state.

    Returns:
        Integer NumPy array of length ``Q.shape[0]`` whose entry for each
        state is the index of its highest-valued action (first index on ties).
    """
    best_per_state = (np.argmax(action_values) for action_values in Q)
    return np.fromiter(best_per_state, dtype=int, count=Q.shape[0])


### 3. Function to take greedy action

In [4]:
def greedy_action(Q_s):
    """Pick an action with the highest value, breaking ties uniformly at random.

    Args:
        Q_s: NumPy array of action values for a single state.

    Returns:
        Index of a maximal entry of ``Q_s``; when several entries share the
        maximum, one of them is drawn with ``np.random.choice``.
    """
    is_best = Q_s == Q_s.max()
    return np.random.choice(np.flatnonzero(is_best))

# Algorithms

## 1. Monte Carlo (on-policy, first-visit, for epsilon-soft policies)

In [5]:
def MC(env, max_episodes, gamma=0.95, epsilon=0.1):
    """On-policy first-visit Monte Carlo control with an epsilon-greedy policy.

    Args:
        env: Environment following the Gymnasium API with discrete
            ``observation_space`` and ``action_space`` (both expose ``.n``).
        max_episodes: Number of episodes to sample.
        gamma: Discount factor applied to future rewards.
        epsilon: Probability of taking a uniformly random action instead of
            the greedy one.

    Returns:
        NumPy array of shape ``(n_states, n_actions)`` holding the estimated
        action values.
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    Q = np.zeros((n_states, n_actions))
    N = np.zeros((n_states, n_actions))  # number of first visits averaged into Q

    for _ in range(max_episodes):
        # Time step at which each (state, action) pair first occurred; -1 = not seen.
        first_visit = np.full((n_states, n_actions), -1)
        episode = []
        state = env.reset()[0]
        done = False

        while not done:
            if np.random.rand() < epsilon:
                action = env.action_space.sample()
            else:
                action = greedy_action(Q[state])

            next_state, reward, terminated, truncated, _ = env.step(action)
            # Stop on a terminal state or when the environment truncates the
            # episode (e.g. time limit); a truncated env must be reset.
            done = terminated or truncated

            if first_visit[state, action] == -1:
                first_visit[state, action] = len(episode)

            episode.append((state, action, reward))
            state = next_state

        # Walk the episode backwards, accumulating the discounted return, and
        # update Q only at the first occurrence of each (state, action) pair.
        G = 0
        for step in range(len(episode) - 1, -1, -1):
            state, action, reward = episode[step]
            G = gamma * G + reward

            if first_visit[state, action] == step:
                N[state, action] += 1
                # Incremental mean of the observed first-visit returns.
                Q[state, action] += (G - Q[state, action]) / N[state, action]

    return Q


## 2. Sarsa

In [6]:
def Sarsa(env, max_episodes, gamma=0.95, epsilon=0.1, alpha=0.1):
    """Tabular SARSA (on-policy TD control) with an epsilon-greedy policy.

    Args:
        env: Environment following the Gymnasium API with discrete
            ``observation_space`` and ``action_space`` (both expose ``.n``).
        max_episodes: Number of training episodes.
        gamma: Discount factor.
        epsilon: Probability of taking a uniformly random action.
        alpha: Step size of the TD update.

    Returns:
        NumPy array of shape ``(n_states, n_actions)`` holding the learned
        action values.
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    Q = np.zeros((n_states, n_actions))

    def select_action(s):
        # Epsilon-greedy behaviour policy over the current Q estimates.
        if np.random.rand() < epsilon:
            return env.action_space.sample()
        return greedy_action(Q[s])

    for _ in range(max_episodes):
        state = env.reset()[0]
        action = select_action(state)
        done = False

        while not done:
            next_state, reward, terminated, truncated, _ = env.step(action)
            # Stop on a terminal state or on truncation (e.g. time limit).
            done = terminated or truncated

            next_action = select_action(next_state)

            # No bootstrapping from a terminal state: its value is zero by
            # definition. A merely truncated episode still bootstraps.
            if terminated:
                target = reward
            else:
                target = reward + gamma * Q[next_state, next_action]
            Q[state, action] += alpha * (target - Q[state, action])

            # SARSA is on-policy: the action used in the target must be the
            # action actually executed next, so carry it over rather than
            # sampling a new one.
            state, action = next_state, next_action

    return Q


## 3. Q-Learning

In [7]:
def Q_learning(env, max_episodes, gamma=0.95, epsilon=0.1, alpha=0.1):
    """Tabular Q-learning (off-policy TD control) with epsilon-greedy exploration.

    Args:
        env: Environment following the Gymnasium API with discrete
            ``observation_space`` and ``action_space`` (both expose ``.n``).
        max_episodes: Number of training episodes.
        gamma: Discount factor.
        epsilon: Probability of taking a uniformly random action.
        alpha: Step size of the TD update.

    Returns:
        NumPy array of shape ``(n_states, n_actions)`` holding the learned
        action values.
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    Q = np.zeros((n_states, n_actions))

    for _ in range(max_episodes):
        state = env.reset()[0]
        done = False

        while not done:
            if np.random.rand() < epsilon:
                action = env.action_space.sample()
            else:
                action = greedy_action(Q[state])

            next_state, reward, terminated, truncated, _ = env.step(action)
            # Stop on a terminal state or on truncation (e.g. time limit);
            # a truncated environment must be reset before stepping again.
            done = terminated or truncated

            # Bootstrap from the best next action, except across a terminal
            # transition where the continuation value is zero by definition.
            if terminated:
                target = reward
            else:
                target = reward + gamma * np.max(Q[next_state])
            Q[state, action] += alpha * (target - Q[state, action])
            state = next_state

    return Q

## 4. Policy Iteration

In [8]:
def policy_iteration(env, gamma=0.95, threshold=1e-4):
    """Compute a policy by policy iteration on a known tabular model.

    Alternates iterative policy evaluation (synchronous sweeps until the
    largest value change is at most ``threshold``) with greedy policy
    improvement, until the policy stops changing. Prints the number of
    improvement rounds and the elapsed time.

    Args:
        env: Environment whose ``env.unwrapped.P[state][action]`` is an
            iterable of ``(prob, next_state, reward, terminated)`` tuples
            (the layout used by Gymnasium's toy-text environments), with
            discrete ``observation_space`` and ``action_space``.
        gamma: Discount factor.
        threshold: Convergence tolerance of the policy-evaluation sweeps.

    Returns:
        Integer NumPy array of length ``n_states`` giving the action chosen
        in each state.
    """
    start = time.perf_counter()
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    P = env.unwrapped.P  # dynamics of the environment
    Values = np.zeros(n_states)  # state-value estimates of the current policy
    Policy = np.zeros(n_states, dtype=int)

    def backup(state, action):
        # One-step expected return of `action` in `state` under `Values`.
        # The continuation value is dropped on terminal transitions: without
        # this, a terminal state whose own P entries carry reward leaks a
        # spurious value into every state that can reach it.
        total = 0.0
        for prob, next_state, reward, terminated in P[state][action]:
            future = 0.0 if terminated else gamma * Values[next_state]
            total += prob * (reward + future)
        return total

    ctr = 0
    while True:
        ctr += 1

        # Policy evaluation: synchronous sweeps (new values are computed from
        # the previous sweep only) until the largest change is within tolerance.
        delta = float('inf')
        while delta > threshold:
            Values_new = np.zeros(n_states)
            delta = 0
            for state in range(n_states):
                Values_new[state] = backup(state, Policy[state])
                delta = max(delta, abs(Values_new[state] - Values[state]))
            Values[:] = Values_new

        # Policy improvement: act greedily with respect to the evaluated values
        # (lowest action index wins ties).
        stable = True
        for state in range(n_states):
            old_action = Policy[state]
            max_v = -float('inf')
            for action in range(n_actions):
                v = backup(state, action)
                if v > max_v:
                    Policy[state] = action
                    max_v = v

            if old_action != Policy[state]:
                stable = False

        if stable:
            break

    end = time.perf_counter()
    print(f"Policy Iteration took {ctr} iterations to converge")
    print(f"Policy Iteration took {end-start} seconds to converge")

    return Policy

#### Now let's test these algorithms on the Frozen Lake environment

In [9]:
# Build a random 10x10 slippery Frozen Lake (each tile is frozen with probability 0.9).
size = 10
random_desc = generate_random_map(size, p=0.9)
env = gym.make("FrozenLake-v1", desc=random_desc, is_slippery=True)

# Show the generated map, one text row per grid row.
print(
    *(''.join(cell.decode() for cell in row) for row in env.unwrapped.desc),
    sep='\n',
)


SFFFFFFFFF
HFFFHFFFFF
FFFFFFFFFH
FFFFFFFFFF
FFFFFFFFFF
FFFFFFFFFF
FHFFHFFFFF
FFFFFFFFFF
FFFFFFFFFF
FFFFFFFFFG


### Monte Carlo

In [10]:
# Train with first-visit Monte Carlo control, then evaluate the greedy policy.
# perf_counter() is a monotonic high-resolution clock, so the measured interval
# is not affected by system clock adjustments the way time.time() can be.
start = time.perf_counter()
Q = MC(env,100000)
end = time.perf_counter()
policy = get_policy(Q)

print(F"Time taken by Monte Carlo :{end-start} seconds")
test(env,policy)

Time taken by Monte Carlo :63.19105243682861 seconds
Average episode length :65.2232
Average reward per episode :1.0


### Sarsa

In [11]:
# Train with SARSA, then evaluate the greedy policy.
# perf_counter() is a monotonic high-resolution clock, so the measured interval
# is not affected by system clock adjustments the way time.time() can be.
start = time.perf_counter()
Q = Sarsa(env,10000)
end = time.perf_counter()
policy = get_policy(Q)

print(F"Time taken by Sarsa :{end-start} seconds")
test(env,policy)

Time taken by Sarsa :9.30546498298645 seconds
Average episode length :68.3706
Average reward per episode :0.9912


### Q-learning

In [12]:
# Train with Q-learning, then evaluate the greedy policy.
# perf_counter() is a monotonic high-resolution clock, so the measured interval
# is not affected by system clock adjustments the way time.time() can be.
start = time.perf_counter()
Q = Q_learning(env,15000)
end = time.perf_counter()
policy = get_policy(Q)

print(F"Time taken by Q-learning :{end-start} seconds")
test(env,policy)

Time taken by Q-learning :11.597620010375977 seconds
Average episode length :65.4546
Average reward per episode :1.0


### Policy iteration

In [13]:
# policy_iteration() prints its own iteration count and elapsed time, so no
# separate stopwatch is kept in this cell.
policy = policy_iteration(env)

test(env,policy)

Policy Iteration took 14 iterations to converge
Policy Iteration took 0.08269071578979492 seconds to converge
Average episode length :59.38
Average reward per episode :1.0


# Bonus Task (Cliff Walking)
#### Let's test these algorithms on the cliff-walking environment

In [14]:
from gymnasium.envs.registration import register
# Import only the name this notebook uses instead of `from cliff import *`,
# so it is clear where `custom_prob` comes from and nothing else is shadowed.
# NOTE(review): assumes `custom_prob` is defined in the local `cliff` module — confirm.
from cliff import custom_prob

# The environment class is resolved lazily from the "module:Class" entry point string.
register(
    id='CustomCliff-v0',
    entry_point='cliff:CustomCliffEnv'
)
cliff_env = gym.make("CustomCliff-v0",P=custom_prob)

### Sarsa

In [20]:
# Train SARSA on the cliff-walking task (undiscounted), then evaluate the greedy policy.
# perf_counter() is a monotonic high-resolution clock, so the measured interval
# is not affected by system clock adjustments the way time.time() can be.
start = time.perf_counter()
Q = Sarsa(cliff_env,1000,gamma=1)
end = time.perf_counter()
policy = get_policy(Q)

print(F"Time taken by Sarsa :{end-start} seconds")
test(cliff_env,policy)

Time taken by Sarsa :0.9916045665740967 seconds
Average episode length :15.0
Average reward per episode :-15.0


### Q-learning

In [17]:
# Train Q-learning on the cliff-walking task (undiscounted), then evaluate the greedy policy.
# perf_counter() is a monotonic high-resolution clock, so the measured interval
# is not affected by system clock adjustments the way time.time() can be.
start = time.perf_counter()
Q = Q_learning(cliff_env,1000,gamma=1)
end = time.perf_counter()
policy = get_policy(Q)

print(F"Time taken by Q-learning :{end-start} seconds")
test(cliff_env,policy)

Time taken by Q-learning :0.6049695014953613 seconds
Average episode length :13.0
Average reward per episode :-13.0


### Policy iteration

In [18]:
# policy_iteration() prints its own iteration count and elapsed time, so no
# separate stopwatch is kept in this cell.
policy = policy_iteration(cliff_env)

test(cliff_env,policy)

Policy Iteration took 15 iterations to converge
Policy Iteration took 0.016982316970825195 seconds to converge
Average episode length :13.0
Average reward per episode :-13.0
