## Imports

In [40]:
import numpy as np
import gymnasium as gym
from IPython.display import HTML
from base64 import b64encode
import imageio

## Utils

In [41]:
def record_video(env, policy, out_directory, fps=1, random_action=False, max_steps=100):
    images = []  
    terminated = False
    truncated = False
    state, info = env.reset()
    img = env.render()
    images.append(img)
    total_reward = 0
    i = 0
    while not terminated and not truncated:
        i += 1
        if i > max_steps:
            break
        action = np.random.randint(4) if random_action else policy[state]
        state, reward, terminated, truncated, info = env.step(action)
        total_reward += reward
        img = env.render()
        images.append(img)
        if not random_action:
            print(f"step: {i}, action: {action}, state: {state}, reward: {reward}, done: {terminated}, truncated: {truncated}, info: {info}")
    imageio.mimsave(out_directory, [np.array(img) for i, img in enumerate(images)], fps=fps)
    return total_reward

In [42]:
def show_video(video_path, video_width=500):
    video_file = open(video_path, "r+b").read()
    video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}"
    return HTML(f"""<video width={video_width} controls><source src="{video_url}"></video>""")

## Random Walk

In [43]:
is_slippery = False
if is_slippery:
    video_name = 'frozenlake_slippery_random.gif'
else:
    video_name = 'frozenlake_random.gif'
env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=is_slippery, render_mode='rgb_array')
total_reward = record_video(env, None, video_name, fps=3, random_action=True)
print(f"total reward: {total_reward}")
show_video(video_name, video_width=500)

total reward: 0.0


## Define FrozenLake MDP

In [44]:
class FrozenLakeMDP:
    def __init__(self, is_slippery):
        self.is_slippery = is_slippery
        self.terminal_states = np.zeros(16, dtype=int)
        self.terminal_states[[5, 7, 11, 12, 15]] = 1
        self.reward_fn = np.zeros(16, dtype=int)
        self.reward_fn[15] = 1

    def is_terminal(self, state):
        return self.terminal_states[state]
    
    def get_reward_function(self):
        return self.reward_fn

    def next_state_det(self, state, action):
        if action == 0:    # LEFT
            next_state = state - 1 if state % 4 != 0 else state
        elif action == 1:  # DOWN
            next_state = state + 4 if state // 4 != 3 else state
        elif action == 2:  # RIGHT
            next_state = state + 1 if state % 4 != 3 else state
        elif action == 3:  # UP
            next_state = state - 4 if state // 4 != 0 else state
        else:         # WRONG ACTION
            next_state = state
        return next_state
    
    def trans_prob(self, state, action):
        prob = np.zeros((16,), dtype=float)
        if not self.is_slippery:
            prob[self.next_state_det(state, action)] = 1.0
        else:
            prob[self.next_state_det(state, action)] += 1/3
            prob[self.next_state_det(state, (action+1)%4)] += 1/3
            prob[self.next_state_det(state, (action-1)%4)] += 1/3
        return prob

    def next_state_reward(self, state, action):
        next_state_probs = self.trans_prob(state, action)
        next_state = np.random.choice(16, p=next_state_probs)
        reward = self.reward_fn[next_state]
        return next_state, reward

In [45]:
dynamics = FrozenLakeMDP(is_slippery=False)

In [46]:
# reward function of the environent
dynamics.get_reward_function()

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1])

In [47]:
# evaluating if a given state is a terminal state (= hole or goal)
print(dynamics.is_terminal(0), dynamics.is_terminal(7), dynamics.is_terminal(15))

0 1 1


In [48]:
# if we take action `a` in state `s`,
# what is the probability of landing in each state?
dynamics.trans_prob(14, 2)

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1.])

In [49]:
# if we take action `a` in state `s`, what do we get?
# this is done through sampling the transition probability
next_state, reward = dynamics.next_state_reward(14, 2)
print(next_state, reward)

15 1


