In [None]:
import numpy as np
import matplotlib.pyplot as plt
import gym
from gym import spaces
import pandas as pd
import copy as cp
import math

We need a minimum of 12 states, so let's define a 4x4 grid, which gives us 16 states.

In [None]:
class GridEnvironment(gym.Env):
  """4x4 grid world in which an agent walks from [0, 0] to the goal at [3, 3].

  The observation is the flattened grid: 1 marks the agent, 0.5 marks the goal
  and every other cell is 0. The goal mark is written last, so it overwrites
  the agent mark when both share a cell.

  ``reset()`` must be called before ``step()`` because it creates the
  position, goal, timestep and state attributes.
  """
  metadata = { 'render.modes': []}

  # Side length of the square grid.
  GRID_SIZE = 4

  def __init__(self, observation_space, action_space, max_timesteps, stochasticity=1):
    """Store the space sizes, the episode length cap and the stochasticity.

    Args:
      observation_space: number of discrete states (wrapped in ``spaces.Discrete``).
      action_space: number of discrete actions (wrapped in ``spaces.Discrete``).
      max_timesteps: an episode ends once this many steps have been taken.
      stochasticity: probability that ``step`` executes the requested action;
        otherwise a uniformly random action is executed instead. The default
        of 1 makes the environment deterministic.
    """
    self.observation_space = spaces.Discrete(observation_space)
    self.action_space = spaces.Discrete(action_space)
    self.max_timesteps = max_timesteps
    # Rewards are computed from the distance to the goal (see get_reward)
    # rather than looked up in a fixed per-cell table.
    self.stochasticity = stochasticity

  def set_epsilon(self, epsilon):
    """Remember an exploration rate on the environment instance."""
    self.epsilon = epsilon

  def _refresh_state(self):
    """Rebuild ``self.state`` from the current positions and return it flattened."""
    grid = np.zeros((self.GRID_SIZE, self.GRID_SIZE))
    grid[tuple(self.agent_pos)] = 1
    grid[tuple(self.goal_pos)] = 0.5
    self.state = grid
    return grid.flatten()

  def reset(self):
    """Start a new episode and return the initial flattened observation."""
    self.timestep = 0
    self.agent_pos = [0, 0]
    self.goal_pos = [self.GRID_SIZE - 1, self.GRID_SIZE - 1]
    return self._refresh_state()

  def step(self, action):
    """Apply one action and advance the episode by one timestep.

    Args:
      action: requested action index (0, 1, 2 or 3).

    Returns:
      A 5-tuple ``(observation, reward, done, info, action)`` where ``action``
      is the action that was actually executed, which can differ from the
      requested one in a stochastic environment.
    """
    # With probability (1 - stochasticity) the requested action is replaced by
    # a random one. Previously any stochasticity != 1 always randomised it.
    if self.stochasticity < 1 and np.random.random() >= self.stochasticity:
      action = self.get_action_random()

    current_pos = cp.deepcopy(self.agent_pos)

    # NOTE(review): agent_pos[0] indexes the first axis of self.state, which
    # plt.imshow draws vertically, so the left/right and up/down labels may be
    # swapped relative to what render() shows - confirm the intended mapping.
    if action == 0: # Go left
      self.agent_pos[0] -= 1
    if action == 1: # Go up
      self.agent_pos[1] -= 1
    if action == 2: # Go right
      self.agent_pos[0] += 1
    if action == 3: # Go down
      self.agent_pos[1] += 1

    # Moves off the grid are clipped back onto the border cell. Plain ints
    # keep later comparisons and prints free of numpy scalar types.
    self.agent_pos = [int(v) for v in np.clip(self.agent_pos, 0, self.GRID_SIZE - 1)]
    observation = self._refresh_state()

    reward = self.get_reward(current_pos, self.agent_pos)

    self.timestep += 1
    done = self.timestep >= self.max_timesteps or self.agent_pos == self.goal_pos
    info = {}

    return observation, reward, done, info, action

  # the reward is calculated on the basis of the distance of the
  # current_pos and future_pos of the agent from the goal_pos
  def get_reward(self, current_pos, future_pos):
    """Return 5 if the move brought the agent closer to the goal, else -5.

    A move that leaves the distance unchanged (for example bumping into a
    wall) is penalised as well.
    """
    current_dist = self.distance_from_goal(current_pos)
    final_dist = self.distance_from_goal(future_pos)
    return 5 if final_dist < current_dist else -5

  def distance_from_goal(self, pos):
    """Euclidean distance between ``pos`` and the goal position."""
    x_final, y_final = self.goal_pos
    x, y = pos
    return math.sqrt( (y_final - y)**2 + (x_final - x)**2 )

  def get_action_random(self):
    """Draw an action uniformly at random from the action space."""
    return np.random.choice(self.action_space.n)

  def render(self):
    """Draw the current grid in a new matplotlib figure."""
    plt.figure()
    plt.imshow(self.state)

The following function returns the action the agent 🤖 will take

In [None]:
def get_action():
    """Pick an action uniformly at random from the global ``env``'s action space."""
    n_actions = env.action_space.n
    return np.random.choice(n_actions)

Function to run the agent and generate results ⬇️

In [None]:
def run_agent_and_generate_results():
    """Drive the global ``env`` with random actions and tabulate every step.

    The environment is rendered after each step. Once the episode ends, or the
    step cap is reached, a table with one row per step is drawn with columns
    for the timestep, executed action, reward, running reward total and done
    flag.
    """
    header = ['TimeStep', 'Action', 'Reward', 'Total Rewards', 'Done']
    rows = []
    running_total = 0

    for t in range(env.max_timesteps):
        requested = get_action()
        # step() hands back the action it actually executed as the fifth item
        _observation, reward, finished, _info, executed = env.step(requested)
        running_total += reward
        rows.append([t, executed, reward, running_total, finished])
        env.render()
        if finished:
            break

    plt.subplot(212)
    plt.axis('off')
    plt.table(cellText=rows, colLabels=header)


Let's initialize our environment with the following parameters ⬇️ :
- observation_space = 16 Discrete
- action_space = 4 (Left = 0, Up = 1, Right = 2, Down = 3)
- max_timesteps = 10

In [None]:
# Environment configuration: 16 discrete states (the 4x4 grid), four
# movement actions and a cap of 10 steps per episode.
observation_space = 16
action_space = 4
max_timesteps = 10

# stochasticity=1 means step() always executes the requested action.
env = GridEnvironment(observation_space, action_space, max_timesteps, stochasticity=1)
obs = env.reset()
# Show the initial grid.
env.render()

Let's run the agent in a deterministic environment ⬇️

In [None]:
# Start a fresh episode, then let the random agent act until the episode ends.
env.reset()
run_agent_and_generate_results()

Let's run the agent in a stochastic environment ⬇️

In [None]:
# Rebuild the environment with stochasticity != 1, so step() may override the
# requested action, and run the random agent again.
env = GridEnvironment(observation_space, action_space, max_timesteps, stochasticity=0.9)
obs = env.reset()
run_agent_and_generate_results()

Helper Functions to help update q_table ⬇️

In [None]:
# Best Q-value available from the state the agent lands in
def get_max_q_for_state(pos):
    """Return the largest entry of ``q_table`` for the state at ``pos``.

    The last state index (``env.observation_space.n - 1``) is treated as
    terminal and always yields 0.
    """
    state_index = get_state_from_position(pos)
    terminal_index = env.observation_space.n - 1
    return 0 if state_index == terminal_index else max(q_table[state_index])

def get_state_from_position(pos):
    """Convert a two-element grid position into its flat state index.

    The index is ``pos[0] * width + pos[1]``, where ``width`` is the size of
    the second axis of the global ``env.state`` grid.
    """
    _height, width = env.state.shape
    first, second = pos
    return first * width + second

# Temporal-difference update of a single Q-table entry
def update_q_value(current_pos, future_pos, immediate_reward, action):
    """Apply the Q-learning update for taking ``action`` at ``current_pos``.

    The entry moves towards ``immediate_reward`` plus the discounted best
    value of ``future_pos`` at rate ``learning_rate``. Entries of the terminal
    state (the last state index) are pinned to 0 instead of being updated.
    """
    state = get_state_from_position(current_pos)

    # Nothing is learned from the terminal state itself.
    if state == env.observation_space.n - 1:
        q_table[state][action] = 0
        return

    old_estimate = q_table[state][action]
    td_target = immediate_reward + discount_factor * get_max_q_for_state(future_pos)
    q_table[state][action] = old_estimate + learning_rate * (td_target - old_estimate)


