# Artificial Intelligence Course - Fall 1402
## Computer Assignment #2 - Reinforcement Learning

# Table of Contents

- [Part 1: Value Iteration & Policy Iteration Algorithms](#1)
    - [َQuestion 1:](#1-0)
    - [َQuestion 2:](#1-1)
    - [َQuestion 3:](#1-12)
    - [َQuestion 4:](#1-2)
    - [َQuestion 5:](#1-3)
        - [Value Iteration](#1-3-1)
        - [Policy Iteration](#1-3-2)
    - [َQuestion 6:](#1-4)
        - [Value Iteration](#1-4-1)
        - [Policy Iteration](#1-4-2)
- [Part 2: Q-Learning Algorithm](#2)
    - [َQuestion 8:](#2-1)
    - [َQuestion 9:](#2-2)
    - [َQuestion 10:](#2-3)

In [39]:
#import
import numpy as np
import gym
import random

<a name='1'></a>
## Part 1: Value Iteration & Policy Iteration Algorithms

In [40]:
env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=False, render_mode="human")

In [41]:
# get familiar with the environment
print("you can see the environment in each step by render command :")
env.reset()
env.render()

you can see the environment in each step by render command :


In [42]:
# Total no. of states
env.observation_space.n

16

In [43]:
# Total no. of actions
env.action_space.n

4

<a name='1-0'></a>
### Question 1:

Value Iteration iteratively updates the state values until they converge, and then it derives the optimal policy based on these values. The algorithm is guaranteed to find the optimal policy for a finite MDP (Markov Decision Process) given a sufficiently small threshold and an appropriate discount factor. It's an efficient method for solving reinforcement learning problems.

<a name='1-1'></a>
### Question 2:

In [44]:
class ValueIteration():
    def __init__(self, env, discount_factor, theta=1e-8):
        self.env = env
        self.discount_factor = discount_factor
        self.theta = theta
        self.reset()
        self.state_values = np.ones((self.env.observation_space.n)) / self.env.action_space.n
        self.q_values = np.ones((self.env.observation_space.n, self.env.action_space.n)) / self.env.action_space.n
        self.state_values[self.env.observation_space.n - 1] = 0
        self.q_values[self.env.observation_space.n - 1] = np.zeros((self.env.action_space.n))

    def value_estimation(self):
        self.delta = np.inf

        while(self.delta > self.theta):

            self.delta = 0

            for state in range(self.env.observation_space.n):

                v = self.state_values[state]

                for action in range(self.env.action_space.n):
                    action_value = 0
                    for probability, next_state, reward, done in self.env.P[state][action]:
                        ### START CODE HERE ###
                        for probability, next_state, reward, done in self.env.P[state][action]:
                            action_value += probability * (reward + self.discount_factor * self.state_values[next_state])
                        ### END CODE HERE ###
                    self.q_values[state, action] = action_value

                self.state_values[state] = np.max(self.q_values[state,:])

                self.delta = np.max([self.delta, abs(v - self.state_values[state])])

                if (self.delta < self.theta):
                    break

    def take_action(self, action):
        test_step = self.env.step(action)
        return test_step
        # next_state, reward, done, _ = self.env.step(action)
        # return next_state, reward, done

    def get_optimal_policy(self, state):
        return np.argmax(self.q_values[state,:])

    def get_state_values(self):
        return self.state_values

    def get_q_values(self):
        return self.q_values

    def reset(self):
        initial_state = self.env.reset()
        return initial_state

<a name='1-12'></a>
### Question 3:

Policy Iteration alternates between evaluating the current policy and improving it. This process continues until the policy no longer changes, indicating that the optimal policy has been found. Policy Iteration is guaranteed to converge to the optimal policy for a finite MDP given an appropriate discount factor and a stopping criterion. It's a powerful and flexible algorithm that can handle complex reinforcement learning problems.

<a name='1-2'></a>
### Question 4:

In [45]:
class PolicyIteration():
    def __init__(self, env, discount_factor, theta=1e-8):
        self.env = env
        self.discount_factor = discount_factor
        self.theta = theta
        self.reset()
        self.state_values = np.ones((self.env.observation_space.n)) / self.env.action_space.n
        self.q_values = np.ones((self.env.observation_space.n, self.env.action_space.n)) / self.env.action_space.n
        self.state_values[self.env.observation_space.n - 1] = 0
        self.q_values[self.env.observation_space.n - 1] = np.zeros((self.env.action_space.n))
        self.policy = np.random.randint(self.env.action_space.n, size=self.env.observation_space.n) # initial policy
        self.policy_stable = False
    
    def policy_evaluation(self):
        self.delta = np.inf

        while(self.delta >= self.theta):

            self.delta = 0

            for state in range(self.env.observation_space.n):

                v = self.state_values[state]

                new_state_value = 0
                for probability, next_state, reward, done in self.env.P[state][self.policy[state]]:
                    ### START CODE HERE ###
                    new_state_value += probability * (reward + self.discount_factor * self.state_values[next_state])
                    ### END CODE HERE ###
                self.state_values[state] = new_state_value

                self.delta = np.max([self.delta, abs(v - self.state_values[state])])

    def policy_improvement(self):
        self.policy_stable = True

        for state in range(self.env.observation_space.n):
            old_policy = self.policy[state]

            for action in range(self.env.action_space.n):

                action_value = 0
                for probability, next_state, reward, done in self.env.P[state][action]:
                    ### START CODE HERE ###
                    action_value += probability * (reward + self.discount_factor * self.state_values[next_state])
                    ### END CODE HERE ###
                self.q_values[state, action] = action_value

            self.policy[state] = np.argmax(self.q_values[state,:])

            if old_policy != self.policy[state]:
                self.policy_stable = False

    def policy_estimation(self):
        self.policy_stable = False

        while not self.policy_stable:
            self.policy_evaluation()
            self.policy_improvement()

    def take_action(self, action):
        test_step = self.env.step(action)
        return test_step
        # next_state, reward, done = self.env.step(action)
        # return next_state, reward, done

    def get_optimal_policy(self, state):
        return self.policy[state]

    def get_state_values(self):
        return self.state_values

    def get_q_values(self):
        return self.q_values

    def reset(self):
        initial_state = self.env.reset()
        return initial_state

<a name='1-3'></a>
### Question 5:

<a name='1-3-1'></a>
#### Value Iteration:

<br><br>-Calculate State Action Values:

In [46]:
vi = ValueIteration(env, discount_factor=0.9)
vi.value_estimation()
state_action_values = vi.get_q_values()
print("State Action Values:\n",state_action_values)

State Action Values:
 [[0.531441   0.59049    0.59049    0.531441  ]
 [0.531441   0.13286025 0.6561     0.59049   ]
 [0.59049    0.729      0.59049    0.6561    ]
 [0.6561     0.13286025 0.59049    0.59049   ]
 [0.59049    0.6561     0.13286025 0.531441  ]
 [0.13286025 0.13286025 0.13286025 0.13286025]
 [0.11957423 0.81       0.13286025 0.6561    ]
 [0.13286025 0.13286025 0.13286025 0.13286025]
 [0.6561     0.13286025 0.729      0.59049   ]
 [0.6561     0.81       0.81       0.11957423]
 [0.729      0.9        0.13286025 0.729     ]
 [0.13286025 0.13286025 0.13286025 0.13286025]
 [0.13286025 0.13286025 0.13286025 0.13286025]
 [0.11957423 0.81       0.9        0.729     ]
 [0.81       0.9        1.         0.81      ]
 [0.         0.         0.         0.        ]]


<br><br>-Calculate State Values:

In [47]:
state_values = vi.get_state_values()
line = 0
print("State Values:")
print("-------------------------------")
for i in range(len(state_values)):
    print("{:.4f}".format(state_values[i]), end="| ")
    if (i + 1) % 4 == 0 and line < 3:
        line = line + 1
        print("\n-------------------------------")
print("\n-------------------------------")


State Values:
-------------------------------
0.5905| 0.6561| 0.7290| 0.6561| 
-------------------------------
0.6561| 0.1329| 0.8100| 0.1329| 
-------------------------------
0.7290| 0.8100| 0.9000| 0.1329| 
-------------------------------
0.1329| 0.9000| 1.0000| 0.0000| 
-------------------------------


<br><br>-Calculate Optimal Policy:

In [48]:
line = 0
print("Optimal Policy:")
print("----------------")
policy_chars = ["⬅️", "⬇️", "➡️", "⬆️"]
for i in range(16):
    if i == 15:
        print("🎁",end="|")
    else:
        optimal_policy = vi.get_optimal_policy(i)
        print(policy_chars[optimal_policy], end="| ")
    if (i + 1) % 4 == 0 and line < 3:
        line = line + 1
        print("\n----------------")
print("\n----------------")

Optimal Policy:
----------------
⬇️| ➡️| ⬇️| ⬅️| 
----------------
⬇️| ⬅️| ⬇️| ⬅️| 
----------------
➡️| ⬇️| ⬇️| ⬅️| 
----------------
⬅️| ➡️| ➡️| 🎁|
----------------


<br><br>-Show Actions:

In [49]:
for j in range(5):
    env.reset()
    state = 0
    for i in range(10):
        action = vi.get_optimal_policy(state)
        tl = vi.take_action(action)
        next_state = tl[0]; reward = tl[1]; done = tl[2]
        score =+ reward
        if j == 1:
            print("Step%d Reward = %d" % (i, score))
        # env.step(action)
        if done == True:
            break
        env.render()
        state = next_state

Step0 Reward = 0
Step1 Reward = 0
Step2 Reward = 0
Step3 Reward = 0
Step4 Reward = 0
Step5 Reward = 1


<a name='1-3-2'></a>
#### Policy Iteration:

<br><br>-Calculate State Action Values:

In [50]:
pi = PolicyIteration(env, discount_factor=0.9)
pi.policy_evaluation()
pi.policy_improvement()
pi.policy_estimation()
q_values = pi.get_q_values()
print("State Action Values:\n",q_values)

State Action Values:
 [[5.31441000e-01 5.90490000e-01 5.90490000e-01 5.31441000e-01]
 [5.31441000e-01 2.02082624e-08 6.56100000e-01 5.90490000e-01]
 [5.90490000e-01 7.29000000e-01 5.90490000e-01 6.56100000e-01]
 [6.56100000e-01 2.02082624e-08 5.90490000e-01 5.90490000e-01]
 [5.90490000e-01 6.56100000e-01 2.02082624e-08 5.31441000e-01]
 [2.02082624e-08 2.02082624e-08 2.02082624e-08 2.02082624e-08]
 [2.02082624e-08 8.10000000e-01 2.02082624e-08 6.56100000e-01]
 [2.02082624e-08 2.02082624e-08 2.02082624e-08 2.02082624e-08]
 [6.56100000e-01 2.02082624e-08 7.29000000e-01 5.90490000e-01]
 [6.56100000e-01 8.10000000e-01 8.10000000e-01 2.02082624e-08]
 [7.29000000e-01 9.00000000e-01 2.02082624e-08 7.29000000e-01]
 [2.02082624e-08 2.02082624e-08 2.02082624e-08 2.02082624e-08]
 [2.02082624e-08 2.02082624e-08 2.02082624e-08 2.02082624e-08]
 [2.02082624e-08 8.10000000e-01 9.00000000e-01 7.29000000e-01]
 [8.10000000e-01 9.00000000e-01 1.00000000e+00 8.10000000e-01]
 [0.00000000e+00 0.00000000e+00 0

<br><br>-Calculate State Values:

In [51]:
state_values = pi.get_state_values()
line = 0
print("State Values:")
print("-------------------------------")
for i in range(len(state_values)):
    print("{:.4f}".format(state_values[i]), end="| ")
    if (i + 1) % 4 == 0 and line < 3:
        line = line + 1
        print("\n-------------------------------")
print("\n-------------------------------")

State Values:
-------------------------------
0.5905| 0.6561| 0.7290| 0.6561| 
-------------------------------
0.6561| 0.0000| 0.8100| 0.0000| 
-------------------------------
0.7290| 0.8100| 0.9000| 0.0000| 
-------------------------------
0.0000| 0.9000| 1.0000| 0.0000| 
-------------------------------


<br><br>-Calculate Optimal Policy:

In [52]:
line = 0
print("Optimal Policy:")
print("----------------")
policy_chars = ["⬅️", "⬇️", "➡️", "⬆️"]
for i in range(16):
    if i == 15:
        print("🎁",end="|")
    else:
        optimal_policy = pi.get_optimal_policy(i)
        print(policy_chars[optimal_policy], end="| ")
    if (i + 1) % 4 == 0 and line < 3:
        line = line + 1
        print("\n----------------")
print("\n----------------")

Optimal Policy:
----------------
⬇️| ➡️| ⬇️| ⬅️| 
----------------
⬇️| ⬅️| ⬇️| ⬅️| 
----------------
➡️| ⬇️| ⬇️| ⬅️| 
----------------
⬅️| ➡️| ➡️| 🎁|
----------------


<br><br>-Show Actions:

In [53]:
for j in range(5):
    env.reset()
    state = 0
    for i in range(10):
        action = pi.get_optimal_policy(state)
        tl = pi.take_action(action)
        next_state = tl[0]; reward = tl[1]; done = tl[2]
        score =+ reward
        if j == 1:
            print("Step%d Reward = %d" % (i, score))
        # env.step(action)
        if done == True:
            break
        env.render()
        state = next_state


Step0 Reward = 0
Step1 Reward = 0
Step2 Reward = 0
Step3 Reward = 0
Step4 Reward = 0
Step5 Reward = 1


<a name='1-4'></a>
### Question 6:

<a name='1-4-1'></a>
#### Value Iteration:

1. **Approach**:

     - Value Iteration combines policy evaluation and policy improvement into a single step. It iteratively updates the state values based on the Bellman equation for the optimal policy, and it continually improves the policy based on the updated values. Value Iteration does not explicitly maintain a policy; instead, it calculates the optimal policy indirectly through value iteration.<br><br>

2. **Convergence**:

     - Value Iteration usually converges faster in terms of iterations because it combines policy evaluation and improvement in a single step. However, each iteration can be computationally more expensive. Value Iteration is also guaranteed to converge to the optimal policy for a finite MDP.<br><br>

3. **Computational Efficiency**:

     - Value Iteration can be computationally expensive because it updates the value of all states in each iteration. It is generally faster in terms of iterations but can be slower in terms of computation.<br><br>

4. **Use Cases**:

   - **Value Iteration** is commonly used when you are primarily interested in finding the optimal value function and policy is a byproduct. It is suitable when the state space is large, and the focus is on convergence speed.

<a name='1-4-2'></a>
#### Policy Iteration:

1. **Approach**:

     - Policy Iteration explicitly alternates between two main steps: policy evaluation and policy improvement. It starts with an initial policy, evaluates that policy's performance, improves the policy based on the evaluated values, and repeats the process until the policy stabilizes (no longer changes). It focuses on refining the policy iteratively.<br><br>

2. **Convergence**:

     - Policy Iteration typically requires more iterations but fewer computations per iteration. It is guaranteed to converge to the optimal policy for a finite MDP.<br><br>

3. **Computational Efficiency**:

     - Policy Iteration can be computationally efficient when the policy improvement step is relatively inexpensive. It may be advantageous when you have a good initial policy.<br><br>

4. **Use Cases**:

   - **Policy Iteration** is often used when you want to explicitly track and improve the policy. It is a good choice when the policy space is relatively small or when you have a good initial policy.

<a name='2'></a>
## Part 2: Q-Learning Algorithm

In [54]:
# hyperparameters
REPS = 20
EPISODES = 2000
EPSILON = 0.1
LEARNING_RATE = 0.1
DISCOUNT = 0.9
STUDENT_NUM = 274

In [55]:
# environment
env = gym.make('Taxi-v3', render_mode='human')
Initial_State, _ = env.reset(seed = STUDENT_NUM)
Initial_State, _ = env.reset(seed = STUDENT_NUM)
Initial_State

486

In [56]:
taxi_row, taxi_col, pass_idx, dest_idx = env.decode(Initial_State)
taxi_row, taxi_col, pass_idx, dest_idx

(4, 4, 1, 2)

In [57]:
# get familiar with the environment
print("you can see the environment in each step by render command :")
env.render()

you can see the environment in each step by render command :


In [58]:
# Total no. of states
env.observation_space.n

500

In [59]:
# Total no. of actions
env.action_space.n

6

<a name='2-1'></a>
### Question 8:

In [60]:
class QLearningAgent():
    def __init__(self, env, epsilon, learning_rate, discount_factor, seed):
        self.env = env
        self.epsilon = epsilon
        self.learning_rate = learning_rate
        self.olr = learning_rate
        self.discount_factor = discount_factor
        self.q_table = np.zeros((env.observation_space.n, env.action_space.n))
        self.seed = seed

    def choose_action(self, state):
        print("state: ", state)
        state_index = state[0]  # Assuming the integer part is at index 0
        if random.uniform(0, 1) < self.epsilon:
            action = self.env.action_space.sample()
        else:
            action = np.argmax(self.q_table[state_index, :])
        return action
        
          
            # action = np.argmax(self.q_table[state,:])
            # action = self.get_optimal_policy(state)
        # return action

    def update_q_table(self, state, action, nextState, reward):
        current_q_value = self.q_table[state[0]][action]
        max_next_q_value = np.max(self.q_table[nextState])
        new_q_value = (1 - self.learning_rate) * current_q_value + \
                       self.learning_rate * (reward + self.discount_factor * max_next_q_value)
        self.q_table[state[0]][action] = new_q_value

    def decay_epsilon(self, episode):
        self.epsilon = self.epsilon * 0.99

    def decrease_learning_rate(self, episode):
        self.learning_rate = self.olr / (1 + episode * 0.01)

    def take_action(self, action):
        test_step = self.env.step(action)
        return test_step
        # next_state, reward, done, _ = self.env.step(action)
        # return next_state, reward, done

    def get_optimal_policy(self, state):
        return np.argmax(self.q_table[state])

    def get_q_values(self):
        return self.q_table

    def reset(self):
        # self.learning_rate = self.olr
        return self.env.reset(seed=self.seed)

In [62]:
env = gym.make('Taxi-v3')
env.reset(seed = STUDENT_NUM)

for rep in range(REPS):
    agent = QLearningAgent(env, EPSILON, LEARNING_RATE, DISCOUNT, STUDENT_NUM)
    for episode in range(EPISODES):
        Initial_state = env.reset(seed = STUDENT_NUM)

        for step in range(EPISODES):
            # Choose the best action
            bestAction = agent.choose_action(Initial_state)

            # Take the action and get the new state, reward, and done status
            tl = agent.take_action(bestAction)
            next_state = tl[0]; reward = tl[1]; done = tl[2]
            # next_state, reward, done, info = agent.take_action(bestAction)

            # Update the Q-table
            agent.update_q_table(Initial_state, bestAction, reward, next_state)
            Initial_state = next_state

            if done:
                break

<a name='2-2'></a>
### Question 9:

In this code I have a function named 'decay_epsilon' that decreases the exploration parameter (epsilon) over time. This kind of decay is a common technique in reinforcement learning to shift the agent from exploration to exploitation as it gains more experience. In this case, I'm decreasing epsilon by multiplying it by 0.99 in each call to decay_epsilon.

<a name='2-3'></a>
### Question 10: