In [46]:
import gymnasium as gym
import numpy as np
import random
from gymnasium.wrappers import RecordVideo
from IPython.display import HTML
from base64 import b64encode

In [47]:
def test_policy_video(policy_table, name, video_width=500):

    video_path = './video/' + name + '-step-0.mp4'
        
    env_test = gym.make('Taxi-v3', render_mode='rgb_array')
    env_test = RecordVideo(env=env_test, video_folder="./video", name_prefix=name, step_trigger=lambda x : True, disable_logger=True)

    state, info = env_test.reset()
    done = False
    steps = 0
    
    env_test.start_video_recorder()

    while not done:
        action = policy_table[state]
        new_state, reward, done, truncated, info = env_test.step(int(action))
        state = new_state
        env_test.render()
        steps += 1
        if steps > 50:
            break
        
    env_test.close_video_recorder()

    env_test.close()

    video_file = open(video_path, "r+b").read()
    video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}"
    return HTML(f"""<video width={video_width} controls><source src="{video_url}"></video>""")

## Defining Taxi Environment

In [48]:
class Taxi:
    def __init__(self,
            learning_rate=0.9, 
            discount_rate=0.8,
            epsilon=1.0,
            decay_rate=0.005,
            num_iter=200,
            num_episodes=10000,
            max_steps=99,
            num_evaluate_steps = 20):
        self.env = gym.make('Taxi-v3') #initializing the environment
        self.state_size = self.env.observation_space.n #size of state space (it is 500 in taxi environment)
        self.action_size = self.env.action_space.n #size of action space (it is 6 in taxi environemt)
        self.vtable = np.zeros(self.state_size) #initializing state value table
        self.ptable = np.zeros(self.state_size) #initializing policy table (each entry contains one of 6 aciton)
        self.qtable = np.zeros((self.state_size, self.action_size)) #initializing q value table (value for each (state, action) pair)
        self.learning_rate = learning_rate #learning rate used in Q-leanring algorithm
        self.discount_rate = discount_rate #discount factor (gamma)
        self.epsilon = epsilon #epsilon for specifying randomness threshold in epsilon-greedy action used in Q-learning
        self.decay_rate = decay_rate #decay rate for decreasing the epsilon during the algorithm
        self.num_iter = num_iter #number of iteration used in algorihtm
        self.num_episodes = num_episodes #number of episodes (each episode is a sequence from start to end of a single game)
        self.max_steps = max_steps #maximum bound for number of steps in each episode
        self.num_evaluate_steps = num_evaluate_steps #number of steps to evaluate a given policy (just used in policy iteration algorithm)
        
    
                

## 1. Value Iteration

### 1.1. Initializing Parameters

In [49]:
#TODO: Initialize the parameters by filling the blanks
taxi = Taxi(num_iter=100, num_evaluate_steps=20, discount_rate=0.8)

### 1.2. Finding Optimal State Values

In [50]:
import numpy as np

def value_iteration(vtable, num_iter, discount_rate):
    for _ in range(num_iter):
        v_old = np.copy(vtable)
        for state in range(taxi.state_size):
            temp_action = np.zeros(taxi.action_size)
            for action in range(taxi.action_size):
                total_value_for_action = 0
                for prob, next_state, reward, done in taxi.env.P[state][action]:
                    if not done:
                        total_value_for_action += prob * (reward + discount_rate * v_old[next_state])
                    else:
                        total_value_for_action += prob * reward
                temp_action[action] = total_value_for_action
            vtable[state] = np.max(temp_action)  
        

### 1.3. Extracting The Optimal Policy

In [51]:
def optimal_policy_extraction(vtable, ptable, num_iter, discount_rate):
    value_iteration(vtable, num_iter, discount_rate)
    for state in range(taxi.state_size):
        temp_action = np.zeros(taxi.action_size)
        for action in range(taxi.action_size):
            total_expected_value = 0
            for prob, next_state, reward, done in taxi.env.P[state][action]:
                total_expected_value += prob * (reward + discount_rate * vtable[next_state] if not done else reward)
            temp_action[action] = total_expected_value
        
        ptable[state] = np.argmax(temp_action)