def get_action_greedy(table):
    """Return the highest-valued action for the global ``env``'s current position.

    ``table`` is indexed by flat state index and then by action. Ties go to
    the lowest action index.
    """
    state = get_state_from_position(env.agent_pos)
    values = list(table[state])
    # max() keeps the first maximal element, so ties resolve to the lowest index
    return max(range(len(values)), key=values.__getitem__)


The following function helps us run an episode and return the results ⬇️

In [None]:
# The following function runs an episode
def run_episode_q_learn(episode_num, epsilon, env):
    """Run one epsilon-greedy Q-learning episode on ``env``, updating ``q_table``.

    Args:
        episode_num: index of the episode, recorded in every timestep row.
        epsilon: probability of taking a random action instead of the greedy one.
        env: environment to act in; it must already have been reset.

    Returns:
        A tuple ``(timestep_results, rewards_sum, is_episode_successful, steps,
        penalty, env)``. ``timestep_results`` holds one
        ``[episode_num, timestep, action, reward, done]`` row per step,
        ``penalty`` counts the steps with a negative reward, and
        ``is_episode_successful`` tells whether the agent ended on the goal.
    """
    rewards_sum = 0
    timestep_results = []
    steps = 0
    penalty = 0

    for timestep in range(env.max_timesteps):

        current_pos = cp.deepcopy(env.agent_pos)

        # Both branches use the environment passed in. The module-level
        # helpers read the global `env`, which may be a different object.
        if np.random.random() < epsilon:
            chosen_action = np.random.choice(env.action_space.n)
        else:
            q_values = list(q_table[get_state_from_position(current_pos)])
            chosen_action = q_values.index(max(q_values))

        # `action` is what the environment actually executed.
        observation, reward, done, info, action = env.step(chosen_action)
        future_pos = cp.deepcopy(env.agent_pos)

        if reward < 0:
            penalty += 1

        # Update q table and current episode outcome
        update_q_value(current_pos, future_pos, reward, action)
        rewards_sum += reward
        steps += 1

        timestep_results.append([episode_num, timestep, action, reward, done])
        if done:
            break

    # Success means ending on the goal, even when that happens on the last
    # allowed step.
    is_episode_successful = list(env.agent_pos) == list(env.goal_pos)

    return timestep_results, rewards_sum, is_episode_successful, steps, penalty, env


Implementation of Q Learning Algorithm.
We need to initialize the following:
 - Q table. Q(terminal_state, any_action) will always be 0
 - Step Size / Learning Rate Alpha
 - Epsilon, the exploration rate (the code below starts it at 1.0 and decays it after every episode)
 


In [None]:
# Q-table with one row per state and one column per action, all zeros.
q_table = np.zeros((observation_space, action_space))
learning_rate = 0.15
# Exploration starts fully random and is multiplied by decay_rate after
# every episode in the training loop.
epsilon = 1.0
discount_factor = 0.9
n_episodes = 1000
decay_rate = 0.995

# Initialising pandas dataframes to store results
# Only write data directly to the dataframe from the code below
time_step_results = pd.DataFrame()

# episode_result = pd.DataFrame(columns=ep_res_headers)
episode_result = pd.DataFrame()


Let's begin the training loop ➰

In [None]:
# Begin the Training Loop
stochasticity = 1.0
max_timesteps = 15
envir = GridEnvironment(observation_space, action_space, max_timesteps, stochasticity=stochasticity)
# The helper functions read the module-level `env`, so point it at the
# training environment before the first episode. Previously this line reset
# whichever environment an earlier cell had left behind.
env = envir
obs = env.reset()

# Collect plain records and build the DataFrames once after the loop:
# DataFrame.append was removed in pandas 2.0 and is quadratic inside a loop.
timestep_rows = []
episode_rows = []

