In [None]:
import gym
import numpy as np
import pandas as pd 
from random import random
import time
import csv


def open_dataset(file_name):
    """Load a comma-separated, UTF-8 encoded file into a pandas DataFrame.

    Args:
        file_name: Path (or buffer) handed straight to ``pd.read_csv``.

    Returns:
        The parsed DataFrame; the first row of the file is used as the header.
    """
    csv_options = {"sep": ",", "encoding": "utf-8", "header": 0}
    return pd.read_csv(file_name, **csv_options)


# Build the Taxi-v3 environment, put it in an initial state and draw it once.
env = gym.make("Taxi-v3")
env.reset()
env.render()

# Sizes of the discrete action and observation spaces; later cells use them
# to dimension the Q-table.
actions, states = env.action_space.n, env.observation_space.n

print(f"Total possible actions: {actions}")
print(f"Total states: {states}")


In [None]:
# Q-table with one row per state and one column per action, every entry
# initialised to 0.
q_table = np.zeros(shape=(states, actions))
print(q_table)

In [None]:
def play_to_taxi(
    environment,
    q_table,
    epsilon=1.0,
    epsilon_min=0.01,
    epsilon_max=1.0,
    learning_rate=0.02,
    reward_discount_rate=0.618,
    decay_rate=0.01,
    episodes=100000,
    max_steps=99,
    training=True,
    reset=False,
    show_print=True,
):
    """Play episodes on a discrete environment with a tabular Q-table.

    In training mode actions are chosen epsilon-greedily and the Q-table is
    updated after every step; otherwise the greedy action is always taken and
    the table is left untouched.

    Args:
        environment: Object exposing ``reset()``, ``step(action)``,
            ``action_space`` (with ``n`` and ``sample()``) and
            ``observation_space`` (with ``n``). Both the 4-value and the
            5-value ``step`` return conventions are accepted, as well as a
            ``reset`` that returns either the state or a ``(state, info)``
            tuple.
        q_table: 2-D array indexed as ``q_table[state, action]``. When
            ``training`` is true and ``reset`` is false it is updated in place.
        epsilon: Exploration probability used during the first episode.
        epsilon_min: Lower bound of the decayed exploration probability.
        epsilon_max: Upper bound of the decayed exploration probability.
        learning_rate: Step size of the Q-value update.
        reward_discount_rate: Discount applied to the best next-state value.
        decay_rate: Exponential decay rate of epsilon per episode.
        episodes: Number of episodes to play.
        max_steps: Maximum number of steps per episode.
        training: Whether to explore and update the Q-table.
        reset: When true, start from a fresh all-zero table sized from the
            environment's observation and action spaces instead of ``q_table``.
        show_print: Whether to print the banner and the summary statistics.

    Returns:
        dict with keys ``'q_table'`` (the table that was used/updated),
        ``'avg_rewards'`` (mean cumulative reward over all played episodes)
        and ``'avg_steps'`` (mean number of steps over all played episodes),
        both rounded to 2 decimals. The averages are ``0.0`` when no episode
        was played.
    """
    start_time = time.process_time()
    won_episode = 0
    total_steps = []
    total_rewards = []

    if reset:
        # Size the fresh table from the environment itself so the function
        # does not depend on notebook-level globals.
        q_table = np.zeros((environment.observation_space.n, environment.action_space.n))

    if show_print:
        print('')
        if training:
            print('---------------------- TRAINING ----------------------')
        else:
            print('---------------------- TESTING ----------------------')

    for episode in range(episodes):
        # The episode must start from the state the environment actually
        # reset to; newer Gym versions return a (state, info) tuple.
        reset_result = environment.reset()
        state = reset_result[0] if isinstance(reset_result, tuple) else reset_result
        total_episode_rewards = 0
        total_episode_steps = 0

        for step in range(max_steps):
            # Epsilon-greedy action selection (exploration only while training).
            if training and random() < epsilon:
                action = environment.action_space.sample()
            else:
                action = np.argmax(q_table[state, :])

            step_result = environment.step(action)
            if len(step_result) == 5:
                # 5-value convention: (state, reward, terminated, truncated, info)
                next_state, reward, terminated, truncated, _ = step_result
                done = terminated or truncated
            else:
                # 4-value convention: (state, reward, done, info)
                next_state, reward, done, _ = step_result
                terminated = done

            if training:
                # Q(s, a) <- Q(s, a) + lr * (r + gamma * max_a' Q(s', a') - Q(s, a))
                best_next_value = np.max(q_table[next_state, :])
                td_target = reward + reward_discount_rate * best_next_value
                q_table[state, action] += learning_rate * (td_target - q_table[state, action])

            state = next_state
            total_episode_steps = step + 1
            total_episode_rewards += reward

            if done:
                if terminated:
                    won_episode += 1
                break

        # Record every episode, including those that hit max_steps, so the
        # reward average is not biased towards finished episodes and is
        # computed over the same population as the step average.
        total_rewards.append(total_episode_rewards)
        total_steps.append(total_episode_steps)

        # Epsilon decay to maintain the exploration/exploitation trade-off.
        epsilon = epsilon_min + (epsilon_max - epsilon_min) * np.exp(-decay_rate * episode)

    # Guard the statistics against an empty run (episodes <= 0).
    avg_rewards = round(sum(total_rewards) / len(total_rewards), 2) if total_rewards else 0.0
    avg_steps = round(sum(total_steps) / len(total_steps), 2) if total_steps else 0.0
    won_rate = round(won_episode / episodes * 100, 2) if episodes > 0 else 0.0
    elapsed_time = round(time.process_time() - start_time, 2)

    if show_print:
        print(f'Total episodes : {episodes}')
        print(f'Average rewards : {avg_rewards}')
        print(f'Average steps : {avg_steps}')
        print(f'Won episode : {won_episode} ({won_rate}%)')

        if training:
            print(f'Training time : {elapsed_time} seconds')

    return {
        'q_table': q_table,
        'avg_rewards': avg_rewards,
        'avg_steps': avg_steps,
    }


