In [9]:
import gym
import numpy as np
from pprint import pprint

class PolicyIterationAgent:
    """Tabular policy iteration for a discrete environment with a known model.

    The environment must expose ``observation_space.n``, ``action_space.n`` and
    a transition table ``P`` such that ``P[state][action]`` is an iterable of
    ``(probability, next_state, reward, done)`` tuples.

    Args:
        env: The environment (possibly wrapped) providing the model above.
        gamma: Discount factor applied to the value of successor states.
        theta: Convergence threshold for iterative policy evaluation.
        max_iterations: Upper bound on sweeps in one policy evaluation.
    """

    def __init__(self, env, gamma=0.9, theta=1e-6, max_iterations=1000):
        self.env = env
        self.observation_dim = env.observation_space.n
        self.actions_variants = np.arange(env.action_space.n)
        n_actions = len(self.actions_variants)
        # Start from the uniform random policy: one probability row per state.
        self.policy_probs = np.full((self.observation_dim, n_actions), 1 / n_actions)
        self.state_values = np.zeros(self.observation_dim)
        self.maxNumberOfIterations = max_iterations
        self.theta = theta
        self.gamma = gamma

    def _transitions(self):
        """Return the transition table, looking through wrappers if present."""
        # Wrappers do not always forward arbitrary attributes such as ``P``,
        # so prefer the innermost environment when it is exposed.
        return getattr(self.env, 'unwrapped', self.env).P

    def _action_value(self, transitions, state, action):
        """Return the expected one-step return of taking ``action`` in ``state``."""
        # A transition flagged ``done`` ends the episode, so nothing after it
        # may be counted: the successor's stored value must not be added.
        return sum(
            prob * (reward + (0.0 if done else self.gamma * self.state_values[next_state]))
            for prob, next_state, reward, done in transitions[state][action]
        )

    def print_policy(self):
        """Pretty-print the current policy probability table."""
        print('Текущая политика:')
        pprint(self.policy_probs)

    def policy_evaluation(self):
        """Update ``state_values`` in place to approximate the current policy's values.

        Sweeps over all states until the largest change in a sweep drops below
        ``theta`` or ``maxNumberOfIterations`` sweeps have been performed.
        """
        transitions = self._transitions()
        for _ in range(self.maxNumberOfIterations):
            delta = 0
            for state in range(self.observation_dim):
                v = 0
                for action, action_prob in enumerate(self.policy_probs[state]):
                    if action_prob == 0:
                        # Actions the policy never takes contribute nothing.
                        continue
                    v += action_prob * self._action_value(transitions, state, action)
                delta = max(delta, abs(self.state_values[state] - v))
                self.state_values[state] = v
            if delta < self.theta:
                break

    def policy_improvement(self):
        """Make the policy greedy with respect to ``state_values``.

        Returns:
            True if no row of ``policy_probs`` changed, False otherwise.
        """
        transitions = self._transitions()
        n_actions = len(self.actions_variants)
        policy_stable = True
        for state in range(self.observation_dim):
            action_values = np.array(
                [self._action_value(transitions, state, action) for action in range(n_actions)]
            )
            best_action = np.argmax(action_values)
            greedy_row = np.zeros(n_actions)
            greedy_row[best_action] = 1.0
            # Compare whole rows rather than their argmax: the initial uniform
            # row has argmax 0, which would otherwise be mistaken for an
            # already-greedy "always action 0" policy that was never evaluated.
            if not np.array_equal(self.policy_probs[state], greedy_row):
                policy_stable = False
            self.policy_probs[state] = greedy_row
        return policy_stable

    def policy_iteration(self):
        """Alternate evaluation and improvement until the policy stops changing."""
        iteration = 0
        while True:
            self.policy_evaluation()
            if self.policy_improvement():
                print(f'Политика стабилизировалась после {iteration} итераций.')
                break
            iteration += 1

def play_agent(agent):
    """Run one episode following the agent's greedy policy and return the total reward.

    Works with both environment API shapes: ``reset()`` returning either an
    observation or an ``(observation, info)`` tuple, and ``step()`` returning
    either ``(obs, reward, done, info)`` or
    ``(obs, reward, terminated, truncated, info)``.

    Args:
        agent: Object with an ``env`` attribute and a ``policy_probs`` table
            indexable by the (integer) observation.

    Returns:
        The sum of rewards collected until the episode ends.
    """
    reset_result = agent.env.reset()
    # The newer API returns (observation, info); the legacy one returns the
    # observation alone. Observations here are used as table indices, so a
    # tuple can only be the newer form.
    state = reset_result[0] if isinstance(reset_result, tuple) else reset_result
    done = False
    total_reward = 0
    while not done:
        action = np.argmax(agent.policy_probs[state])
        step_result = agent.env.step(action)
        if len(step_result) == 5:
            state, reward, terminated, truncated, _ = step_result
            # Either signal ends the episode.
            done = terminated or truncated
        else:
            state, reward, done, _ = step_result
        total_reward += reward
    return total_reward

def main():
    """Solve Taxi-v3 with policy iteration, show the policy and play one episode."""
    taxi_env = gym.make('Taxi-v3')
    solver = PolicyIterationAgent(taxi_env)

    # Compute the policy first, then report it and evaluate it on one episode.
    solver.policy_iteration()
    solver.print_policy()

    episode_reward = play_agent(solver)
    print('Награда агента:', episode_reward)

# Run the demo only when this file is executed directly, not when imported.
if __name__ == '__main__':
    main()

Политика стабилизировалась после 12 итераций.
Текущая политика:
array([[0., 0., 0., 0., 1., 0.],
       [0., 0., 0., 0., 1., 0.],
       [0., 0., 0., 0., 1., 0.],
       ...,
       [0., 1., 0., 0., 0., 0.],
       [0., 1., 0., 0., 0., 0.],
       [0., 0., 0., 1., 0., 0.]])
Награда агента: 7


  if not isinstance(terminated, (bool, np.bool8)):