## Iterative Policy Evaluation

In [50]:
def policy_evaluation(dynamics, policy, gamma=0.9, num_iter=100):
    """
    evaluates policy based on Iterative Policy Evaluation.
 
    Args:
        dynamics (FrozenLakeMDP): dynamics of the environment.
        policy (numpy.array): policy we want to evaluate.
        gamma (float): discount factor.
        num_iter (int): number of iterations for the loop.
 
    Returns:
        numpy.array: state value function.
    """

    # Iterative Policy Evaluation algorithm
    s_value_function = np.zeros(16, dtype=float)
    rewards = dynamics.get_reward_function().astype(float)
    for itr in range(num_iter):
        s_value_function_next = np.zeros(16, dtype=float)
        next_states_utility = np.add(rewards, np.multiply(gamma, s_value_function))
        for s in range(16):
            if not dynamics.is_terminal(s):
                s_value_function_next[s] = np.dot(dynamics.trans_prob(s, policy[s]), next_states_utility) 
        diff = np.sum(np.abs(np.subtract(s_value_function_next, s_value_function)))
        s_value_function = s_value_function_next
        if diff < 0.001 * 16:
            # print(f"Number of Policy Evaluation Iterations: {itr}")
            break
    return s_value_function

In [51]:
is_slippery = False
dynamics = FrozenLakeMDP(is_slippery=is_slippery)

# 1. go-right policy
#policy = 2 * np.ones(16, dtype=int)

# 2. shortest-path policy
policy = np.array([1, 2, 1, 0, 1, -1, 1, -1, 2, 1, 1, -1, -1, 2, 2, -1])

s_value_function = policy_evaluation(dynamics, policy)

In [52]:
# print and analyze the state value function
print(f'Resulted Values for {policy} policy and {"" if is_slippery else "non-"}slippery gameplay:')
for i in range(4):
    print(np.array2string(s_value_function[4*i:4*i+4], precision=3, separator=' \t'))

Resulted Values for [ 1  2  1  0  1 -1  1 -1  2  1  1 -1 -1  2  2 -1] policy and non-slippery gameplay:
[0.59  	0.656 	0.729 	0.656]
[0.656 	0.    	0.81  	0.   ]
[0.729 	0.81  	0.9   	0.   ]
[0.  	0.9 	1.  	0. ]


## Policy Iteration

In [53]:
def greedy_policy_improvement(dynamics, s_value_function, gamma=0.9):
    """
    obtains a policy in a greedy manner based on current state value function.
 
    Args:
        dynamics (FrozenLakeMDP): dynamics of the environment.
        s_value_function (numpy.array): state value function.
        gamma (float): discount factor.
 
    Returns:
        numpy.array: the greedy policy.
    """

    # Greedy Policy Improvement algorithm
    policy = np.zeros(16, dtype=int)
    best_action = 0
    best_action_utility = 0.0
    rewards = dynamics.get_reward_function().astype(float)
    next_states_utility = np.add(rewards, np.multiply(gamma, s_value_function))
    for s in range(16): 
        if dynamics.is_terminal(s):
            policy[s] = -1
            continue
        for a in range(4):
            action_utility = np.dot(dynamics.trans_prob(s, a), next_states_utility)
            if a == 0 or action_utility > best_action_utility:
                best_action_utility = action_utility
                best_action = a
        policy[s] = best_action
    return policy

