In [51]:
import gym
import numpy as np
import time


# Environment the agent is trained and evaluated on.
env = gym.make("Taxi-v3")

# Dimensions of the tabular policy (rows = states, columns = actions).
# NOTE(review): assumed to match Taxi-v3's discrete observation/action
# spaces -- confirm against env.observation_space.n / env.action_space.n.
state_n, action_n = 500, 6


class CrossEntropyAgent():
    """Tabular stochastic policy trained with the cross-entropy method.

    ``model`` is a ``(state_n, action_n)`` array; row ``s`` holds the
    probabilities with which each action is sampled in state ``s``.
    """

    def __init__(self, state_n, action_n):
        """Start from a uniform policy over ``action_n`` actions in every state."""
        self.state_n = state_n
        self.action_n = action_n
        uniform_probability = 1 / self.action_n
        self.model = np.ones((self.state_n, self.action_n)) * uniform_probability

    def get_action(self, state, info):
        """Sample an action index for ``state`` from the current policy row.

        ``info`` is accepted for interface compatibility but is not used.
        Returns a plain Python ``int``.
        """
        probabilities = self.model[state]
        # Passing an int to choice() samples from arange(action_n).
        sampled = np.random.choice(self.action_n, p=probabilities)
        return int(sampled)

    def fit(self, elite_trajectories):
        """Replace the policy with action frequencies seen in elite trajectories.

        Each trajectory is a dict with aligned ``'states'`` and ``'actions'``
        sequences. States that never occur in the elite set keep their
        previous action distribution. Always returns ``None``.
        """
        new_model = np.zeros((self.state_n, self.action_n))

        # Count how often each (state, action) pair occurs among the elites.
        for episode in elite_trajectories:
            visited_pairs = zip(episode['states'], episode['actions'])
            for state_idx, action_idx in visited_pairs:
                new_model[state_idx][action_idx] += 1

        row_totals = new_model.sum(axis=1)
        seen = row_totals > 0

        # Visited states: turn counts into probabilities.
        new_model[seen] /= row_totals[seen, None]
        # Unvisited states: carry the old distribution over unchanged.
        new_model[~seen] = self.model[~seen]

        self.model = new_model
        return None

    
def get_trajectory(env, agent, max_len=1000, visualize=False):
    """Run one episode of ``agent`` in ``env`` and record what happened.

    Args:
        env: environment whose ``reset()`` returns ``(state, info)`` and whose
            ``step(action)`` returns
            ``(state, reward, terminated, truncated, info)``.
        agent: object exposing ``get_action(state, info)``.
        max_len: maximum number of steps to take.
        visualize: when true, sleep 0.5 s and call ``env.render()`` after
            every step.

    Returns:
        dict with keys ``'states'``, ``'actions'`` and ``'rewards'``; the three
        lists have equal length. ``'states'`` holds the state each action was
        chosen in, so the state reached by the last step is not recorded.
    """
    visited_states, chosen_actions, collected_rewards = [], [], []

    current_state, info = env.reset()

    for _ in range(max_len):
        chosen = agent.get_action(current_state, info)
        visited_states.append(current_state)
        chosen_actions.append(chosen)

        current_state, reward, terminated, truncated, info = env.step(chosen)
        collected_rewards.append(reward)

        if visualize:
            time.sleep(0.5)
            env.render()

        # Stop on either a natural end of the episode or a time-limit cut-off.
        episode_over = terminated or truncated
        if episode_over:
            break

    return {
        'states': visited_states,
        'actions': chosen_actions,
        'rewards': collected_rewards,
    }


# Cross-entropy method training run.
agent = CrossEntropyAgent(state_n, action_n)
q_param = 0.75       # quantile of total reward a trajectory must exceed to be "elite"
iteration_n = 20     # number of evaluate/improve rounds
trajectory_n = 1000  # trajectories sampled per round

for iteration in range(iteration_n):

    # Policy evaluation: sample trajectories with the current policy and score them.
    trajectories = [get_trajectory(env, agent) for _ in range(trajectory_n)]
    total_rewards = [np.sum(trajectory['rewards']) for trajectory in trajectories]
    print('iteration:', iteration, 'mean total reward:', np.mean(total_rewards))

    # Policy improvement: keep only trajectories strictly above the reward
    # quantile and refit the policy on them.
    quantile = np.quantile(total_rewards, q_param)
    elite_trajectories = []
    for trajectory, total_reward in zip(trajectories, total_rewards):
        if total_reward > quantile:
            elite_trajectories.append(trajectory)

    agent.fit(elite_trajectories)

# Single rollout with the final policy, followed by a dump of the learned table.
trajectory = get_trajectory(env, agent, max_len=100, visualize=False)
print('total reward:', sum(trajectory['rewards']))
print('model:')
print(agent.model)

iteration: 0 mean total reward: -765.514
iteration: 1 mean total reward: -645.681
iteration: 2 mean total reward: -469.385
iteration: 3 mean total reward: -232.975
iteration: 4 mean total reward: -95.853
iteration: 5 mean total reward: -39.17
iteration: 6 mean total reward: -13.408
iteration: 7 mean total reward: 0.209
iteration: 8 mean total reward: 2.984
iteration: 9 mean total reward: 3.949
iteration: 10 mean total reward: 4.478
iteration: 11 mean total reward: 5.324
iteration: 12 mean total reward: 4.315
iteration: 13 mean total reward: 4.986
iteration: 14 mean total reward: 5.029
iteration: 15 mean total reward: 5.888
iteration: 16 mean total reward: 5.033
iteration: 17 mean total reward: 5.514
iteration: 18 mean total reward: 4.632
iteration: 19 mean total reward: 4.809
total reward: 6
model:
[[0.16666667 0.16666667 0.16666667 0.16666667 0.16666667 0.16666667]
 [0.         0.         0.         0.         1.         0.        ]
 [0.         0.         0.         0.         1.    

In [61]:
def eval(env, agent, iteration_n=10):
    """Estimate the agent's performance over several episodes.

    Runs ``iteration_n`` episodes via ``get_trajectory`` (with its default
    settings) and returns ``(mean, std)`` of the per-episode total reward.

    Note: this name shadows Python's builtin ``eval`` in the notebook namespace.
    """
    episode_returns = [
        np.sum(get_trajectory(env, agent)['rewards'])
        for _ in range(iteration_n)
    ]
    return np.mean(episode_returns), np.std(episode_returns)

In [62]:
# Score the agent over the default 10 episodes; the cell displays (mean, std) of total reward.
eval(env, agent)

(5.6, 5.589275444992848)