In [1]:
import gym
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output
import seaborn as sns
import time
# Apply seaborn's default theme so the matplotlib figures drawn later in the
# notebook pick up its styling.
sns.set()

In [2]:
# Module-level hyperparameters for the tabular agent.
lr, df = 0.01, 0.99    # learning rate, discount factor
epsilon = 0.05         # exploration probability
n_episods = 100        # number of episodes (spelling kept: the name is module-level)

# Taxi-v3 environment and a zero-initialised value table with one row per
# observation and one column per action.
env = gym.make('Taxi-v3')
Q = np.zeros(shape=(env.observation_space.n, env.action_space.n))

In [4]:
class Taxi3:
    """
    Tabular SARSA agent for the Gym 'Taxi-v3' environment.

    df: discount factor, default: 0.99
    lr: learning rate, default: 0.3
    initial_e: exploration probability at the start of every episode, default: 1
    min_e: epsilon is decayed only while it is above this value, default: 0.01
    decay_e: factor epsilon is multiplied by on each time step, default: 0.9
    n_episodes: total number of training episodes; must be set before train()

    The maximum number of time steps per episode is not a parameter; it is
    read from env.spec.max_episode_steps.

    Attributes created by __init__:
    env: the Taxi-v3 environment owned by this agent
    Q: state-action value table, shape (n_observations, n_actions)
    ts: number of time steps of every finished training episode
    epsilon: current exploration probability
    """

    def __init__(self, df=0.99, lr=0.3, initial_e=1, min_e=0.01, decay_e=0.9, n_episodes=None):
        self.df = df
        self.lr = lr
        self.initial_e = initial_e
        self.epsilon = initial_e
        self.min_e = min_e
        self.decay_e = decay_e
        self.n_episodes = n_episodes
        self.ts = []
        self.env = gym.make('Taxi-v3')
        # Size the table from this agent's own environment so the class does
        # not depend on a global `env` defined in another cell.
        self.reset_Q()

    def train(self):
        """
        Run n_episodes episodes of on-policy (SARSA) learning, updating Q in place.

        Epsilon is reset to initial_e at the start of each episode and is
        multiplied by decay_e on every step while it is above min_e. The
        first action of an episode is sampled uniformly at random. The number
        of steps of every episode that ends with done=True is appended to
        self.ts (the history accumulates across calls).

        Raises:
            ValueError: if n_episodes was never set.
        """
        if self.n_episodes is None:
            raise ValueError('n_episodes must be set before calling train()')

        max_steps = self.env.spec.max_episode_steps
        for eps in range(self.n_episodes):
            self.epsilon = self.initial_e
            s = self.env.reset()
            a = self.env.action_space.sample()
            for t in range(max_steps):
                s_, r, done, _ = self.env.step(a)

                # Epsilon-greedy choice of the next action: greedy by default,
                # replaced by a random action with probability epsilon.
                a_ = np.argmax(self.Q[s_, :])
                if self.epsilon > self.min_e:
                    self.epsilon *= self.decay_e
                if np.random.random() < self.epsilon:
                    a_ = self.env.action_space.sample()

                # SARSA update: bootstrap from the action that will actually
                # be taken in the next state.
                self.Q[s, a] += self.lr * (r + self.df * self.Q[s_, a_] - self.Q[s, a])
                s, a = s_, a_

                if done:
                    # `t` is a zero-based index, so t + 1 steps were taken.
                    steps = t + 1
                    self.ts.append(steps)
                    print(f'Episode {eps} finished after {steps} time steps')
                    break

    def test(self):
        """
        Play one rendered episode, always taking the greedy action under Q.

        Pauses 0.8 s between frames and prints the number of steps taken.
        """
        s = self.env.reset()
        steps = 0  # stays 0 if the environment allows no steps at all
        for steps in range(1, self.env.spec.max_episode_steps + 1):
            clear_output(wait=True)
            # Greedy from the very first step; no random opening move.
            a = np.argmax(self.Q[s, :])
            s, _, done, _ = self.env.step(a)
            self.env.render()
            time.sleep(0.8)
            if done:
                break
        print(f'Finished in {steps} time steps')

    def reset_Q(self):
        """Replace Q with an all-zero table sized from this agent's environment."""
        self.Q = np.zeros((self.env.observation_space.n, self.env.action_space.n))

    def plot_timesteps(self):
        """
        Plot the steps taken by each recorded training episode together with
        the running mean over all episodes so far.
        """
        steps = np.asarray(self.ts, dtype=float)
        # Running mean: cumulative sum divided by the number of episodes seen.
        running_mean = np.cumsum(steps) / np.arange(1, steps.size + 1)

        fig, ax = plt.subplots(figsize=(16, 8))
        ax.plot(steps, color='#42f5ef', label='Steps per episode')
        ax.plot(running_mean, 'r--', linewidth=3, label='Running mean')
        ax.set_xlabel('Episode')
        ax.set_ylabel('Time steps')
        # Keep a non-degenerate x-range even when nothing has been recorded.
        ax.set_xlim(0, max(steps.size, 1))
        # An episode can never be longer than the environment's step cap.
        ax.set_ylim(0, self.env.spec.max_episode_steps)
        ax.legend()
        