In [54]:
def policy_iteration(dynamics, gamma=0.9, outer_iter=100, inner_iter=100):
    """
    optimizes a policy based on Policy Iteration
 
    Args:
        dynamics (FrozenLakeMDP): dynamics of the environment.
        gamma (float): discount factor.
        outer_iter (int): number of iterations for the Policy Iteration loop.
        inner_iter (int): number of iterations for the Policy Evaluation loop.
 
    Returns:
        numpy.array: the optimized policy.
    """

    # Policy Iteration algorithm
    policy = np.random.randint(0, 4, size=16)
    early_stop = False
    for policy_itr in range(outer_iter):
        s_value = policy_evaluation(dynamics, policy, gamma, inner_iter)
        new_policy = greedy_policy_improvement(dynamics, s_value)
        if np.sum(np.abs(np.subtract(new_policy, policy))) == 0:
            early_stop = True
            # print("Number of Policy Iterations: ", policy_itr)
            print("Value Matrix:")
            for i in range(4):
                print(np.array2string(s_value[4*i:4*i+4], precision=3, separator=' \t'))
            break
        policy = new_policy
    if not early_stop:
        print("Value Matrix:")
        for i in range(4):
            print(np.array2string(s_value[4*i:4*i+4], precision=3, separator=' \t'))
    return policy

In [55]:
# test and analyze the algorithm
is_slippery = False
gamma = 0.9
dynamics = FrozenLakeMDP(is_slippery=is_slippery)
print(f'Achieved value and policy matrices for {"" if is_slippery else "non-"}slippery gameplay and gamma={gamma}:')
policy = policy_iteration(dynamics, gamma=gamma)
print("Policy Matrix:")
for i in range(4):
    print(np.array2string(policy[4*i:4*i+4], separator='\t'))

Achieved value and policy matrices for non-slippery gameplay and gamma=0.9:
Value Matrix:
[0.59  	0.656 	0.729 	0.656]
[0.656 	0.    	0.81  	0.   ]
[0.729 	0.81  	0.9   	0.   ]
[0.  	0.9 	1.  	0. ]
Policy Matrix:
[1	2	1	0]
[ 1	-1	 1	-1]
[ 2	 1	 1	-1]
[-1	 2	 2	-1]


In [56]:
# test the policy on the environment
if is_slippery:
    video_name = 'frozenlake_slippery_PolicyItr.gif'
else:
    video_name = 'frozenlake_PolicyItr.gif'
env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=is_slippery, render_mode='rgb_array')
total_reward = record_video(env, policy, video_name, fps=5, random_action=False)
print(f"total reward: {total_reward}")
show_video(video_name, video_width=500)

step: 1, action: 1, state: 4, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
step: 2, action: 1, state: 8, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
step: 3, action: 2, state: 9, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
step: 4, action: 1, state: 13, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
step: 5, action: 2, state: 14, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
step: 6, action: 2, state: 15, reward: 1.0, done: True, truncated: False, info: {'prob': 1.0}
total reward: 1.0


## Q-Learning

In [57]:
class QAgent:  # The Q-Learning RL agent

    def __init__(self, num_states, num_actions, epsilon, alpha, gamma=0.9, eps_end=0.01, eps_decay=3e-6, alpha_end = 0.03, alpha_decay = 1e-6):

        self.num_states = num_states    # number of possible states
        self.num_actions = num_actions  # number of possible actions
        self.gamma = gamma              # discount factor
        self.epsilon = epsilon          # initial exploration probability
        self.alpha = alpha              # step size
        self.eps_decay = eps_decay      # linear decay rate of epsilon
        self.eps_end = eps_end          # minimum value for epsilon
        self.alpha_end = alpha_end      # linear decay rate of alpha
        self.alpha_decay = alpha_decay  # minimum value for alpha
        self.q_table = np.zeros((num_states, num_actions), dtype=float)

    def get_q(self):
        return self.q_table
    
    def choose_action(self, state):
        """
        chooses an action in an epsilon-greedy manner.
    
        Args:
            state (int): current state of the agent.
    
        Returns:
            int: the chosen action
        """
        
        # epsilon-greedy action selection
        action = 0
        if np.random.uniform() < self.epsilon:
            action = np.random.randint(self.num_actions)
        else:
            action = np.argmax(self.q_table[state])
        return action

    def learn(self, state, action, reward, next_state):
        """
        updates the q-table based on a single interaction with the environment.
    
        Args:
            state (int): state of the agent.
            action (int): action chosen by the agent.
            reward (int): reward obtained by the agent.
            next_state (int): next state of the agent.
        """

        # Q-table update
        self.q_table[state, action] += self.alpha * (reward + self.gamma * np.max(self.q_table[next_state]) - self.q_table[state, action])
        
        # epsilon decay
        self.epsilon = self.epsilon - self.eps_decay if self.epsilon > self.eps_end else self.eps_end
        # alpha decay
        self.alpha = self.alpha - self.alpha_decay if self.alpha > self.alpha_end else self.alpha_end

In [58]:
def train(env, agent, n_episodes=100000):
    """
        trains an agent through interactions with the environemnt using Q-learning.
    
        Args:
            env (gym.Env): the gym environment.
            agent (QAgent): the Q-learning agent.
            n_episodes (int): number of training episodes.
    """

    for episode_idx in range(n_episodes):
        # the training loop for Q-learning
        terminated = False
        truncated = False
        state, info = env.reset()
        # print(f"Value Matrix {episode_idx}:")
        # for i in range(4):
        #     for j in range(4):
        #         print(np.max(agent.get_q()[4*i+j]), end=' \t')
        #     print()
        while not (terminated or truncated):
            action = agent.choose_action(state)
            next_state, reward, terminated, truncated, info = env.step(action)
            agent.learn(state, action, reward, next_state)
            state = next_state
        

In [59]:
is_slippery = True
alpha = 3e-1
gamma = 0.9999
episode_count = 10000
epsilon_decay = 0.3 / episode_count
alpha_decay = 0.1 / episode_count
env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=is_slippery)
agent = QAgent(num_states=16, num_actions=4, epsilon=1.0, alpha=alpha, gamma=gamma, eps_decay = epsilon_decay, alpha_decay = alpha_decay)

In [60]:
train(env, agent, n_episodes=episode_count)

In [61]:
# obtain the policy by a simple argmax on agent's Q-table
# Also setting terminal states policy to -1
terminals = np.subtract(1, np.heaviside(np.sum(agent.q_table, axis=1), 0))
policy = np.add(np.multiply(np.subtract(1, terminals), np.argmax(agent.q_table, axis=1)), np.multiply(-1, terminals)).astype(int)
print(f'Policy Matrix achieved for {"" if is_slippery else "non-"}slippery gameplay and\nalpha={alpha}, gamma={gamma}, eps_decay={"{:.1e}".format(epsilon_decay)}, alpha_decay={"{:.1e}".format(alpha_decay)} during {episode_count} episodes:')
for i in range(4):
    print(np.array2string(policy[4*i:4*i+4], separator='\t'))

Policy Matrix achieved for slippery gameplay and
alpha=0.3, gamma=0.9999, eps_decay=3.0e-05, alpha_decay=1.0e-05 during 10000 episodes:
[2	3	3	3]
[ 0	-1	 2	-1]
[ 3	 1	 0	-1]
[-1	 2	 1	-1]


In [62]:
# test the policy on the environment
if is_slippery:
    video_name = 'frozenlake_slippery_QLearn.gif'
else:
    video_name = 'frozenlake_QLearn.gif'
env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=is_slippery, render_mode='rgb_array')
total_reward = record_video(env, policy, video_name, fps=5, random_action=False)
print(f"total reward: {total_reward}")
show_video(video_name, video_width=500)

step: 1, action: 2, state: 4, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
step: 2, action: 0, state: 4, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
step: 3, action: 0, state: 8, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
step: 4, action: 3, state: 8, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
step: 5, action: 3, state: 4, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
step: 6, action: 0, state: 4, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
step: 7, action: 0, state: 8, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
step: 8, action: 3, state: 4, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
step: 9, action: 0, state: 0, reward: 0.0, done: False, truncated: False, info: {'prob': 0.3333333333333333}
step: 10, action: 2