## Imports

In [261]:
import numpy as np
import gymnasium as gym
from IPython.display import HTML
from base64 import b64encode
import imageio

## Utils

In [262]:
def record_video(env, policy, out_directory, fps=1, random_action=False, max_steps=100):
    images = []  
    done = False
    truncated = False
    state, info = env.reset()
    img = env.render()
    images.append(img)
    total_reward = 0
    i = 0
    while not done and not truncated:
        i += 1
        if i > max_steps:
            break
        action = np.random.randint(4) if random_action else policy[state]
        state, reward, done, truncated, info = env.step(action)
        total_reward += reward
        img = env.render()
        images.append(img)
        if not random_action:
            print(f"action: {action}, state: {state}, reward: {reward}, done: {done}, truncated: {truncated}, info: {info}")
    imageio.mimsave(out_directory, [np.array(img) for i, img in enumerate(images)], fps=fps)
    return total_reward

In [263]:
def show_video(video_path, video_width=500):
    video_file = open(video_path, "r+b").read()
    video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}"
    return HTML(f"""<video width={video_width} controls><source src="{video_url}"></video>""")

## Random Walk

In [264]:
#from moviepy.config import change_settings
#change_settings({"FFMPEG_BINARY": "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/ffmpeg"})
env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=False, render_mode='rgb_array')
total_reward = record_video(env, None, 'frozenlake_random.mp4', fps=3, random_action=True)
print(f"total reward: {total_reward}")
show_video('frozenlake_random.mp4', video_width=500)

total reward: 0.0


## Define FrozenLake MDP

In [265]:
class FrozenLakeMDP:
    def __init__(self, is_slippery):
        self.is_slippery = is_slippery
        self.terminal_states = np.zeros(16, dtype=int)
        self.terminal_states[[5, 7, 11, 12, 15]] = 1
        self.reward_fn = np.zeros(16, dtype=int)
        self.reward_fn[15] = 1

    def is_terminal(self, state):
        return self.terminal_states[state]
    
    def get_reward_function(self):
        return self.reward_fn

    def next_state_det(self, state, action):
        if action == 0:    # LEFT
            next_state = state - 1 if state % 4 != 0 else state
        elif action == 1:  # DOWN
            next_state = state + 4 if state // 4 != 3 else state
        elif action == 2:  # RIGHT
            next_state = state + 1 if state % 4 != 3 else state
        elif action == 3:  # UP
            next_state = state - 4 if state // 4 != 0 else state
        else:         # WRONG ACTION
            next_state = state
        return next_state
    
    def trans_prob(self, state, action):
        prob = np.zeros((16,), dtype=float)
        if not self.is_slippery:
            prob[self.next_state_det(state, action)] = 1.0
        else:
            prob[self.next_state_det(state, action)] += 1/3
            prob[self.next_state_det(state, (action+1)%4)] += 1/3
            prob[self.next_state_det(state, (action-1)%4)] += 1/3
        return prob

    def next_state_reward(self, state, action):
        next_state_probs = self.trans_prob(state, action)
        next_state = np.random.choice(16, p=next_state_probs)
        reward = self.reward_fn[next_state]
        return next_state, reward

In [266]:
dynamics = FrozenLakeMDP(is_slippery=False)

In [267]:
# reward function of the environent
dynamics.get_reward_function()

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1])

In [268]:
# evaluating if a given state is a terminal state (= hole or goal)
print(dynamics.is_terminal(0), dynamics.is_terminal(7), dynamics.is_terminal(15))

0 1 1


In [269]:
# if we take action `a` in state `s`,
# what is the probability of landing in each state?
dynamics.trans_prob(14, 2)

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1.])

In [270]:
# if we take action `a` in state `s`, what do we get?
# this is done through sampling the transition probability
next_state, reward = dynamics.next_state_reward(14, 2)
print(next_state, reward)

15 1


## Iterative Policy Evaluation

In [271]:
def policy_evaluation(dynamics, policy, gamma=0.9, num_iter=10):
    """
    evaluates policy based on Iterative Policy Evaluation.
 
    Args:
        dynamics (FrozenLakeMDP): dynamics of the environment.
        policy (numpy.array): policy we want to evaluate.
        gamma (float): discount factor.
        num_iter (int): number of iterations for the loop.
 
    Returns:
        numpy.array: state value function.
    """


    s_value_function = np.zeros(16, dtype=float)
    for iteration in range(num_iter):
        for current_state in range(16):
            if dynamics.terminal_states[current_state]:
                continue
            else:
                next_state_probs = dynamics.trans_prob(current_state, policy[current_state])
                next_value_of_current_state = 0
                for next_state in range(16):
                    next_value_of_current_state += next_state_probs[next_state] * (dynamics.reward_fn[next_state] +  s_value_function[next_state]) * gamma
                s_value_function[current_state] = next_value_of_current_state
    return s_value_function

In [272]:
dynamics = FrozenLakeMDP(is_slippery=False)

# 1. go-right policy
policy_go_right = 2 * np.ones(16, dtype=int)

# 2. shortest-path policy
policy_shortest_path = np.array([1, 2, 1, 0, 1, -1, 1, -1, 2, 1, 1, -1, -1, 2, 2, -1])

s_value_function_shortest_path = policy_evaluation(dynamics, policy_shortest_path)
s_value_function_go_right = policy_evaluation(dynamics, policy_go_right)

In [273]:
s_value_function_shortest_path

array([0.531441, 0.59049 , 0.6561  , 0.59049 , 0.59049 , 0.      ,
       0.729   , 0.      , 0.6561  , 0.729   , 0.81    , 0.      ,
       0.      , 0.81    , 0.9     , 0.      ])

In [274]:
s_value_function_go_right

array([0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ,
       0.  , 0.  , 0.81, 0.9 , 0.  ])

In [275]:
# The value of a state s under a certain policy π will converge to the expected discounted rewards gained onwards where agent
# starts from state s and performs actions based on policy π in each future state until a termination condition is met.
# The state values for terminal states are zero since the agent can't gain any rewards onwards i.e. there is no future rewards
# from these states onwards.
# The go_right policy values are non-zero, only for the non-terminal last row states, since the agent following always go-right
# policy can only gain rewards in the last row since the actions in other rows end up in termination states 
# with no rewards cumulated.

## Policy Iteration

In [276]:
def greedy_policy_improvement(dynamics, s_value_function, gamma=0.9):
    """
    obtains a policy in a greedy manner based on current state value function.
 
    Args:
        dynamics (FrozenLakeMDP): dynamics of the environment.
        s_value_function (numpy.array): state value function.
        gamma (float): discount factor.
 
    Returns:
        numpy.array: the greedy policy.
    """


    policy = np.random.randint(0, 4, size=16) 
    for current_state in range(16):
        #value_of_s = -float('inf')
        expected = [0, 0, 0, 0]
        for action in range(4):
            next_state_probs = dynamics.trans_prob(current_state, action)
            for next_state in range (16):
                expected[action] += next_state_probs[next_state] * (dynamics.reward_fn[next_state] +  s_value_function[next_state]) * gamma 
        policy[current_state] = np.argmax(expected)
    return policy

In [277]:
def policy_iteration(dynamics, gamma=0.9, outer_iter=100, inner_iter=100):
    """
    optimizes a policy based on Policy Iteration
 
    Args:
        dynamics (FrozenLakeMDP): dynamics of the environment.
        gamma (float): discount factor.
        outer_iter (int): number of iterations for the Policy Iteration loop.
        inner_iter (int): number of iterations for the Policy Evaluation loop.
 
    Returns:
        numpy.array: the optimized policy.
    """


    policy = np.random.randint(0, 4, size=16)
    for iteration in range(outer_iter):
        s_value_function = policy_evaluation(dynamics, policy, num_iter=inner_iter, gamma=gamma)
        policy = greedy_policy_improvement(dynamics, s_value_function)
    return policy

In [278]:
dynamics = FrozenLakeMDP(is_slippery=False)
policy = policy_iteration(dynamics)

In [279]:
env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=False, render_mode='rgb_array')
total_reward = record_video(env, policy, 'frozenlake_policy_actions.mp4', fps=5, random_action=False)
print(f"total reward: {total_reward}")
show_video('frozenlake_policy_actions.mp4', video_width=500)

action: 1, state: 4, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 1, state: 8, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 2, state: 9, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 1, state: 13, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 2, state: 14, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 2, state: 15, reward: 1.0, done: True, truncated: False, info: {'prob': 1.0}
total reward: 1.0


In [280]:
# Testing the policy on the environment in slippery mode

dynamics = FrozenLakeMDP(is_slippery=True)
policy = policy_iteration(dynamics)

env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=True, render_mode='rgb_array')
total_reward = record_video(env, policy, 'frozenlake_policy_actions_slippery.mp4', fps=5, random_action=False)
print(f"total reward: {total_reward}")
show_video('frozenlake_policy_actions_slippery.mp4', video_width=500)

action: 0, state: 4, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 0, state: 4, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 0, state: 8, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 3, state: 4, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 0, state: 8, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 3, state: 9, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 1, state: 10, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 0, state: 6, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 0, state: 10, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 0, state: 6, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333

In [281]:
# when environmet is slippery, there is no policy that can take the agent the to state 15 with 100 percent certainty,
# since the agent may always end up in water hole. The reason behind this is, no matter which path the agent takes, it is
# always probable for the agent to end up at state 6 where no action can guarantee agent's safety.

## Q-Learning

