## Imports

In [1]:
!pip install gymnasium

Collecting gymnasium
  Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1


In [2]:
import numpy as np
import gymnasium as gym
from IPython.display import HTML
from base64 import b64encode
import imageio

## Utils

In [3]:
def record_video(env, policy, out_directory, fps=1, random_action=False, max_steps=100):
    images = []
    done = False
    truncated = False
    state, info = env.reset()
    img = env.render()
    images.append(img)
    total_reward = 0
    i = 0
    while not done and not truncated:
        i += 1
        if i > max_steps:
            break
        action = np.random.randint(4) if random_action else policy[state]
        state, reward, done, truncated, info = env.step(action)
        total_reward += reward
        img = env.render()
        images.append(img)
        if not random_action:
            print(f"action: {action}, state: {state}, reward: {reward}, done: {done}, truncated: {truncated}, info: {info}")
    imageio.mimsave(out_directory, [np.array(img) for i, img in enumerate(images)], fps=fps)
    return total_reward

In [4]:
def show_video(video_path, video_width=500):
    video_file = open(video_path, "r+b").read()
    video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}"
    return HTML(f"""<video width={video_width} controls><source src="{video_url}"></video>""")

## Random Walk

In [5]:
env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=False, render_mode='rgb_array')
total_reward = record_video(env, None, 'frozenlake_random.mp4', fps=3, random_action=True)
print(f"total reward: {total_reward}")
show_video('frozenlake_random.mp4', video_width=500)

total reward: 0.0


## Define FrozenLake MDP

In [6]:
class FrozenLakeMDP:
    def __init__(self, is_slippery):
        self.is_slippery = is_slippery
        self.terminal_states = np.zeros(16, dtype=int)
        self.terminal_states[[5, 7, 11, 12, 15]] = 1
        self.reward_fn = np.zeros(16, dtype=int)
        self.reward_fn[15] = 1

    def is_terminal(self, state):
        return self.terminal_states[state]

    def get_reward_function(self):
        return self.reward_fn

    def next_state_det(self, state, action):
        if action == 0:    # LEFT
            next_state = state - 1 if state % 4 != 0 else state
        elif action == 1:  # DOWN
            next_state = state + 4 if state // 4 != 3 else state
        elif action == 2:  # RIGHT
            next_state = state + 1 if state % 4 != 3 else state
        elif action == 3:  # UP
            next_state = state - 4 if state // 4 != 0 else state
        else:         # WRONG ACTION
            next_state = state
        return next_state

    def trans_prob(self, state, action):
        prob = np.zeros((16,), dtype=float)
        if not self.is_slippery:
            prob[self.next_state_det(state, action)] = 1.0
        else:
            prob[self.next_state_det(state, action)] += 1/3
            prob[self.next_state_det(state, (action+1)%4)] += 1/3
            prob[self.next_state_det(state, (action-1)%4)] += 1/3
        return prob

    def next_state_reward(self, state, action):
        next_state_probs = self.trans_prob(state, action)
        next_state = np.random.choice(16, p=next_state_probs)
        reward = self.reward_fn[next_state]
        return next_state, reward

In [7]:
dynamics = FrozenLakeMDP(is_slippery=False)

In [8]:
# reward function of the environent
dynamics.get_reward_function()

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1])

In [9]:
# evaluating if a given state is a terminal state (= hole or goal)
print(dynamics.is_terminal(0), dynamics.is_terminal(7), dynamics.is_terminal(15))

0 1 1


In [10]:
# if we take action `a` in state `s`,
# what is the probability of landing in each state?
dynamics.trans_prob(14, 2)

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1.])

In [11]:
# if we take action `a` in state `s`, what do we get?
# this is done through sampling the transition probability
next_state, reward = dynamics.next_state_reward(14, 2)
print(next_state, reward)

15 1


## Iterative Policy Evaluation

