# Artificial Intelligence Course - Fall 1402
## Computer Assignment #2 - Reinforcement Learning

# Table of Contents

- [Part 1: Value Iteration & Policy Iteration Algorithms](#1)
    - [َQuestion 1:](#1-0)
    - [َQuestion 2:](#1-1)
    - [َQuestion 3:](#1-12)
    - [َQuestion 4:](#1-2)
    - [َQuestion 5:](#1-3)
        - [Value Iteration](#1-3-1)
        - [Policy Iteration](#1-3-2)
    - [َQuestion 6:](#1-4)
        - [Value Iteration](#1-4-1)
        - [Policy Iteration](#1-4-2)
- [Part 2: Q-Learning Algorithm](#2)
    - [َQuestion 8:](#2-1)
    - [َQuestion 9:](#2-2)
    - [َQuestion 10:](#2-3)

In [1]:
# import
import numpy as np
import gymnasium as gym
from time import sleep, time
from typing import *

In [2]:
def monitor_time(f: Callable):
    start = time()
    result = f()
    end = time()
    return result, end - start

In [3]:
frozen_lake_discount_factor = 0.9
frozen_lake_env = gym.make('FrozenLake-v1', 
               desc=None, 
               map_name="8x8", 
               is_slippery=True,
               render_mode="rgb_array")

<a name='1'></a>
## Part 1: Value Iteration & Policy Iteration Algorithms

<a name='1-0'></a>
### Question 1:

<a name='1-1'></a>
### Question 2:

In [4]:

class ValueIteration:
    def __init__(self: Self, env: gym.Env, discount_factor: float, theta:float=1e-8):
        self.env = env
        self.discount_factor = discount_factor
        self.theta = theta
        self.state_values = np.zeros(env.observation_space.n)
        self.q_values = np.zeros((env.observation_space.n, 
                    env.action_space.n))
        
        self.is_value_estimated = False

    def value_estimation(self: Self):
        if self.is_value_estimated:
            return
        env, state_values, q_values, discount_factor, theta = \
                self.env, \
                self.state_values, \
                self.q_values, \
                self.discount_factor, \
                self.theta
            
        delta = np.inf

        while(delta > theta):
            delta = 0

            for state in range(env.observation_space.n):
                previous_state_value = state_values[state]

                for action in range(env.action_space.n):
                    action_value = 0
                    for probability, next_state, reward, _ in env.unwrapped.P[state][action]:
                        action_value += probability * \
                                (reward + \
                                discount_factor * state_values[next_state])
                    q_values[state, action] = action_value

                state_values[state] = np.max(q_values[state,:])

                delta = np.max([delta, abs(previous_state_value - state_values[state])])
                
        self.is_value_estimated = True

    def take_action(self: Self, action: Any):
        next_state, reward, terminated, _, _ = self.env.step(action)
        return next_state, reward, terminated

    def get_optimal_policy(self: Self, state: Any):
        return np.argmax(self.q_values[state,:])

    def get_state_values(self: Self):
        return self.state_values

    def get_q_values(self: Self):
        return self.q_values

    def reset(self: Self, *args, **kwargs):
        initial_state, _ = self.env.reset(*args, **kwargs)
        return initial_state, False
    
    def auto(self: Self, seed:int=573, frame_freeze:float=0.02):
        env = self.env

        self.value_estimation()
        state, terminated = self.reset(seed=seed)
        try:
            while not terminated:
                env.render() ; sleep(frame_freeze)
                action = self.get_optimal_policy(state)
                state, _, terminated = self.take_action(action)
            env.render() ; sleep(frame_freeze)
        finally:
            env.close()
            
    def __repr__(self: Self):
        env = self.env
        nrow, ncol = env.unwrapped.nrow, env.unwrapped.ncol
        n = env.observation_space.n
        
        self.value_estimation()
        to_format = np.vectorize(lambda x: f"{x:.2f}")
        state_values = to_format(self.get_state_values()).reshape(nrow, ncol)
        optimal_policy = np.array([self.get_optimal_policy(state) for state in range(n)]).reshape(nrow, ncol)
        st = "\n".join(["-" * 10,
                "State Utilities:",
                str(state_values),
                "Optimal Policy:",
                str(optimal_policy),
                "-" * 10])
        return st
        
    __str__ = __repr__

<a name='1-12'></a>
### Question 3:

<a name='1-2'></a>
### Question 4:

In [5]:
class ModifiedPolicyIteration():
    def __init__(self: Self, env: gym.Env, discount_factor:float, theta:float=1e-8):
        self.env = env
        self.discount_factor = discount_factor
        self.theta = theta
        self.state_values = np.zeros((env.observation_space.n))
        self.q_values = np.zeros((env.observation_space.n, env.action_space.n))
        self.policy = np.random.randint(env.action_space.n, size=env.observation_space.n) # initial policy
        self.policy_stable = False

    def policy_evaluation(self):
        env, discount_factor, theta, state_values, policy = self.env, \
                self.discount_factor, \
                self.theta, \
                self.state_values, \
                self.policy
        delta = np.inf

        while(delta > theta):

            delta = 0

            for state in range(env.observation_space.n):

                previous_state_value = state_values[state]

                new_state_value = 0
                for probability, next_state, reward, _ in env.unwrapped.P[state][policy[state]]:
                    new_state_value += probability * \
                                (reward + \
                                discount_factor * state_values[next_state])
                state_values[state] = new_state_value

                delta = np.max([delta, abs(previous_state_value - new_state_value)])

    def policy_improvement(self: Self):
        env, discount_factor, state_values, q_values, policy = self.env, \
                self.discount_factor, \
                self.state_values, \
                self.q_values, \
                self.policy
        
        is_policy_stable = True
        for state in range(env.observation_space.n):
            old_policy = policy[state]

            for action in range(env.action_space.n):

                action_value = 0
                for probability, next_state, reward, _ in env.unwrapped.P[state][action]:
                    action_value += probability * \
                                (reward + \
                                discount_factor * state_values[next_state])
                q_values[state, action] = action_value

            policy[state] = np.argmax(q_values[state,:])

            if old_policy != policy[state]:
                is_policy_stable = False
        
        self.policy_stable = is_policy_stable

    def policy_estimation(self: Self):
        
        while not self.policy_stable:
            self.policy_evaluation()
            self.policy_improvement()

    def take_action(self: Self, action: Any):
        next_state, reward, terminated, _, _ = self.env.step(action)
        return next_state, reward, terminated

    def get_optimal_policy(self: Self, state):
        return self.policy[state]

    def get_state_values(self: Self):
        return self.state_values

    def get_q_values(self: Self):
        return self.q_values

    def reset(self: Self, *args, **kwargs):
        initial_state, _ = self.env.reset(*args, **kwargs)
        return initial_state, False
    
    def auto(self: Self, seed:int=573, frame_freeze:float=0.02):
        env = self.env

        self.policy_estimation()
        state, terminated = self.reset(seed=seed)
        try:
            while not terminated:
                env.render() ; sleep(frame_freeze)
                action = self.get_optimal_policy(state)
                state, _, terminated = self.take_action(action)
            env.render() ; sleep(frame_freeze)
        finally:
            env.close()
            
    def __repr__(self: Self):
        env, policy = self.env, self.policy
        nrow, ncol = env.unwrapped.nrow, env.unwrapped.ncol
        n = env.observation_space.n
        
        self.policy_estimation()
        to_format = np.vectorize(lambda x: f"{x:.2f}")
        state_values = to_format(self.get_state_values()).reshape(nrow, ncol)
        optimal_policy = np.array(policy).reshape(nrow, ncol)
        st = "\n".join(["-" * 10,
                "State Utilities:",
                str(state_values),
                "Optimal Policy:",
                str(optimal_policy),
                "-" * 10])
        return st
        
    __str__ = __repr__

<a name='1-3'></a>
### Question 5:

<a name='1-3-1'></a>
#### Value Iteration:

In [6]:
value_iteration = ValueIteration(frozen_lake_env, frozen_lake_discount_factor)
_, value_estimation_time = monitor_time(lambda: value_iteration.value_estimation())
print(f"value estimation took {value_estimation_time}s to process.")
print(value_iteration)
value_iteration.auto()

value estimation took 0.18031716346740723s to process.
----------
State Utilities:
[['0.01' '0.01' '0.01' '0.02' '0.03' '0.03' '0.04' '0.04']
 ['0.01' '0.01' '0.01' '0.02' '0.03' '0.04' '0.05' '0.06']
 ['0.01' '0.01' '0.01' '0.00' '0.03' '0.04' '0.07' '0.08']
 ['0.00' '0.00' '0.01' '0.01' '0.02' '0.00' '0.09' '0.13']
 ['0.00' '0.00' '0.00' '0.00' '0.03' '0.06' '0.11' '0.21']
 ['0.00' '0.00' '0.00' '0.01' '0.03' '0.06' '0.00' '0.36']
 ['0.00' '0.00' '0.00' '0.00' '0.00' '0.12' '0.00' '0.63']
 ['0.00' '0.00' '0.00' '0.00' '0.14' '0.32' '0.61' '0.00']]
Optimal Policy:
[[3 2 2 2 2 2 2 2]
 [3 3 3 3 2 2 2 1]
 [3 3 0 0 2 3 2 1]
 [3 3 3 1 0 0 2 1]
 [3 3 0 0 2 1 3 2]
 [0 0 0 1 3 0 0 2]
 [0 0 1 0 0 0 0 2]
 [0 1 0 0 1 1 1 0]]
----------


<a name='1-3-2'></a>
#### Policy Iteration:

In [7]:
policy_iteration = ModifiedPolicyIteration(frozen_lake_env, frozen_lake_discount_factor)
_, policy_estimation_time = monitor_time(lambda: policy_iteration.policy_estimation())
print(f"policy estimation took {policy_estimation_time}s to process.")
print(policy_iteration)
policy_iteration.auto()

policy estimation took 0.24733185768127441s to process.
----------
State Utilities:
[['0.01' '0.01' '0.01' '0.02' '0.03' '0.03' '0.04' '0.04']
 ['0.01' '0.01' '0.01' '0.02' '0.03' '0.04' '0.05' '0.06']
 ['0.01' '0.01' '0.01' '0.00' '0.03' '0.04' '0.07' '0.08']
 ['0.00' '0.00' '0.01' '0.01' '0.02' '0.00' '0.09' '0.13']
 ['0.00' '0.00' '0.00' '0.00' '0.03' '0.06' '0.11' '0.21']
 ['0.00' '0.00' '0.00' '0.01' '0.03' '0.06' '0.00' '0.36']
 ['0.00' '0.00' '0.00' '0.00' '0.00' '0.12' '0.00' '0.63']
 ['0.00' '0.00' '0.00' '0.00' '0.14' '0.32' '0.61' '0.00']]
Optimal Policy:
[[3 2 2 2 2 2 2 2]
 [3 3 3 3 2 2 2 1]
 [3 3 0 0 2 3 2 1]
 [3 3 3 1 0 0 2 1]
 [3 3 0 0 2 1 3 2]
 [0 0 0 1 3 0 0 2]
 [0 0 1 0 0 0 0 2]
 [0 1 0 0 1 1 1 0]]
----------


<a name='1-4'></a>
### Question 6:

<a name='1-4-1'></a>
#### Value Iteration:

<a name='1-4-2'></a>
#### Policy Iteration:

<a name='2'></a>
## Part 2: Q-Learning Algorithm

In [8]:
# hyperparameters
REPS = 20
EPISODES = 2000
EPSILON = 0.1
LEARNING_RATE = 0.1
DISCOUNT = 0.9
STUDENT_NUM = 123

In [9]:
# environment
env = gym.make('Taxi-v3')
env.seed(seed = STUDENT_NUM)
Initial_State = env.reset()
Initial_State

  logger.warn(


AttributeError: 'TaxiEnv' object has no attribute 'seed'

In [None]:
taxi_row, taxi_col, pass_idx, dest_idx = env.decode(Initial_State)
taxi_row, taxi_col, pass_idx, dest_idx

In [None]:
# get familiar with the environment
print("you can see the environment in each step by render command :")
env.render()

In [None]:
# Total no. of states
env.observation_space.n

In [None]:
# Total no. of actions
env.action_space.n

In [None]:
# base code for Q-learning

env = gym.make('Taxi-v3')
env.seed(seed = STUDENT_NUM)


for rep in range(REPS):
    agent = # Agent Object instance from Algorithm_name(e.g Q_learning_agent) class which has inherited from Agentbase.
    for episode in range(EPISODES):
        Initial_state = env.reset()

        for ... :

            bestAction = np.random.choice(ACTIONS)

            next_state,rew,done,_ = environment.step(bestAction)

            if done:
                break

<a name='2-1'></a>
### Question 8:

In [None]:
class QLearningAgent():
    def __init__(self, env, epsilon, learning_rate, discount_factor, seed):
      self.env = env
      self.epsilon = epsilon
      self.learning_rate = learning_rate
      self.olr = learning_rate
      self.discount_factor = discount_factor
      self.q_table = np.zeros((env.observation_space.n, env.action_space.n))
      self.seed = seed

    def choose_action(self, state):
      ### START CODE HERE ###
      # With probability epsilon, choose a random action

      # Otherwise, choose the action with the highest Q-value

      ### END CODE HERE ###
      return action

    def update_q_table(self, state, action, nextState, reward):
      ### START CODE HERE ###
      # Calculate the new Q-value using the Q-learning formula
      self.q_table[state][action] = ...
      ### END CODE HERE ###

    def decay_epsilon(self, episode):
      ### START CODE HERE ###
      self.epsilon = ...
      ### END CODE HERE ###

    def decrease_learning_rate(self, episode):
      ### START CODE HERE ###
      self.learning_rate = ...
      ### END CODE HERE ###

    def take_action(self, action):
      next_state, reward, done, _ = self.env.step(action)
      return next_state, reward, done

    def get_optimal_policy(self, state):
      return np.argmax(self.q_table[state])

    def get_q_values(self):
      return self.q_table

    def reset(self):
      # self.learning_rate = self.olr
      return self.env.reset(seed=self.seed)

<a name='2-2'></a>
### Question 9:

<a name='2-3'></a>
### Question 10: