In [None]:
#!pip install gymnasium

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
import time
import random
from IPython.display import clear_output
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# Gymnasium - library exploration

### Taxi-V3
###Действия:

- 0: двигаться вниз
- 1: двигаться вверх
- 2: двигаться вправо
- 3: двигаться влево
- 4: подобрать пассажира
- 5: высадить пассажира
---

###Состояния:
- 500 дискретных состояний: 25 позиций такси, 5 возможных локаций пассажира, включая пассажира в такси, и 4 локации назначения
- за эпизод может быть достигнуто 400 состояний
---

###Локации пассажира:
- 0: RED (R)
- 1: GREEN (G)
- 2: YELLOW (Y)
- 3: BLUE (B)
- 4: в такси
---

###Точки назначения:
- 0: RED (R)
- 1: GREEN (G)
- 2: YELLOW (Y)
- 3: BLUE (B)

---
###Награды:

- -1 за каждое действие не дающее другую награду
- +20 за правильную высадку пассажира
- -10 за неправильную посадку/высадку пассажира
---



In [2]:
env = gym.make("Taxi-v3", render_mode="ansi")
observation, info = env.reset(seed=SEED)
# - такси (желтое) - стартует в случайном месте
# - пассажир - стартует в одной из 4 локаций высадки (R, G, B, Y)
# - пассажир - буква выделена жирным, синим цветом
# - когда пассажир подобран такси становится зеленым
# - нужно доставить пассажира в указанную локацию, выделена фиолетовым
print(env.render())