for i in range(n_episodes):

    envir.reset()

    timestep_results, rewards_sum, is_episode_successful, steps, penalty, env = run_episode_q_learn(i, epsilon, envir)
    timestep_rows.extend(timestep_results)

    episode_rows.append({
        'Episode #': i,
        'Total Rewards': rewards_sum,
        'Successful': is_episode_successful,
        'Steps': steps,
        'Penalty': penalty,
        'Epsilon': epsilon
    })
    # Explore less as training progresses
    epsilon *= decay_rate
    envir = env

time_step_results = pd.DataFrame(timestep_rows)
episode_result = pd.DataFrame(episode_rows)

print("Final q_table is: \n", q_table)

timestep_file = './timestep_result_train_q_learn_deterministic.xlsx'
episode_file = './episode_result_train_q_learn_deterministic.xlsx'

if stochasticity < 1:
    timestep_file = timestep_file.replace('deterministic', 'stochastic')
    episode_file = episode_file.replace('deterministic', 'stochastic')

time_step_results.to_excel(timestep_file)
episode_result.to_excel(episode_file)


In [None]:
# function to evaluate agent
def evaluate_agent_qlearn(num, env, print_episode=False):
    """Run one greedy episode on ``env`` using the learned ``q_table``.

    Args:
        num: episode number, recorded in every row of the returned data.
        env: environment to evaluate on; it must already have been reset.
        print_episode: when True, render the grid before every step and once
            more after the final step.

    Returns:
        A tuple ``(data_cell, rewards_sum, penalty, steps)``. ``data_cell``
        holds one ``[num, step, action, reward, rewards_sum, done]`` row per
        step and ``penalty`` counts the steps with a negative reward.
    """
    penalty = 0
    rewards_sum = 0
    data_cell = []
    steps = 0

    # Use the step cap of the environment being evaluated rather than
    # whatever the global max_timesteps happens to be.
    for i in range(env.max_timesteps):

        if print_episode:
            env.render()

        # Greedy action for the position of the environment passed in. The
        # module-level greedy helper looks at the global `env` instead.
        q_values = list(q_table[get_state_from_position(env.agent_pos)])
        action = q_values.index(max(q_values))
        observation, reward, done, info, action = env.step(action)
        if reward < 0:
            penalty += 1

        rewards_sum += reward
        steps += 1

        data_cell.append([num, i, action, reward, rewards_sum, done])
        if done:
            if print_episode:
                env.render()
            break

    return data_cell, rewards_sum, penalty, steps



Let's run a single episode and visualise whether the agent 🤖 reaches the goal 🥅 position. The 🤖 chooses the greedy action at every step of this episode.

In [None]:
# Reset the training environment and render one greedy episode step by step.
envir.reset()
evaluate_agent_qlearn(1, envir, print_episode=True)

Let's evaluate a few more episodes by letting the 🤖 choose the greedy action each time.

In [None]:
def eval_q_learn(env, DETERMINISTIC = True):
    """Evaluate the greedy Q-learning policy for 10 episodes and save a summary.

    Args:
        env: environment to evaluate on; it is reset before every episode.
        DETERMINISTIC: selects the output file name; when False the
            'deterministic' part of the name becomes 'stochastic'.

    The per-episode totals (rewards, penalties, steps) are printed and written
    to an Excel file in the current directory.
    """
    n_episodes = 10
    # Build the frame once from plain records: DataFrame.append was removed
    # in pandas 2.0.
    episode_rows = []

    for i in range(1, n_episodes + 1):
        # Reset the environment that was passed in, not a global one.
        env.reset()
        _data_cell, rewards_sum, penalty, steps = evaluate_agent_qlearn(i, env)
        episode_rows.append({
            'Episode #': i,
            'Total rewards': rewards_sum,
            'Penalty': penalty,
            'Steps': steps
            })

    episode_data = pd.DataFrame(episode_rows)
    # head must be called; printing the bare attribute shows the bound method.
    print(episode_data.head())

    episode_file = './eval_ep_data_q_learn_deterministic.xlsx'
    if not DETERMINISTIC:
        episode_file = episode_file.replace('deterministic', 'stochastic')
    episode_data.to_excel(episode_file)


In [None]:
# Evaluate the greedy policy on the training environment; results go to the
# 'deterministic' Excel file because DETERMINISTIC defaults to True.
eval_q_learn(envir)

Let's train the agent 🤖 in a stochastic environment

