Instruction:

Code Components:
1. Initialisation + helper functions
2. Setting up Environment for task1 (env)
3. Setting up Environment for Extended Implementation (env_large)
4. Function for First Visit Monte Carlo Control without exploring starts
5. Function for SARSA with e-Greedy Behaviour Policy
6. Function for Q-learning with e-Greedy Behavior Policy
7. Testing function
Onwards: Training commands for 2 environents using 3 methods = 6 commands in total.

Install pip gym, numpy and pygame if you haven't. Run cells 1 to 7 and then you are ready to train and test. Pick the correct command out of the 6 commands at the end to train. Training parameters need to be adjusted within the functions in 4,5,6. 

In [1]:
#pip install gym, numpy and pygame
import gym
import matplotlib.pyplot as plt
import numpy as np

#helper functions
#Setting the rewards for reaching the hole states as -1 by updating the transition function
def update_transition_fn(env,hole_states):
    num_states = env.observation_space.n                         #number of states
    num_actions = env.action_space.n                             #number of actions
    for s in range(num_states):
        for a in range(num_actions):
            transitions = env.env.P[s][a]                        #transition function: contains data of probability of transition,next state,reward,bool indicating termination state or not
            for p_trans, next_s, r, done in transitions:
                if done and next_s in hole_states:               #if it is termination state and hole states
                    # Set the reward for falling into a hole to -1
                    r = -1
                elif done and next_s == num_states:              #if it is termination state and goal 
                    r = 1
                else:
                    r = 0
                env.env.P[s][a] = [(p_trans, next_s, r, done)]   #update the transition function

Task 1: Setting up the Frozen Lake Environments (Basic Implementation). Run the following cell to set up the 4x4 environment.

In [2]:
#Create the environment using gym from openAI 
env=gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=False,render_mode="human")

# Modify the reward table: reward when falling into the hole to -1 instead of 0.
env.env.reward_range = (-1,1)
env.env.rewards = {
    b'G' : 1,
    b'H' : -1,
    b'F' : 0,
    b'S' : 0
}
#get the states of the holes
hole_states = []                                      #Empty array to store the indices of holes
for i in range(len(env.desc)):                        #looping through each row
    for j in range(len(env.desc[0])):                 #looping through each column 
        if env.desc[i][j] == b'H':                    #checking each state if it's a hole
            hole_states.append(i*len(env.desc) + j)   #append to the array, state index = (num of rows x len of row) + num of column
            
#update the transition function
update_transition_fn(env,hole_states)

In [3]:
state = env.reset()
print(env.reset())
print(state[0])

(0, {'prob': 1})
0


Task 2: Extended Implementation-Setting up FrozenLake ENV with 10x10 grid. Run the following cell to set up the 10x10 environment.

In [None]:
#To Generate 10x10 map size with 25% hole to grid ratio, random placement of holes 
np.random.seed(4) #for reproducibility,change this value to create maps with different layouts of holes
size = 10
num_holes = 25  #25% of 100 tiles = 25 holes

#Creating desc to generate FrozenLake map
desc = np.zeros((size, size), dtype='U1')
desc[:, :] = b'F'
# Set the start and goal positions
desc[0, 0] = b'S'
desc[-1, -1] = b'G'
# Place 25 holes randomly on the map
hole_positions = np.random.choice(size*size, num_holes, replace=False)
hole_rows, hole_cols = np.unravel_index(hole_positions, (size, size))
desc[hole_rows, hole_cols] = b'H'

env_large = gym.make('FrozenLake-v1', desc=desc,is_slippery=False,render_mode="human")
env_large.env.reward_range = (-1,1)
env_large.env.rewards = {
    b'G' : 1,
    b'H' : -1,
    b'F' : 0,
    b'S' : 0
}
update_transition_fn(env_large,hole_positions)
#env_large.reset()
#env_large.render()


Implementing Training methods:

METHOD 1: FIRST VISIT MONTE CARLO CONTROL WITHOUT EXPLORING STARTS

In [None]:
#FIRST VISIT MONTE CARLO CONTROL WITHOUT EXPLORING STARTS
# Function inputs :
#     env : environment to be trained on (4x4 or 10x10)
#     num_episodes : number of episodes to be trained
#     check_convergence : True to check the number of episodes needed to reach convergence

# Function outputs:
#     policy : the optimal policy after the training process
#     converge_epi : the number of episodes until convergence happens, set to 0 if check_converge = False

def training_Monte_Carlo(env,num_episodes,check_convergence = True):
    # Initialize the Q-table and the returns table with zeros
    Q = np.zeros([env.observation_space.n, env.action_space.n])
    Returns = {(s,a): [] for s in range(env.observation_space.n) for a in range(env.action_space.n)}
    
    #Training Parameters
    gamma = 0.7 # discount factor
    epsilon = 0.45 # exploration rate
    
    #Convergence Criterion
    tolerence = 0.01         
    start_check = 200         #Start checking after "input" episodes
    Q_changes = []
    converge_epi = 0
   
    # First-visit Monte Carlo control algorithm
    for i in range(num_episodes):
        state = env.reset()
        current_state = state[0]
        done = False
        episode = []
        Q_prev = Q.copy()                #Storing Q table before it gets updated to check for convergence
        
        #training parameter control
        if i%10 == 0:
            epsilon -= 0.001             #gradual decrease of epsilon greedy parameter, adjust accordingly
        
        # Run the episode and record the visited states, actions, and rewards
        while not done:
            if np.random.rand() < epsilon:                          #if the random value is less than epsilon
                action = env.action_space.sample()                  #a random action is chosen out of available actions
            else:                                                   #otherwise
                action = np.argmax(Q[current_state, :])             #an action with the highest Q value is chosen, correct action
            next_state, reward, done, _ ,prob = env.step(action)    #take the action and record the new state, reward and other info provided by step()
            episode.append((current_state, action, reward))         #record in episode array for calculation later
            current_state = next_state                              #update the new state as current state
            
        # When a termination state is reached
        # Update the Q-table using the returns
        G = 0
        for t in reversed(range(len(episode))):                                       #looping the episode array in reverse order; looking back
            current_state, action, reward = episode[t]                                #retrieving data of the episode
            G = gamma * G + reward                                                    #Calculate G
            if (current_state, action) not in [(x[0],x[1]) for x in episode[0:t]]:    #if state,action pair of the episode is not visited
                Returns[(current_state, action)].append(G)                            #Append G to Return
                Q[current_state, action] = np.mean(Returns[(current_state, action)])  #Update Q-table with average of Return(state,action)
        
       
        print(f"  Episode Number: {i}")      #keeping track of training process
        #print(Q)
        #Checking for Convergence
        if check_convergence != False and i >= start_check:       #Conditions for checking
            Q_change = np.max(np.abs(Q_prev - Q))                 #Finding the change in Q table before and after an episode
            Q_changes.append(Q_change)                            #Recording the change
            if np.mean(Q_changes[-start_check:]) <= tolerence:    #if the mean of the changes is lower than tolerence
                converge_epi = i + 1                              #the episode that convergence happens
                break                                             
    #Generate Policy from the final Q table          
    policy = np.argmax(Q, axis=1)
    return policy,converge_epi

METHOD 2: SARSA WITH e-GREEDY BEHAVIOR POLICY

In [None]:
#SARSA WITH AN E-GREEDY BEHAVIOR POLICY
# Function inputs :
#     env : environment to be trained on (4x4 or 10x10)
#     num_episodes : number of episodes to be trained
#     check_convergence : True to check the number of episodes needed to reach convergence

# Function outputs:
#     policy : the optimal policy after the training process
#     converge_epi : the number of episodes until convergence happens, set to 0 if check_converge = False