In [18]:
def policy_evaluation(dynamics, policy, gamma=0.9, num_iter=10):
    """
    evaluates policy based on Iterative Policy Evaluation.

    Args:
        dynamics (FrozenLakeMDP): dynamics of the environment.
        policy (numpy.array): policy we want to evaluate.
        gamma (float): discount factor.
        num_iter (int): number of iterations for the loop.

    Returns:
        numpy.array: state value function.
    """

    # TODO: implement Iterative Policy Evaluation algorithm

    s_value_function = np.zeros(16, dtype=float)

    rewards = dynamics.get_reward_function()
    for i in range(num_iter):
      for j in range(16):
        if dynamics.is_terminal(j):
          s_value_function[j] = rewards[j]
          continue
        vki = 0
        probs = dynamics.trans_prob(j, policy[j])
        for k in range(16):
          vki += probs[k] * (rewards[j] + gamma * s_value_function[k])
        s_value_function[j] = vki


    return s_value_function

In [19]:
dynamics = FrozenLakeMDP(is_slippery=False)

# 1. go-right policy
policy = 2 * np.ones(16, dtype=int)

# 2. shortest-path policy
policy = np.array([1, 2, 1, 0, 1, -1, 1, -1, 2, 1, 1, -1, -1, 2, 2, -1])

s_value_function = policy_evaluation(dynamics, policy)

In [20]:
# TODO: print and analyze the state value function
print(s_value_function.reshape((4,4)))

[[0.531441 0.59049  0.6561   0.59049 ]
 [0.59049  0.       0.729    0.      ]
 [0.6561   0.729    0.81     0.      ]
 [0.       0.81     0.9      1.      ]]


همانطور که مشاهده میشود هرچه به خانه ۱۶ نزدیک میشویم مقدار آن بیشتر میشود زیرا سریعتر میتوان امتیاز را به دست آورد و تاثیر تخفیف کمتر میشود.
همچنین با انجام آن برای مثال اول مشاهده میشود که فقط سطر آخر غیر صفر است زیرا در سطر های دیگر با رفتن به سمت راست امتیازی کسب نمیشود.

## Policy Iteration

In [None]:
def greedy_policy_improvement(dynamics, s_value_function, gamma=0.9):
    """
    obtains a policy in a greedy manner based on current state value function.

    Args:
        dynamics (FrozenLakeMDP): dynamics of the environment.
        s_value_function (numpy.array): state value function.
        gamma (float): discount factor.

    Returns:
        numpy.array: the greedy policy.
    """

    # TODO: implement Greedy Policy Improvement algorithm

    policy = np.random.randint(0, 4, size=16)

    rewards = dynamics.get_reward_function()


    for i in range(16):
      best_action = 0
      best_v = 0
      for j in range(4):
        v = 0
        probs = dynamics.trans_prob(i, j)
        for k in range(16):
          v += probs[k] * (rewards[k] + gamma * s_value_function[k])
        if v>best_v:
          best_v=v
          best_action = j
      policy[i]=best_action


    return policy

In [None]:
def policy_iteration(dynamics, gamma=0.9, outer_iter=100, inner_iter=100):
    """
    optimizes a policy based on Policy Iteration

    Args:
        dynamics (FrozenLakeMDP): dynamics of the environment.
        gamma (float): discount factor.
        outer_iter (int): number of iterations for the Policy Iteration loop.
        inner_iter (int): number of iterations for the Policy Evaluation loop.

    Returns:
        numpy.array: the optimized policy.
    """

    # TODO: implement Policy Iteration algorithm
    policy = np.random.randint(0, 4, size=16)

    for i in range(outer_iter):
      s_eval = policy_evaluation(dynamics=dynamics, policy=policy, gamma=gamma, num_iter=inner_iter)
      new_policy = greedy_policy_improvement(dynamics, s_eval, gamma=gamma)
      flag = True
      for j in range(16):
        if policy[j] != new_policy[j]:
          flag = False
          break
      if flag:
        return policy
      policy = new_policy

    return policy

In [None]:
# TODO: test and analyze the algorithm

dynamics = FrozenLakeMDP(is_slippery=False)
policy = policy_iteration(dynamics)
print(policy)

[1 2 1 0 1 1 1 0 2 1 1 1 2 2 2 1]


In [None]:
# TODO: test the policy on the environment