In [None]:
# Start again from an all-zero Q-table for the stochastic setting.
q_table = np.zeros((observation_space, action_space))
learning_rate = 0.15
epsilon = 1.0
discount_factor = 0.9
n_episodes = 2000
# Per-episode factor chosen so that epsilon reaches 0.01 after n_episodes.
decay_rate = (0.01/epsilon) ** (1/n_episodes)

# Begin the Training Loop
stochasticity = 0.99
max_timesteps = 20
envir = GridEnvironment(observation_space, action_space, max_timesteps, stochasticity=stochasticity)
# The helper functions read the module-level `env`, so point it at the
# training environment before the first episode. Previously this line reset
# whichever environment an earlier cell had left behind.
env = envir
obs = env.reset()

# Collect plain records and build the DataFrames once after the loop:
# DataFrame.append was removed in pandas 2.0 and is quadratic inside a loop.
timestep_rows = []
episode_rows = []

for i in range(n_episodes):

    envir.reset()

    timestep_results, rewards_sum, is_episode_successful, steps, penalty, env = run_episode_q_learn(i, epsilon, envir)
    timestep_rows.extend(timestep_results)

    episode_rows.append({
        'Episode #': i,
        'Total Rewards': rewards_sum,
        'Successful': is_episode_successful,
        'Steps': steps,
        'Penalty': penalty,
        'Epsilon': epsilon
    })
    # Explore less as training progresses
    epsilon *= decay_rate

    envir = env

time_step_results = pd.DataFrame(timestep_rows)
episode_result = pd.DataFrame(episode_rows)

print("Final q_table is: \n", q_table)

timestep_file = './timestep_result_train_q_learn_deterministic.xlsx'
episode_file = './episode_result_train_q_learn_deterministic.xlsx'

if stochasticity < 1:
    timestep_file = timestep_file.replace('deterministic', 'stochastic')
    episode_file = episode_file.replace('deterministic', 'stochastic')

time_step_results.to_excel(timestep_file)
episode_result.to_excel(episode_file)

Let's visualise the agent's performance by running a single episode

In [None]:
# Reset the training environment and render one greedy episode step by step.
envir.reset()
evaluate_agent_qlearn(1, envir, print_episode=True)

Let's evaluate the agent

In [None]:
# DETERMINISTIC=False switches the output file name to the 'stochastic' one.
eval_q_learn(envir, DETERMINISTIC=False)

Implementation of SARSA algorithm

In [None]:
# Helper functions for SARSA

# Value of the future state for the action that will be taken there
def get_sarsa_for_state(pos, future_action):
    """Return ``sarsa_table``'s entry for ``future_action`` in the state at ``pos``.

    The last state index (``env.observation_space.n - 1``) is treated as
    terminal and always yields 0.
    """
    state_index = get_state_from_position(pos)
    if state_index != env.observation_space.n - 1:
        return sarsa_table[state_index][future_action]
    return 0

# Function to calculate the SARSA value
def update_sarsa_value(current_pos, future_pos, immediate_reward, current_action, future_action):
    """Apply the SARSA update for taking ``current_action`` at ``current_pos``.

    The entry of ``sarsa_table`` moves towards ``immediate_reward`` plus the
    discounted value of ``future_action`` in the state at ``future_pos``, at
    rate ``learning_rate``. Entries of the terminal state (the last state
    index) are pinned to 0 instead of being updated.
    """
    state = get_state_from_position(current_pos)

    if state == env.observation_space.n - 1:
        # Write to the SARSA table. The previous code zeroed the unrelated
        # Q-learning table here.
        sarsa_table[state][current_action] = 0
        return

    old_estimate = sarsa_table[state][current_action]
    td_target = immediate_reward + discount_factor * get_sarsa_for_state(future_pos, future_action)
    sarsa_table[state][current_action] = old_estimate + learning_rate * (td_target - old_estimate)