def training_SARSA(env,num_episodes,check_convergence = True):
    # Initialize the Q-table and the returns table with zeros
    Q = np.zeros([env.observation_space.n, env.action_space.n])
    
    #Training Parameters
    epsilon = 0.7
    alpha = 0.3
    gamma = 0.5
    
    #Convergence Criterion
    tolerence = 0.01  
    start_check = 20
    Q_changes = []
    converge_epi = 0

    # SARSA algorithm
    for i in range(num_episodes):
        state = env.reset()
        current_state = state[0]     # Convert state tuple to integer index
        done = False 
        Q_prev = Q.copy()        #Storing Q table before it gets updated to check for convergence
        print(f"Episode Number {i}")
        
        #Parameter Control
        if i%10 == 0:
            epsilon -= 0.01         #Gradual decrease of epsilon value
            alpha -= 0.01           #Gradual decrease of learning rate: to ensure convergence
        
        # Choose the initial action using epsilon-greedy policy
        if np.random.rand() < epsilon:                   #if a random number [0,1]is lower than epsilon
            action = env.action_space.sample()           #a random action is chosen out of available actions
        else:                                            #else
            action = np.argmax(Q[current_state, :])      #an action with the highest Q value is chosen, correct action
        while not done:
            # Take the action and observe the next state and reward
            next_state, reward, done, _, prob = env.step(action)
            # Choose the next action using epsilon-greedy policy
            if np.random.rand() < epsilon:
                next_action = env.action_space.sample()
            else:
                next_action = np.argmax(Q[next_state, :])
            # Update the Q-table
            current_value = Q[current_state, action]                                        #current Q value
            next_max = np.max(Q[next_state, :])                                             #Get the highest Q value of taking the next step
            new_value = current_value + alpha * (reward + gamma * next_max - current_value) #calculate a new Q value using the SARSA update rule
            Q[current_state, action] = new_value                                            #Update the Q table with the new Q value
            current_state = next_state                                                      #update the state to next state
            action = next_action                                                            #update the action to next action
            print(f"  State: {current_state}, Action: {action}, Reward: {reward}, Q-value: {new_value}")
        if i % 50 == 0:      #To check Q table during training at every 50 episodes
            print(Q)
            
        #Checking for Convergence
        if check_convergence != False and i >= start_check:
            Q_change = np.max(np.abs(Q_prev - Q))
            Q_changes.append(Q_change)
            if np.mean(Q_changes[-start_check:]) <= tolerence:
                converge_epi = i + 1
                break
    print(Q)        
    policy = np.argmax(Q, axis=1)      #Generating optimal policy from the final Q table
    return policy,converge_epi

METHOD 3: Q-LEARNING WITH e-GREEDY BEHAVIOR POLICY

In [None]:
#Q-LEARNING WITH AN E-GREEDY BEHAVIOR POLICY
# Function inputs :
#     env : environment to be trained on (4x4 or 10x10)
#     num_episodes : number of episodes to be trained
#     check_convergence : True to check the number of episodes needed to reach convergence

# Function outputs:
#     policy : the optimal policy after the training process
#     converge_epi : the number of episodes until convergence happens, set to 0 if check_converge = False

def training_Q(env,num_episodes,check_convergence = True):
    # Initialize the Q-table with zeros
    Q = np.zeros([env.observation_space.n, env.action_space.n])
    
    #Training Parameters
    epsilon = 0.45
    alpha = 0.9
    gamma = 0.8

    #Convergence Criterion
    tolerence = 0.01  
    start_check = 200
    Q_changes = []
    converge_epi = 0

    # Q-learning algorithm with an epsilon-greedy behavior policy
    for i in range(num_episodes):
        state = env.reset()
        current_state = state[0]
        done = False
        Q_prev = Q.copy()        #Storing current Q value to check convergence after taking action
        print(f"Episode Number {i}")
        
        #Parameter Control
        if i%10 == 0:
            epsilon -= 0.002
            alpha -= 0.01
            
        while not done:
            if np.random.rand() < epsilon:                    #Choosing an action based on epsilon greedy policy(same as above: SARSA)
                action = env.action_space.sample()
            else:
                action = np.argmax(Q[current_state, :])
                
            next_state, reward, done, _ ,prob = env.step(action)  #recording data of the new state, reward and other data
            #Update Q table using the Q-learning update rule : updated value =  current value + learning rate x (reward + gamma * max Q value - current Q value)
            Q[current_state, action] = Q[current_state, action] + alpha * (reward + gamma * np.max(Q[next_state, :]) - Q[current_state, action])
            current_state = next_state                            #update current state as next state
            
        if i % 50 == 0:      #To check Q table during training at every 50 episodes
            print(Q)

        #Checking for Convergence
        if check_convergence != False and i >= start_check:
            Q_change = np.max(np.abs(Q_prev - Q))
            Q_changes.append(Q_change)
            if np.mean(Q_changes[-start_check:]) <= tolerence:
                converge_epi = i + 1
                break
    print(Q)
    policy = np.argmax(Q, axis=1)   #Generate Optimal Policy
    return policy,converge_epi

Helper function for testing the optimal path.

In [None]:
#Train the models, get Q tables and policies from each training methods, run the test and render the path
def testing(policy):
    # Test the learned policy
    goal_reached = False
    steps = 0
    state = env.reset()
    current_state = state[0]
    done = False
    while not done:
        # Choose the action with the highest Q-value
        #action = np.argmax(Q[state, :])    #use this action instead if you're using Q table and not policy
        action = policy[current_state]
        # Take the action and observe the next state and reward
        next_state, reward, done, _,sth = env.step(action)
        steps += 1
        current_state = next_state
    if reward == 1.0: # goal reached
        goal_reached = True
        print(f"Goal is reached in {steps} steps")
    print("Goal is not reached.")

Training

on 4x4 environment, env

To train the 4x4 frozen lake using First Visit Monte Carlo Control without exploring starts 

In [None]:
#trained_policy,converge_epi = training_Monte_Carlo(env,num_episodes,check_convergence = True/False)
policy_small_MC,converge_epi = training_Monte_Carlo(env,200,check_convergence = True)
print(policy_small_MC)
testing(policy_small_MC)          #to check if the policy reaches the goal by rendering the path

#policy_small_MC = np.reshape(policy_small_MC,(4,4))   #to see the policy according to grid shape to find the path visually


To train the 4x4 frozen lake using SARSA Algorithm with e-greedy behavior policy 

In [None]:
#trained_policy,converge_epi = training_SARSA(env,num_episodes,check_convergence = True/False)
policy_small_SS,converge_epi = training_SARSA(env,300,check_convergence = True)
print(policy_small_SS)
testing(policy_small_SS)          #to check if the policy reaches the goal by rendering the path

#policy_small_SS = np.reshape(policy_small_SS,(4,4))   #to see the policy according to grid shape to find the path visually

To train the 4x4 frozen lake using Q-learning Algorithm with e-greedy behavior policy

In [None]:
#trained_policy,converge_epi = training_Q(env,num_episodes,check_convergence = True/False)
policy_small_Q,converge_epi = training_Q(env,300,check_convergence = True)
print(policy_small_Q)
testing(policy_small_Q)          #to check if the policy reaches the goal by rendering the path

#policy_small_Q = np.reshape(policy_small_Q,(4,4))   #to see the policy according to grid shape to find the path visually

on 10x10 environment , env_large

To train the 10x10 frozen lake using First Visit Monte Carlo Control without exploring starts 

In [None]:
#trained_policy,converge_epi = training_Monte_Carlo(env,num_episodes,check_convergence = True/False)
policy_large_MC,converge_epi = training_Monte_Carlo(env,5000,check_convergence = False)
print(policy_large_MC)
testing(policy_large_MC)          #to check if the policy reaches the goal by rendering the path

#policy_large_MC = np.reshape(policy_large_MC,(10,10))   #to see the policy according to grid shape to find the path visually

To train the 10x10 frozen lake using SARSA Algorithm with e-greedy behavior policy 

In [None]:
#trained_policy,converge_epi = training_SARSA(env,num_episodes,check_convergence = True/False)
policy_large_SS,converge_epi = training_SARSA(env_large,5000,check_convergence = False)
print(policy_large_SS)
testing(policy_large_SS)          #to check if the policy reaches the goal by rendering the path

#policy_large_SS = np.reshape(policy_large_SS,(10,10))   #to see the policy according to grid shape to find the path visually

To train the 10x10 frozen lake using Q-learning Algorithm with e-greedy behavior policy 

In [None]:
#trained_policy,converge_epi = training_Q(env,num_episodes,check_convergence = True/False)
policy_large_Q,converge_epi = training_Q(env_large,5000,check_convergence = False)
print(policy_large_Q)
testing(policy_large_Q)          #to check if the policy reaches the goal by rendering the path

#policy_large_Q = np.reshape(policy_large_Q,(10,10))   #to see the policy according to grid shape to find the path visually