In [1]:
# Import all the necessary libraries for the code

import gym
import numpy as np
from time import sleep
from IPython.display import clear_output


In [17]:
# Create the Taxi-v3 environment, draw its current state and report
# the size of its state and action spaces

env = gym.make('Taxi-v3')
env.render()
for label, size in (("Number of possible States", env.nS),
                    ("Number of possible Actions", env.nA)):
    print(label, size)


+---------+
|R: | : :[34;1mG[0m|
| : | : :[43m [0m|
| : : : : |
| | : | : |
|[35mY[0m| : |B: |
+---------+

Number of possible States 500
Number of possible Actions 6


In [24]:
"""
EVALUATE A POLICY GIVEN AN ENVIRONMENT AND ITS CHARACTERISTICS

Inputs
    - policy: Matrix [S , A]
    - env: Taxi-v3 environment
        - env.P : Transition probabilities
        - env.P [s] [a] : List of transition tuples (prob, next_state, reward, done)
        - env.nS : Number of states
        - env.nA : Number of actions
    - gamma : Discount factor
    - theta : Threshold that stops the evaluation once the largest change in the value function is smaller than it
Outputs
    - V : Vector value function with length equal to the number of states

"""

gamma = 1.0  # Discount factor (re-assigned to 0.99 later in the notebook, before policy iteration is run)
theta = 0.00001  # Convergence threshold used by the policy evaluation


def policy_evaluation(policy, env, gamma, theta, max_iterations=None):
    """
    Evaluate a policy: iteratively compute its state-value function.

    Inputs
        - policy: Matrix [S , A]; policy[s][a] is the probability of taking action a in state s
        - env: Environment exposing nS and P (a gym wrapper around such an environment also works)
            - P [s] [a] : List of transition tuples (prob, next_state, reward, done)
            - nS : Number of states
        - gamma: Discount factor
        - theta: Stop once the largest change of the value function during a sweep is smaller than it (must be > 0)
        - max_iterations: Optional cap on the number of sweeps over the states (None = no cap)
    Outputs
        - V: Value function, vector with one entry per state
    Raises
        - ValueError: theta is not positive, so the stopping test could never succeed
        - RuntimeError: max_iterations sweeps were completed without reaching theta
    """
    if not theta > 0:
        raise ValueError("theta must be positive, otherwise the evaluation can never stop")

    # Reach the raw environment through any number of wrappers (or none at all)
    model = getattr(env, "unwrapped", env)
    n_states = model.nS

    V = np.zeros(n_states)  # Value function, initially 0 for every state
    sweeps = 0
    while True:
        delta = 0.0  # Largest change of the value function seen in this sweep
        for state in range(n_states):
            value = 0.0
            for action, act_prob in enumerate(policy[state]):
                if act_prob == 0:
                    continue  # An action that is never taken adds nothing to the expectation
                # NOTE(review): `done` is ignored, so the value of next_state is bootstrapped
                # even on transitions that end the episode -- confirm this is intended.
                for prob, next_state, reward, _done in model.P[state][action]:
                    value += act_prob * prob * (reward + gamma * V[next_state])  # Bellman expectation backup
            delta = max(delta, abs(value - V[state]))
            V[state] = value  # In-place update: later states of the same sweep already see this value
        sweeps += 1
        if delta < theta:  # Converged
            return V
        if max_iterations is not None and sweeps >= max_iterations:
            raise RuntimeError(
                "policy evaluation did not converge within %d sweeps (last delta: %g)" % (sweeps, delta)
            )


In [26]:
"""
PREDICT AND CALCULATE VALUE OF EVERY ACTION IN A STATE

Inputs
    - state: State to consider
    - V: Vector value function with length equal to the number of states
Outputs
    - A : Vector with the value of each action, with length equal to the number of actions
    
"""

def prediction(state, V, env=None, gamma=None):
    """
    Predict the value of every action available in a state (one-step lookahead).

    Inputs
        - state: State to consider
        - V: Value function, indexable by state
        - env: Environment exposing nA and P [s] [a] = list of (prob, next_state, reward, done);
               a gym wrapper around such an environment also works.
               When omitted, the notebook-level `env` is used
        - gamma: Discount factor. When omitted, the notebook-level `gamma` is used
    Outputs
        - A : Vector with the expected value of each action, with length equal to the number of actions
    """
    # Explicit arguments win; the notebook-level variables are only a fallback
    # so that existing two-argument calls keep working.
    if env is None:
        env = globals()["env"]
    if gamma is None:
        gamma = globals()["gamma"]

    # Reach the raw environment through any number of wrappers (or none at all)
    model = getattr(env, "unwrapped", env)

    A = np.zeros(model.nA)  # One accumulated value per action
    for a in range(model.nA):
        for prob, next_state, reward, _done in model.P[state][a]:
            A[a] += prob * (reward + gamma * V[next_state])  # Expected immediate reward + discounted next value

    return A


In [29]:
"""
POLICY IMPROVEMENT. EVALUATES AND IMPROVES POLICY UNTIL OPTIMAL

Inputs
    - env : Taxi-v3 environment
    - policy_evaluation : Policy evaluation function (policy,env,gamma,theta)
    - gamma : Discount factor
Outputs
    (policy, V)
        - policy : Optimal policy, matrix of shape [S , A]. Each state (S) contains a valid probability distribution over actions (A)
        - V : Value function for the optimal policy.

"""

def _action_values(model, state, V, gamma):
    """One-step lookahead: expected value of every action taken in `state`, given the state values V."""
    values = np.zeros(model.nA)
    for action in range(model.nA):
        for prob, next_state, reward, _done in model.P[state][action]:
            values[action] += prob * (reward + gamma * V[next_state])
    return values


def policy_improvement(env, policy_evaluation, gamma, theta=0.00001):
    """
    Policy iteration: alternately evaluate the current policy and make it greedy
    with respect to that evaluation, until no state changes its action.

    Inputs
        - env : Environment exposing nS, nA and P [s] [a] = list of (prob, next_state, reward, done);
                a gym wrapper around such an environment also works
        - policy_evaluation : Policy evaluation function, called as (policy, env, gamma, theta)
        - gamma : Discount factor, used for both the evaluation and the greedy lookahead
        - theta : Convergence threshold handed to policy_evaluation
    Outputs
        (policy, V)
            - policy : Matrix of shape [S , A]; every row is one-hot on the greedy action of that state
            - V : Value function returned by the last evaluation, i.e. the one of the returned policy
    """
    # Reach the raw environment through any number of wrappers (or none at all)
    model = getattr(env, "unwrapped", env)
    n_states, n_actions = model.nS, model.nA

    one_hot = np.eye(n_actions)  # Row a is the deterministic distribution that always picks action a
    policy = np.ones([n_states, n_actions]) / n_actions  # Start from the uniform random policy

    while True:
        # `env` itself (not the unwrapped model) is passed on: the evaluation function decides how to unwrap it
        curr_pol_val = policy_evaluation(policy, env, gamma, theta)
        policy_stable = True  # Stays True only if no state changes its action in this pass
        for state in range(n_states):
            chosen_act = np.argmax(policy[state])  # Action preferred by the current policy
            # The lookahead uses the same env and gamma as the evaluation above
            best_act = np.argmax(_action_values(model, state, curr_pol_val, gamma))
            if chosen_act != best_act:
                policy_stable = False  # At least one state improved: evaluate again
            policy[state] = one_hot[best_act]
        if policy_stable:
            return policy, curr_pol_val


In [30]:
# Run policy iteration with a discount factor of 0.99.
# `gamma` is deliberately rebound at notebook level: prediction() reads the notebook-level value.
gamma = 0.99
pol_iter_policy = policy_improvement(  # Holds the (policy, V) tuple returned by policy_improvement
    env=env,
    policy_evaluation=policy_evaluation,
    gamma=gamma,
)


In [31]:
def view(policy, delay=.5):
    """
    Run one episode on the notebook-level `env` following a policy, then replay it frame by frame.

    Inputs
        - policy : Either the (policy, V) tuple returned by policy_improvement,
                   or the policy matrix [S , A] itself
        - delay : Seconds to wait between two frames of the replay
    Outputs
        - None. Every frame is printed together with its timestep, state, action and reward
    """
    # policy_improvement returns (policy, V): the first element of that tuple is a 2-D matrix,
    # whereas the first element of a bare policy matrix is a 1-D row.
    policy_matrix = policy[0] if np.ndim(policy[0]) == 2 else policy

    # Play the episode, recording what happened after every step
    frames = []
    state = env.reset()
    done = False
    while not done:
        action = np.argmax(policy_matrix[state])  # Most probable action of the policy in this state
        state, reward, done, _info = env.step(action)
        frames.append({'frame': env.render(mode='ansi'), 'state': state, 'action': action, 'reward': reward})

    # Replay: each frame replaces the previous one in the cell output
    for i, frame in enumerate(frames):
        clear_output(wait=True)
        print(frame['frame'])
        print(f"Timestep: {i + 1}")
        print(f"State: {frame['state']}")
        print(f"Action: {frame['action']}")
        print(f"Reward: {frame['reward']}")
        sleep(delay)


In [32]:
# Animate one episode played with the result of the policy iteration
view(policy=pol_iter_policy)


+---------+
|R: | : :G|
| : | : : |
| : : : : |
| | : | : |
|[35m[34;1m[43mY[0m[0m[0m| : |B: |
+---------+
  (Dropoff)

Timestep: 9
State: 410
Action: 5
Reward: 20