In [None]:
# Run episode for SARSA Learning
def run_episode_sarsa(episode_num, epsilon):
    """Run one epsilon-greedy SARSA episode on the global ``env``.

    The first action is random. Every later action is the one picked during
    the previous step, which is also the one used in that step's update.

    Args:
        episode_num: index of the episode, recorded in every timestep row.
        epsilon: probability of picking a random next action instead of the
            greedy one.

    Returns:
        A tuple ``(timestep_results, rewards_sum, is_episode_successful, steps,
        penalty)``. ``timestep_results`` holds one
        ``[episode_num, timestep, action, reward, done]`` row per step,
        ``penalty`` counts the steps with a negative reward, and
        ``is_episode_successful`` tells whether the agent ended on the goal.
    """
    rewards_sum = 0
    timestep_results = []
    steps = 0
    penalty = 0

    current_action = None

    for timestep in range(env.max_timesteps):

        if current_action is None:
            current_action = get_action()

        current_pos = cp.deepcopy(env.agent_pos)
        # `action` is what the environment actually executed.
        observation, reward, done, info, action = env.step(current_action)
        # Copy so the recorded position cannot change under later steps.
        future_pos = cp.deepcopy(env.agent_pos)

        # Pick the next action now: SARSA needs it for the update below.
        if np.random.random() < epsilon:
            future_action = get_action()
        else:
            future_action = get_action_greedy(sarsa_table)

        if reward < 0:
            penalty += 1

        # Update the SARSA table and current episode outcome
        update_sarsa_value(current_pos, future_pos, reward, action, future_action)
        rewards_sum += reward
        steps += 1

        current_action = future_action

        timestep_results.append([episode_num, timestep, action, reward, done])
        if done:
            break

    # Success means ending on the goal, even when that happens on the last
    # allowed step.
    is_episode_successful = list(env.agent_pos) == list(env.goal_pos)

    return timestep_results, rewards_sum, is_episode_successful, steps, penalty

In [None]:
# Initialize SARSA learning parameters
sarsa_table = np.zeros((observation_space, action_space))
learning_rate = 0.15
epsilon = 1.0
discount_factor = 0.9
n_episodes = 1000
decay_rate = 0.995

# Training loop for the SARSA algorithm in the deterministic environment

max_timesteps = 15
stochasticity = 1
env = GridEnvironment(observation_space, action_space, max_timesteps, stochasticity=stochasticity)
obs = env.reset()

# Collect plain records and build the DataFrames once after the loop:
# DataFrame.append was removed in pandas 2.0 and is quadratic inside a loop.
timestep_rows = []
episode_rows = []

for i in range(n_episodes):

    env.reset()

    timestep_results, rewards_sum, is_episode_successful, steps, penalty = run_episode_sarsa(i, epsilon)
    timestep_rows.extend(timestep_results)

    episode_rows.append({
        'Episode #': i,
        'Total Rewards': rewards_sum,
        'Successful': is_episode_successful,
        'Steps': steps,
        'Penalty': penalty,
        'Epsilon': epsilon
    })
    # Explore less as training progresses
    epsilon *= decay_rate

time_step_results = pd.DataFrame(timestep_rows)
episode_result = pd.DataFrame(episode_rows)

print(sarsa_table)

timestep_file = './timestep_result_train_sarsa_deterministic.xlsx'
episode_file = './episode_result_train_sarsa_deterministic.xlsx'

if stochasticity < 1:
    timestep_file = timestep_file.replace('deterministic', 'stochastic')
    episode_file = episode_file.replace('deterministic', 'stochastic')

time_step_results.to_excel(timestep_file)
episode_result.to_excel(episode_file)

In [None]:
# Greedy evaluation episode for the SARSA agent
def evaluate_agent_sarsa(num, print_episode=False):
    """Run one greedy episode on the global ``env`` using ``sarsa_table``.

    Args:
        num: episode number, recorded in every row of the returned data.
        print_episode: when True, render the grid before every step and once
            more after the final step.

    Returns:
        A tuple ``(data_cell, rewards_sum, penalty, steps)``. ``data_cell``
        holds one ``[num, step, action, reward, rewards_sum, done]`` row per
        step and ``penalty`` counts the steps with a negative reward.
    """
    data_cell = []
    rewards_sum = 0
    penalty = 0
    steps = 0

    for step_index in range(max_timesteps):
        if print_episode:
            env.render()

        greedy_choice = get_action_greedy(sarsa_table)
        # step() hands back the action it actually executed as the fifth item
        _observation, reward, done, _info, action = env.step(greedy_choice)

        penalty += 1 if reward < 0 else 0
        rewards_sum += reward
        steps += 1
        data_cell.append([num, step_index, action, reward, rewards_sum, done])

        if not done:
            continue
        if print_episode:
            env.render()
        break

    return data_cell, rewards_sum, penalty, steps