In [None]:
# Fixed settings shared by every run of the grid search.
epsilon_rate = 1.0              # Exploration vs exploitation
learning_rate = 0.7             # Learning rate (rebound by the search loop below)
epsilon_max = 1.0               # Exploration probability at the start
epsilon_min = 0.01              # Minimum exploration probability
reward_discount_rate = 0.618    # Discounting rate for rewards (rebound by the search loop below)
decay_rate = 0.01               # Epsilon decay rate (rebound by the search loop below)

# Search grid: 10000, 20000, ..., 50000 episodes crossed with the other lists.
nb_episodes_list = list(range(10000, 50001, 10000))
learning_rate_list = [0.5, 0.6, 0.7, 0.8, 0.9]
reward_discount_rate_list = [0.9,  0.786, 0.618, 0.5, 0.382, 0.2, 0.1]
decay_rate_list = [0.01, 0.03, 0.05, 0.07, 0.1]

# Best configuration seen so far, ranked by the average test reward.
best_avg_rewards = float('-inf')
best_nb_episodes = 0
best_learning_rate = 0
best_reward_discount_rate = 0
best_decay_rate = 0

# Column names of the results file; the cell that reloads 'q-learning.csv'
# reads the first row as header and sorts on 'avg_rewards'.
csv_header = ['nb_episodes', 'epsilon', 'epsilon_min', 'epsilon_max', 'learning_rate',
              'reward_discount_rate', 'decay_rate', 'avg_rewards', 'avg_steps']

with open('q-learning.csv', 'a', encoding='UTF8', newline='') as f:
    writer = csv.writer(f)

    # In append mode the position starts at the end of the file, so 0 means
    # the file is new or empty and still needs its header row.
    # NOTE(review): a non-empty file produced without a header is not repaired
    # here; delete it before re-running.
    if f.tell() == 0:
        writer.writerow(csv_header)

    for nb_episodes in nb_episodes_list:
        for learning_rate in learning_rate_list:
            for reward_discount_rate in reward_discount_rate_list:
                for decay_rate in decay_rate_list:
                    # Train from a fresh all-zero Q-table (reset=True).
                    training_result = play_to_taxi(environment=env,
                            q_table=q_table,
                            epsilon=epsilon_rate,
                            epsilon_min=epsilon_min,
                            epsilon_max=epsilon_max,
                            learning_rate=learning_rate,
                            episodes=nb_episodes,
                            reward_discount_rate=reward_discount_rate,
                            decay_rate=decay_rate,
                            training=True,
                            show_print=False,
                            reset=True
                            )

                    # Evaluate the trained table greedily (training=False).
                    test_result = play_to_taxi(environment=env,
                            q_table=training_result['q_table'],
                            epsilon=epsilon_rate,
                            epsilon_min=epsilon_min,
                            epsilon_max=epsilon_max,
                            learning_rate=learning_rate,
                            episodes=nb_episodes,
                            reward_discount_rate=reward_discount_rate,
                            decay_rate=decay_rate,
                            training=False,
                            show_print=False,
                            reset=False
                            )

                    writer.writerow([nb_episodes, epsilon_rate, epsilon_min, epsilon_max, learning_rate, reward_discount_rate, decay_rate, test_result['avg_rewards'], test_result['avg_steps']])
                    # Flush after each configuration so partial results
                    # survive an interrupted search.
                    f.flush()

                    if test_result['avg_rewards'] > best_avg_rewards:
                        best_avg_rewards = test_result['avg_rewards']
                        best_nb_episodes = nb_episodes
                        best_learning_rate = learning_rate
                        best_reward_discount_rate = reward_discount_rate
                        best_decay_rate = decay_rate
                

In [None]:
# Load the recorded runs, rank them by average reward (highest first) and
# show the five best rows.
qlearning_data = (
    open_dataset('q-learning.csv')
    .sort_values(by=['avg_rewards'], ascending=[False])
    .reset_index(drop=True)
)
qlearning_data.head(5)


In [None]:
# Settings for the final training / testing run.
nb_episodes = 10000             # Number of games to be played
epsilon_rate = 1.0              # Exploration vs exploitation
epsilon_min = 0.01              # Minimum exploration probability
epsilon_max = 1.0               # Exploration probability at the start
learning_rate = 0.8             # Learning rate
reward_discount_rate = 0.786    # Discounting rate for rewards
decay_rate = 0.07               # Epsilon decay rate passed to play_to_taxi

# Keyword arguments that are identical for the training and the testing call.
shared_settings = dict(
    environment=env,
    epsilon=epsilon_rate,
    epsilon_min=epsilon_min,
    epsilon_max=epsilon_max,
    learning_rate=learning_rate,
    episodes=nb_episodes,
    reward_discount_rate=reward_discount_rate,
    decay_rate=decay_rate,
)

# Train starting from a fresh Q-table (reset=True).
result_training = play_to_taxi(
    q_table=q_table,
    training=True,
    reset=True,
    **shared_settings,
)

# Replay with the trained table, without exploration or updates.
result_testing = play_to_taxi(
    q_table=result_training['q_table'],
    training=False,
    **shared_settings,
)