### 1.4. Running The Algorithm

In [52]:
optimal_policy_extraction(taxi.vtable, taxi.ptable, taxi.num_iter, taxi.discount_rate)
optimal_policy = taxi.ptable.copy()     # saving determined optimal policy

### 1.5. Tesing

In [53]:
test_policy_video(taxi.ptable, 'value-iteration')

## 2. Policy Iteration

### 2.1. Initialize Parameters

In [54]:
#TODO: Initialize the parameters by filling the blanks
taxi = Taxi(num_iter=50, num_evaluate_steps=20, discount_rate=0.8)

### 2.2. Policy Evaluation

In [55]:
def evaluate(num_iter, discount_rate, ptable):
    vtable = np.zeros(taxi.state_size)
    for _ in range(num_iter):
        v_old = np.copy(vtable)
        for state in range(taxi.state_size):
            value = 0
            for prob, next_state, reward, done in taxi.env.P[state][ptable[state]]:
                value += prob * (reward + (0 if done else discount_rate * v_old[next_state]))
            vtable[state] = value
    return vtable

### 2.3. Policy Improvement

In [56]:
def improvement(ptable, num_iter, num_evaluate_steps, discount_rate):
    for iteration in range(num_iter):  # Iterating over the number of iterations
        vtable = evaluate(num_evaluate_steps, discount_rate, ptable).copy()  # Evaluate the current policy
        for state in range(taxi.state_size):  # Loop over each state
            temp_action = np.zeros(taxi.action_size)  # Initialize an array to store action values
            # Compute the value for each action and fill the temp_action array
            for action in range(taxi.action_size):  # Loop over each possible action
                for prob, next_state, reward, done in taxi.env.P[state][action]:  # Loop over transitions
                    if done:
                        temp_action[action] += prob * reward  # Update action value if the state is terminal
                    else:
                        temp_action[action] += prob * (reward + discount_rate * vtable[next_state])  # Update action value if the state is not terminal
            ptable[state] = np.argmax(temp_action)  # Update the policy to the action with the highest value
    

### 2.4. Running The Algorithm

In [57]:
improvement(taxi.ptable, taxi.num_iter, taxi.num_evaluate_steps, taxi.discount_rate)

### 2.5. Testing

In [58]:
test_policy_video(taxi.ptable, 'policy-iteration')

## 3. Q-learning

### 3.1. Initializing Parameters

In [59]:
#TODO: Initialize the parameters by filling the blanks
taxi = Taxi(num_episodes=10000, max_steps=99, learning_rate=0.9, discount_rate=0.8, epsilon=1, decay_rate=0.005)

### 3.2. Training

In [60]:
def q_learing_train(qtable, num_episodes, max_steps, learning_rate, discount_rate, epsilon, decay_rate):
    for episode in range(num_episodes):  # Loop over each episode
        state, info = taxi.env.reset()  # Reset the environment to start a new episode
        done = False
        for step in range(max_steps):  # Loop over each step within the episode
            # Epsilon-greedy action selection
            if random.uniform(0, 1) < epsilon:
                action = taxi.env.action_space.sample()  # Exploration: choose a random action
            else:
                action = np.argmax(qtable[state])  # Exploitation: choose the best action based on Q-values

            new_state, reward, done, truncated, info = taxi.env.step(action)  # Take the action and observe the outcome
            # Update Q-value (main part of Q-learning)
            qtable[state][action] = (1 - learning_rate) * qtable[state][action] + learning_rate * (reward + discount_rate * np.max(qtable[new_state]))
            
            state = new_state  # Move to the next state
            
            if done:  # Check if the episode has ended
                break

        epsilon = np.exp(-decay_rate * episode)  # Decay epsilon for the next episode

### 3.3. Runing The Algorithm

In [61]:
q_learing_train(taxi.qtable, taxi.num_episodes, taxi.max_steps, taxi.learning_rate, taxi.discount_rate, taxi.epsilon, taxi.decay_rate)

### 3.4. Etracting The Policy

In [62]:
for state in range(taxi.state_size):
    taxi.ptable[state] = np.argmax(taxi.qtable[state][:])

### 3.5. Testing

In [63]:
test_policy_video(taxi.ptable, 'Q-learning')

## 4. Direct Evaluation(Monte Carlo)

In this section, the policy is given and you should just determine the value of each state based on the given policy. Then you can evaluate states, based on the optimal policy (you found in the previous sections) and then compare the 2 matrices.

### 4.1. Initializing Parameters

In [64]:
#TODO: Initialize the parameters by filling the blanks
taxi = Taxi(num_episodes=10000, max_steps=99, discount_rate=0.8)

### 4.2. Training

In [45]:
def monte_carlo(ptable, num_episodes, max_steps, discount_rate):
    count = np.ones(taxi.state_size)  # Counts the number of visits to each state
    vtable = np.zeros(taxi.state_size)  # Stores the total return for each state

    for episode in range(num_episodes):  # Loop over each episode
        state, info = taxi.env.reset()  # Reset the environment to start a new episode
        done = False
        trajectory = [state]  # Initialize the trajectory list with the starting state

        for step in range(max_steps):  # Loop over each step within the episode
            new_state, reward, done, truncated, info = taxi.env.step(ptable[state])  # Take the action and observe the outcome
            trajectory.append(new_state)  # Append the new state to the trajectory
            count[new_state] += 1  # Increment the visit count for the new state

            # Update the value function vtable using the Monte Carlo update formula
            temp = 1
            for currentState in reversed(trajectory[:-1]):  # Traverse the trajectory in reverse order
                vtable[currentState] += temp * reward  # Update the value of the current state
                temp *= discount_rate  # Apply discount rate for the next state
            state = new_state  # Move to the next state

            if done:  # Check if the episode has ended
                break

    vtable /= count  # Average the returns for each state
    return vtable  # Return the updated value table

In [65]:
given_policy = np.zeros(taxi.state_size)
given_policy_state_values = monte_carlo(given_policy, taxi.num_episodes, taxi.max_steps, taxi.discount_rate)

In [66]:
optimal_policy_state_values = monte_carlo(optimal_policy, taxi.num_episodes, taxi.max_steps, taxi.discount_rate)

### 4.3. Testing

In [67]:
print(optimal_policy_state_values - given_policy_state_values)

[ 0.00000000e+00  1.73313422e+02  1.73299865e+02  1.54169858e+02
  4.41402984e+01  0.00000000e+00  1.99648712e-01  8.19902325e+01
  7.76395008e+01  3.08524127e+01  0.00000000e+00  1.51295421e+01
  6.46290604e+01  3.89804651e+01 -3.61172096e+00  0.00000000e+00
  1.99920823e+01 -8.04784579e-01  5.23366385e+00  2.42582353e-01
  0.00000000e+00  1.32556641e+02  1.61636916e+02  1.88265143e+02
  2.97032483e+01  0.00000000e+00  1.75179981e+01  5.67292441e+01
  4.16367641e+01  2.62838837e+01  0.00000000e+00  7.89804651e+01
  6.23833953e+01 -2.62805233e+01 -3.21855818e+00  0.00000000e+00
  1.49911868e+01  0.00000000e+00  0.00000000e+00  0.00000000e+00
  0.00000000e+00 -2.21768000e+01  7.37518622e+01  3.28809534e+01
  1.96996741e+02  0.00000000e+00  1.41929048e+02  1.34132327e+02
  6.30144440e+01 -9.81562795e+00  0.00000000e+00  5.01843720e+01
  3.22268023e+01  1.06571993e+02  7.67292441e+01  0.00000000e+00
  0.00000000e+00  1.09936526e+01  0.00000000e+00  0.00000000e+00
  0.00000000e+00  2.32216