# Реализация DQN

In [1]:
import numpy as np
import matplotlib.pyplot as plt

%matplotlib inline
import gym
import tensorflow as tf

# Создание игровой среды

Создадим симулятор Frozen Lake.

In [2]:
env = gym.make('FrozenLake-v1', is_slippery=False)

NUM_STATES = env.observation_space.n
NUM_ACTIONS = env.action_space.n

print('States: {}'.format(NUM_STATES))
print('Actions: {}'.format(NUM_ACTIONS))

States: 16
Actions: 4


# Параметры обучения

* lr - скорость обучения (в теории это был параметр alpha)
* gamma - параметр дисконтирования
* NUM_EPISODES - сколько всего эпизодов игры будем осуществлять
* MAX_STEPS - максимальное количество шагов в рамках одного эпизода. Эпизод может закончится раньше. Это ограничение
нужно, если агент зашел в какой-то тупик и там застрял. Или для бесконечных игр/симуляций.

Кроме того, будем смотреть на награду, усредненную в некотором временнОм окне (окно по эпизодам).
* REWARD_AWERAGE_WINDOW - размер этого окна.

In [3]:
lr = .1 # learning rate
gamma = .99 # параметр дисконтирования

NUM_EPISODES = 1000 # число эпизодов для обучения
MAX_STEPS = 100 # максимальное число шагов в эпизоде

REWARD_AVERAGE_WINDOW = 20 # окно для усреднения наград по эпизодам

# Создание модели
Создадим простую модель для аппроксимации оптимальной Q-функции. Формально это будет нейронная сеть на Tensorflow.
Однако по факту, мы будем использовать лишь один Embedding слой. Это обучаемый слой, который по некоторому целочисленному
входу выдает некоторый вектор. Именно это нам и нужно для отображения состояния s в вектор Q(s, :) для различных действий.
Интересным фактом является то, что так как Embedding слой по сути матрица а прямое распостранение для него -- это выбор
s-той строчки в этой матрице, такая модель эквивалентна обычной Q-таблице. Такое сработает лишь потому, что у нас очень простая
задача (Frozen Lake). Для более сложных задач надо использовать более сложные нейросети.
Кроме самой модели создадим функцию для инференса evalQ, которая будет вычислять вектор Q(s, :) по входному состоянию s

In [4]:
model = tf.keras.Sequential([
    tf.keras.layers.Embedding(NUM_STATES, NUM_ACTIONS, tf.initializers.RandomUniform(0, 1)),
    tf.keras.layers.Dense(4, activation='sigmoid')
])

def evalQ(s):
    inp = np.array([[s]], dtype=np.int32)
    return model(inp).numpy()[0][0]

Metal device set to: Apple M2


# Подготовка к обучению
В качестве функции потерь будем использовать квадрат нормы разницы между целевым и предсказанным вектором Q(s, :), как это обычно
делается в задачах регрессии (ведь нам нужно притянуть друг к другу эти два вектора). Зададим это функцией loss()
В качестве оптимизатора для нейросети будем использовать SGD.

In [5]:
def loss(q1, q2):
    return tf.reduce_sum(tf.square(q1 - q2))

optimizer = tf.keras.optimizers.legacy.SGD(learning_rate=lr)
model.compile(loss=loss, optimizer=optimizer)

# Обучение DQN
Обучение DQN во многом похоже на табличный Q-Learning. Рассмотрим подробнее лишь те части, которвые отличаются.
Чтобы получить значения Q-функции для текущего состояния s и всех действий, поспользуемся фунукцией evalQ:
    Q_s = evalQ(s)
    
 Во время обучения будем использовать eps-greedy подход для выбора действия (в зависимости от параметра eps выбирать
 случайное действие по текущей политике).
 Самое интересное - как сделать обновление Q-функции. Для начала, надо разобраться, как получить целевой вектор Q-target.
 Вспомним, что в Q-Learning мы должны обновить толькл значение Q(s, a) (через уравление Беллмана) Но так как нейросеть
 предсказывает вектор для всех возможных действий (Q(s, :)), сделать нужно следующим образом.
 
 Пусть целевой вектор для Q(s, :) будет равен исходным (предсказанным нейросетью) значениям для всех действий, кроме действия a.
 То есть скопируем в Q_target текущий предсказанный Q_s и заменим в нем лишь Q_target[a] на то, что дает нам Беллман. И
 Теперь наша задача "обучить" нейросеть на этот целевой вектор, то есть заставить её изменить свои веса так, чтобы толок значение
 для действия a изменилось, а остальные значения Q(s, :) по возможности не изменялись. Делаем это через минимизацию ошибки между
 этими векторами.
 А чтобы сделать один шаг минимизации ошибки,  нам надо сделать один шаг градиентного спуска (или в общем случае один шаг
 обучения модели). Сделать это можно с помощью model.train_on_batch(...), которая делает шаг обучения на данном
 батче. На входе у нее батч входов и батч правильных ответов. У нас будет батч из одного элемента.
 В остальном все тоже самое, как было в Q-learning.
     И еще дополнительно накапливаем список усредненных наград -- totalRewardAverageList

In [None]:
pathLenList = [] # длины траекторий по эпизодам
totalRewardList = [] # суммарные награды по эпизодам
totalRewardAverageList = [] # суммарные награды по эпизодам (среднее по окну)

for i in range(NUM_EPISODES):

    eps = 1.0 - i / NUM_EPISODES

    s = env.reset()[0] #[1]['prob']

    totalReward = 0
    step = 0

    while step < MAX_STEPS:
        step += 1

        Q_s = evalQ(s)
        
        if np.random.rand() < eps:
            # Выбор случайного действия
            a = env.action_space.sample()
        else:
            # Выбор действия по текущей политике
            a = np.argmax(Q_s)
        
        # Сделать шаг
        s1, r, _, done = env.step(a)[:4]

        Q_s1 = evalQ(s1)
        
        # Новое (целевое) значение Q-функции
        Q_target = Q_s
        if done:
            Q_target[a] = r
        else:
            Q_target[a] = r + gamma * np.max(Q_s1)
              
        # Обновление Q-функции
        inp = np.array([[s]], dtype=np.int32)
        model.train_on_batch(inp, Q_target[None, None, ...])
        
        totalReward += r
        s = s1
        
        # Если конец эпизода
        if done:
            break
    pathLenList.append(step)
    totalRewardList.append(totalReward)
    
    if i % REWARD_AVERAGE_WINDOW == 0 and i >= REWARD_AVERAGE_WINDOW:
        totalRewardAverage = np.mean(totalRewardList[-REWARD_AVERAGE_WINDOW:])
        totalRewardAverageList.append(totalRewardAverage)
        if i % 100 == 0:
            print('Episode {}: average total reward = {}'.format(i, totalRewardAverage))

2023-07-23 12:18:56.493911: W tensorflow/tsl/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz


In [None]:
plt.plot(pathLenList)
plt.grid()

In [None]:
plt.plot(totalRewardAverageList)
plt.grid()

# Запуск симуляции
Запустим симуляцию для Frozen Lake так же, как мы это делали до этого.
В качестве политики будем использовать нашу обученную DQN:
np.argmax(evalQ(s))    

In [None]:
totalReward = 0
s = env.reset()[0]

for _ in range(100):
    env.render()
    a = np.argmax(evalQ(s)) # выбираем оптимальное действие
    s, r, _, done = env.step(a)[:4]
    totalReward += r
    if done:
        env.render()
        break
        
env.close()
print('Total reward = {}'.format(totalReward))