In [None]:
# Reset the environment and render one greedy SARSA episode step by step.
env.reset()
evaluate_agent_sarsa(1, print_episode=True)

In [None]:
# Function to run the evaluation loop
def eval_sarsa(DETERMINISTIC = True):
    """Evaluate the greedy SARSA policy for 10 episodes and save a summary.

    The global ``env`` is reset before every episode.

    Args:
        DETERMINISTIC: selects the output file name; when False the
            'deterministic' part of the name becomes 'stochastic'.

    The per-episode totals (rewards, penalties, steps) are printed and written
    to an Excel file in the current directory.
    """
    n_episodes = 10
    # Build the frame once from plain records: DataFrame.append was removed
    # in pandas 2.0.
    episode_rows = []

    for i in range(1, n_episodes + 1):
        env.reset()
        _data_cell, rewards_sum, penalty, steps = evaluate_agent_sarsa(i)
        episode_rows.append({
            'Episode #': i,
            'Total rewards': rewards_sum,
            'Penalty': penalty,
            'Steps': steps
            })

    episode_data = pd.DataFrame(episode_rows)
    # head must be called; printing the bare attribute shows the bound method.
    print(episode_data.head())

    episode_file = './eval_ep_data_sarsa_deterministic.xlsx'
    if not DETERMINISTIC:
        episode_file = episode_file.replace('deterministic', 'stochastic')
    episode_data.to_excel(episode_file)


In [None]:
# Evaluate the greedy SARSA policy; results go to the 'deterministic' Excel
# file because DETERMINISTIC defaults to True.
eval_sarsa()

Let's train the agent on SARSA algorithm in a stochastic environment


In [None]:
# Initialize SARSA learning parameters
sarsa_table = np.zeros((observation_space, action_space))
learning_rate = 0.1
epsilon = 1.0
discount_factor = 0.9
n_episodes = 2000
# Per-episode factor chosen so that epsilon reaches 0.01 after n_episodes.
decay_rate = (0.01/epsilon) ** (1/n_episodes)

# Training loop for the SARSA algorithm in the stochastic environment

max_timesteps = 20
stochasticity = 0.99
env = GridEnvironment(observation_space, action_space, max_timesteps, stochasticity=stochasticity)
obs = env.reset()

# Collect plain records and build the DataFrames once after the loop:
# DataFrame.append was removed in pandas 2.0 and is quadratic inside a loop.
timestep_rows = []
episode_rows = []

for i in range(n_episodes):

    env.reset()

    timestep_results, rewards_sum, is_episode_successful, steps, penalty = run_episode_sarsa(i, epsilon)
    timestep_rows.extend(timestep_results)

    episode_rows.append({
        'Episode #': i,
        'Total Rewards': rewards_sum,
        'Successful': is_episode_successful,
        'Steps': steps,
        'Penalty': penalty,
        'Epsilon': epsilon
    })
    # Explore less as training progresses
    epsilon *= decay_rate

time_step_results = pd.DataFrame(timestep_rows)
episode_result = pd.DataFrame(episode_rows)

print(sarsa_table)

# Both names start from the 'deterministic' form, like the other training
# cells; the switch below renames them for a stochastic run.
timestep_file = './timestep_result_train_sarsa_deterministic.xlsx'
episode_file = './episode_result_train_sarsa_deterministic.xlsx'

if stochasticity < 1:
    timestep_file = timestep_file.replace('deterministic', 'stochastic')
    episode_file = episode_file.replace('deterministic', 'stochastic')

time_step_results.to_excel(timestep_file)
episode_result.to_excel(episode_file)

In [None]:
# Reset the environment and render one greedy SARSA episode step by step.
env.reset()
evaluate_agent_sarsa(1, print_episode=True)

Let's evaluate the agent 🤖

In [None]:
# DETERMINISTIC=False switches the output file name to the 'stochastic' one.
eval_sarsa(DETERMINISTIC=False)