+---------+
|R: | : :[34;1mG[0m|
| : | : : |
| : : : : |
| | : | :[43m [0m|
|[35mY[0m| : |B: |
+---------+




In [3]:
# номер текущего состояния
observation

386

In [4]:
# Размер пространства действий
env.action_space

Discrete(6)

In [5]:
# Количество состояний среды
env.observation_space

Discrete(500)

In [127]:
# P - многомерный список, хранящий информацию обо всех состояниях и обо всех возмножных действиях
# Нпример, возможные действия в состоянии 386:
# Для каждого действия определены:
#  -верятность перехода в указанное следующее состояние при выполнении действия
#  -следующее состояние
#  -награда за действие
#  -находимся ли в терминальном состоянии
env.P[386]

{0: [(1.0, 486, -1, False)],
 1: [(1.0, 286, -1, False)],
 2: [(1.0, 386, -1, False)],
 3: [(1.0, 366, -1, False)],
 4: [(1.0, 386, -10, False)],
 5: [(1.0, 386, -10, False)]}

In [6]:
# Двигаем такси на запад
observation, reward, terminated, truncated, info = env.step(3)
print(env.render())
print(f"Перешли в состояние: {observation}")
print(f"Получили награду за действие: {reward}")

+---------+
|R: | : :[34;1mG[0m|
| : | : : |
| : : : : |
| | : |[43m [0m: |
|[35mY[0m| : |B: |
+---------+
  (West)

Перешли в состояние: 366
Получили награду за действие: -1


In [113]:
# action_mask - показывает список возможных действий
# из состояния 366 - это вверх, вниз, вправо
info

{'prob': 1.0, 'action_mask': array([1, 1, 1, 0, 0, 0], dtype=int8)}

In [7]:
# Визуализация одного эпизода
actions = [1, 1, 1, 4, 3, 3, 0, 0, 3, 3, 0, 0, 5]
reward_sum = 0
env.reset(seed=SEED)
print(env.render())

for action_n in actions:
  time.sleep(0.9)
  clear_output(True)
  observation, reward, terminated, truncated, info = env.step(action_n)
  reward_sum +=reward
  print(env.render())
  print(f"Current state: {observation}")
  print(f"Truncated: {truncated}")
  print(f"Terminated: {terminated}")

print(f"Total rewards: {reward_sum}")

+---------+
|R: | : :G|
| : | : : |
| : : : : |
| | : | : |
|[35m[34;1m[43mY[0m[0m[0m| : |B: |
+---------+
  (Dropoff)

Current state: 410
Truncated: False
Terminated: True
Total rewards: 8


In [8]:
# При переходе из состояния в 418 в состояние 410 мы перейдем в терминальное состояние и получим награду за высадку пассажира
env.P[418]

{0: [(1.0, 418, -1, False)],
 1: [(1.0, 318, -1, False)],
 2: [(1.0, 418, -1, False)],
 3: [(1.0, 418, -1, False)],
 4: [(1.0, 418, -10, False)],
 5: [(1.0, 410, 20, True)]}

# Value Iteration

Изучим и реализуем алгоритм value iteration:

In [9]:
def value_iteration(environment, gamma=1.0, theta=0.001, max_iter=100):
  # V - хранит ценности состояний (state-value func)
  V = np.zeros(environment.observation_space.n)

  delta = 0.

  for k in range(max_iter):
    for state_n in range(environment.observation_space.n):

      # Нам нужно посчитать награду за каждое действие в текущем состоянии
      action_values = np.zeros(environment.action_space.n)
      for action_n in range(environment.action_space.n):
        # reward - немедленная награда за переход в новое состояние при действии action_n
        # probability - вероятность перейти в следующее состояние при выполнении действия action_n
        for probability, next_state, reward, terminated in environment.P[state_n][action_n]:
        # Выполняем подсчет награды за действие action_n при переходе в следующее состояние next_state
          action_values[action_n] += probability * (reward + gamma* V[next_state])

      # Q_value - награда за действие a в состоянии s при следовании оптимальной политике pi
      Q_value = np.max(action_values)

      delta = max(delta, np.abs(V[state_n] - Q_value))

      V[state_n] = Q_value

    if delta < theta:
        print(f"Вычисление ценности для каждого состояния завершено на итерации: {k} ")
        break

  # policy - хранит лучшее действие для каждого состояния
  policy = np.zeros([environment.observation_space.n, environment.action_space.n])
  for state_n in range(environment.observation_space.n ):

      # Также считаем ценность действий
      action_values = np.zeros(environment.action_space.n)
      for action_n in range(environment.action_space.n):
        for probability, next_state, reward, terminated in environment.P[state_n][action_n]:
          action_values[action_n] += probability * (reward + gamma* V[next_state])

      # Сохраняем индекс лучшего действия для текущего состояния
      best_policy = np.argmax(action_values)
      policy[state_n, best_policy] = 1

  return policy, V

Обучим алгоритм Value Iteration:

In [10]:
environment = gym.make("Taxi-v3")
policy, V = value_iteration(environment, max_iter=1000)

Визуализируем случайную игру после обучения алгоритма Value Iteration:

In [11]:
def play_with_policy(env, policy):
  current_state, info = env.reset()
  print(env.render())

  terminated = False

  reward_sum = 0
  while not terminated:
    next_action = np.argmax(policy[current_state])
    time.sleep(0.9)
    clear_output(True)
    current_state, reward, terminated, truncated, info = env.step(int(next_action))
    reward_sum +=reward
    print(env.render())
    print(f"Current state: {current_state}")
    print(f"Truncated: {truncated}")
    print(f"Terminated: {terminated}")

  print(f"Total rewards: {reward_sum}")

  # P.S. Если сделать while True, то на финальной точке алгоритм будет подбирать и высаживать пассажира бесконечно:)

In [13]:
play_with_policy(env, policy)

+---------+
|R: | : :G|
| : | : : |
| : : : : |
| | : | : |
|Y| : |[35m[34;1m[43mB[0m[0m[0m: |
+---------+
  (Dropoff)

Current state: 475
Truncated: False
Terminated: True
Total rewards: 11


Посчитаем среднюю награду и число побед за 10000 случайных эпизодов:

In [14]:
def evaluate_algorithm(env, policy, episodes=10_000):
  rewards = []
  wins = 0
  for episode in range(episodes):
    current_state, info = env.reset()
    terminated = False
    reward_sum = 0
    while not terminated:
      next_action = np.argmax(policy[current_state])
      current_state, reward, terminated, truncated, info = env.step(int(next_action))
      reward_sum +=reward

      if terminated and reward == 20:
        wins+=1

    rewards.append(reward_sum)

  print(f"Количество побед: {wins}")
  print(f"Средняя награда: {np.mean(rewards)}")

In [15]:
evaluate_algorithm(env, policy)

Количество побед: 10000
Средняя награда: 7.9066


# Policy Iteration

Реализуем алгоритм Policy Iteration

In [16]:
def policy_iteration(environment, gamma=1.0, theta=0.001, max_iter=1000):

  n_states = environment.observation_space.n
  n_actions = environment.action_space.n

  policy = np.ones([n_states, n_actions]) / n_actions

  V = np.zeros(environment.observation_space.n)

  # Policy Evaluation
  for k in range(max_iter):
    delta = 0
    for state_n in range((n_states)):
      v = 0

      # Подсчет state-value function (V) для политики policy
      for action_n in range((n_actions)):

        # policy_s_a - вероятность действия action_n в состоянии state_n под политикой policy
        policy_s_a = policy[state_n][action_n]
        for probability, next_state, reward, terminated in environment.P[state_n][action_n]:
          v  += policy_s_a*probability*(reward + gamma*V[next_state])

      delta = max(delta, np.abs(V[state_n] - v))
      V[state_n] = v

    if delta < theta:
      print(f"Policy evaluation завершено на итерации: {k + 1} ")
      break

    # Policy improvement
    policy_stable = True
    for state_n in range((n_states)):

      current_best_action = np.argmax(policy[state_n])

      # Считаем ценность состояния
      action_values = np.zeros(n_actions)
      for action_n in range(n_actions):
        for probability, next_state, reward, terminated in environment.P[state_n][action_n]:
          action_values[action_n] += probability * (reward + gamma* V[next_state])

      best_action = np.argmax(action_values)

      if current_best_action != best_action:
        policy_stable = False
        policy[state_n] = np.eye(n_actions)[best_action]

    if policy_stable:
      print(f"Лучшая политика была найдена на итерации {k + 1}")
      return policy, V

  print(f"Выполнены все итерации, нет гарантии, что политика стабильна")
  return policy, V

Обучим алгоритм Policy Iteration:

In [17]:
environment = gym.make("Taxi-v3")
policy, V = policy_iteration(environment)

Лучшая политика была найдена на итерации 18


Визуализируем случайную игру после обучения алгоритма Policy Iteration:

In [18]:
play_with_policy(env, policy)

+---------+
|[35m[34;1m[43mR[0m[0m[0m: | : :G|
| : | : : |
| : : : : |
| | : | : |
|Y| : |B: |
+---------+
  (Dropoff)

Current state: 0
Truncated: False
Terminated: True
Total rewards: 3


Посчитаем среднюю награду и число побед за 10000 случайных эпизодов:

In [19]:
evaluate_algorithm(env, policy)

Количество побед: 10000
Средняя награда: 7.5917