In [282]:
class QAgent:  # The Q-Learning RL agent

    def __init__(self, num_states, num_actions, epsilon, alpha, gamma=0.9, eps_end=0.01, eps_decay=3e-6):

        self.num_states = num_states    # number of possible states
        self.num_actions = num_actions  # number of possible actions
        self.gamma = gamma              # discount factor
        self.epsilon = epsilon          # initial exploration probability
        self.alpha = alpha              # step size
        self.eps_decay = eps_decay      # linear decay rate of epsilon
        self.eps_end = eps_end          # minimum value for epsilon
        self.q_table = np.zeros((num_states, num_actions), dtype=float)

    def choose_action(self, state):
        """
        chooses an action in an epsilon-greedy manner.
    
        Args:
            state (int): current state of the agent.
    
        Returns:
            int: the chosen action
        """
        

        random_number = np.random.random()
        chosen_action = 0
        if random_number > self.epsilon:
            chosen_action = np.argmax(self.q_table[state])
        else:
            chosen_action = np.random.choice(self.num_actions)
        return chosen_action

    def learn(self, state, action, reward, next_state):
        """
        updates the q-table based on a single interaction with the environment.
    
        Args:
            state (int): state of the agent.
            action (int): action chosen by the agent.
            reward (int): reward obtained by the agent.
            next_state (int): next state of the agent.
        """
        
        max_next_qs = max(self.q_table[next_state])
        difference = reward + self.gamma * max_next_qs - self.q_table[state, action]
        self.q_table[state, action] = self.q_table[state, action] + self.alpha * difference
        
        # epsilon decay
        self.epsilon = self.epsilon - self.eps_decay if self.epsilon > self.eps_end else self.eps_end

In [283]:
def train(env, agent, n_episodes=100000):
    """
        trains an agent through interactions with the environemnt using Q-learning.
    
        Args:
            env (gym.Env): the gym environment.
            agent (QAgent): the Q-learning agent.
            n_episodes (int): number of training episodes.
    """

    initial_state = 0
    env.reset()
    for episode_no in range(n_episodes):
        action = agent.choose_action(initial_state)
        next_state, reward, done, truncated, info = env.step(action)
        agent.learn(initial_state, action, reward, next_state)
        initial_state = next_state
        if done or truncated:
            env.reset()

In [284]:
env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=False)
agent = QAgent(num_states=16, num_actions=4, epsilon=1.0, alpha=1e-3)

In [285]:
train(env, agent)

In [286]:

policy = np.zeros(agent.num_states, dtype=int)
for state in range(agent.num_states):
    for action in range(agent.num_actions):
        if agent.q_table[state, action] > agent.q_table[state, policy[state]]:
            policy[state] = action
# The obtained policy when q_agent is not in slippery mode
print(policy)

[2 2 1 0 1 2 1 2 2 2 1 2 2 2 2 2]


In [287]:
# Test the policy on the environment in non slippery mode

env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=False, render_mode='rgb_array')
total_reward = record_video(env, policy, 'frozenlake_qs_learned.mp4', fps=5, random_action=False)

print(f"total reward: {total_reward}")
show_video('frozenlake_qs_learned.mp4', video_width=500)

print('obtained policy when q_agent is in non slippery mode')
print(policy)
print('\n')




action: 2, state: 1, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 2, state: 2, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 1, state: 6, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 1, state: 10, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 1, state: 14, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 2, state: 15, reward: 1.0, done: True, truncated: False, info: {'prob': 1.0}
total reward: 1.0
obtained policy when q_agent is in non slippery mode
[2 2 1 0 1 2 1 2 2 2 1 2 2 2 2 2]




In [288]:
# Test the policy on the environment in slippery mode

env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=True)
agent = QAgent(num_states=16, num_actions=4, epsilon=1.0, alpha=1e-3)

train(env, agent)

policy = np.zeros(agent.num_states, dtype=int)
for state in range(agent.num_states):
    for action in range(agent.num_actions):
        if agent.q_table[state, action] > agent.q_table[state, policy[state]]:
            policy[state] = action

env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=True, render_mode='rgb_array')
total_reward = record_video(env, policy, 'frozenlake_qs_learned_slippery.mp4', fps=5, random_action=False)
print(f"total reward: {total_reward}")
show_video('frozenlake_qs_learned_slippery.mp4', video_width=500)

# The obtained policy when q_agent is in slippery mode
print('obtained policy when q_agent is in slippery mode')
print(policy)
print('\n')

action: 2, state: 1, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 2, state: 1, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 2, state: 1, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 2, state: 1, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 2, state: 1, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 2, state: 2, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 2, state: 2, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 2, state: 3, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 0, state: 2, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
action: 2, state: 6, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}


In [289]:
# As mentioned earlier, we can never find a policy to guarantee agent reaches state 15. So no matter how many episodes
# we feed into agent's training time, it can always end up in a water hole. But there is a policy which we (the omniscient)
# can tell is best. for state 0 and 4 it is left, for state 8 up, for state 9 down, 13 right, 14 down, hopping we don't get
# into state 10 and consequently state 6. When environment is not slippery with differnt hyper parameters (especially epsilon) the agent can take different
# paths.