env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=False, render_mode='rgb_array')
total_reward = record_video(env, policy, 'frozenlake_random.mp4', fps=5, random_action=False)
print(f"total reward: {total_reward}")
show_video('frozenlake_random.mp4', video_width=500)

action: 1, state: 4, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 1, state: 8, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 2, state: 9, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 1, state: 13, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 2, state: 14, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 2, state: 15, reward: 1.0, done: True, truncated: False, info: {'prob': 1.0}
total reward: 1.0


## Q-Learning

In [None]:
class QAgent:  # The Q-Learning RL agent

    def __init__(self, num_states, num_actions, epsilon, alpha, gamma=0.9, eps_end=0.01, eps_decay=3e-6):

        self.num_states = num_states    # number of possible states
        self.num_actions = num_actions  # number of possible actions
        self.gamma = gamma              # discount factor
        self.epsilon = epsilon          # initial exploration probability
        self.alpha = alpha              # step size
        self.eps_decay = eps_decay      # linear decay rate of epsilon
        self.eps_end = eps_end          # minimum value for epsilon
        self.q_table = np.zeros((num_states, num_actions), dtype=float)

    def choose_action(self, state):
        """
        chooses an action in an epsilon-greedy manner.

        Args:
            state (int): current state of the agent.

        Returns:
            int: the chosen action
        """

        # TODO: implement epsilon-greedy action selection
        rand_num = 1.0 * np.random.randint(0, 100000) / 100000.0
        if rand_num > self.epsilon:
          best_action=0
          best_q = 0
          for i in range(self.num_actions):
            if self.q_table[state, i] > best_q:
              best_action=i
              best_q = self.q_table[state, i]
          return best_action
        else:
          return np.random.randint(0, self.num_actions)

    def learn(self, state, action, reward, next_state):
        """
        updates the q-table based on a single interaction with the environment.

        Args:
            state (int): state of the agent.
            action (int): action chosen by the agent.
            reward (int): reward obtained by the agent.
            next_state (int): next state of the agent.
        """

        # TODO: implement Q-table update
        best_q = 0
        for i in range(self.num_actions):
          if self.q_table[next_state, i] > best_q:
            best_q = self.q_table[next_state, i]
        self.q_table[state, action] += self.alpha * (reward + self.gamma * best_q - self.q_table[state, action])
        # epsilon decay
        self.epsilon = self.epsilon - self.eps_decay if self.epsilon > self.eps_end else self.eps_end

In [None]:
def train(env, agent, n_episodes=100000):
    """
        trains an agent through interactions with the environemnt using Q-learning.

        Args:
            env (gym.Env): the gym environment.
            agent (QAgent): the Q-learning agent.
            n_episodes (int): number of training episodes.
    """

    for i in range(n_episodes):
        # TODO: implement the training loop for Q-learning
        env.reset()
        done = False
        truncated = False
        state = 0
        while not done and not truncated:
          action = agent.choose_action(state)
          next_state, reward, done, truncated, info = env.step(action)
          agent.learn(state, action, reward, next_state)
          state = next_state


In [None]:
env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=False)
agent = QAgent(num_states=16, num_actions=4, epsilon=1.0, alpha=1e-3)

In [None]:
train(env, agent)

In [None]:
# TODO: obtain the policy by a simple argmax on agent's Q-table
policy = np.zeros(16, dtype=int)

for i in range(agent.num_states):
  best_action=0
  best_q = 0
  for j in range(agent.num_actions):
    if agent.q_table[i, j] > best_q:
      best_action=j
      best_q = agent.q_table[i, j]
  policy[i] = best_action


In [None]:
# TODO: test the policy on the environment

env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=False, render_mode='rgb_array')
total_reward = record_video(env, policy, 'frozenlake_random.mp4', fps=5, random_action=False)
print(f"total reward: {total_reward}")
show_video('frozenlake_random.mp4', video_width=500)

action: 2, state: 1, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 2, state: 2, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 1, state: 6, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 1, state: 10, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 1, state: 14, reward: 0.0, done: False, truncated: False, info: {'prob': 1.0}
action: 2, state: 15, reward: 1.0, done: True, truncated: False, info: {'prob': 1.0}
total reward: 1.0
