# Обучение с подкреплением: пример

## Перевернутый маятник: симуляция и бейзлайны

<img src='cart_pole.gif'>

Создадим тестовую среду в Open AI Gym для задачи балансировки перевернутого маятника. Предварительно потребуется выполнить pip install gym

In [2]:
import gym

env = gym.make("CartPole-v1")

In [4]:
env.reset()

array([-0.00530731, -0.04841013, -0.02095877, -0.0463515 ])

In [14]:
env.action_space.sample()

0

In [15]:
env.step(0)

(array([-0.00627551, -0.24322537, -0.0218858 ,  0.23964575]), 1.0, False, {})

*Описание состояния:*
1. позиция тележки – значение в диапазоне [-4.8, 4.8];
2. скорость тележки;
3. угол отклонения шеста от вертикали – значение в диапазоне [-24°, 24°];
4. скорость изменения угла наклона шеста.

*Действие может принимать два значения – 0 и 1:*
0 – толкнуть тележку влево (приложить к тележке горизонтальную силу, равную +1);
1 – толкнуть тележку вправо (приложить к тележке горизонтальную силу, равную -1).
Награда на каждом шаге равна 1, включая и последний шаг.

Начальное состояние задается при помощи датчика равномерно распределенных случайных чисел.

*Условия завершения эпизода:*
1. угол шеста вышел из диапазона [-24°, 24°];
1. позиция тележки вышла из допустимого диапазона [-4.8, 4.8];
1. длина эпизода превышает 500;

Успехом считается только третий вариант

In [29]:
def get_random_action(env, state):
    return env.action_space.sample()

def simulate_games(action_function):
    for episode in range(10):
        state = env.reset()
        
        for t in range(500):
            env.render()
            
            action = action_function(env, state)
            state, reward, done, info = env.step(action)
            
            print(t, state, reward, done, info, action)
            if done:
                break

In [30]:
simulate_games(get_random_action)

0 [-0.0313689   0.24342843  0.0423398  -0.2855433 ] 1.0 False {} 1
1 [-0.02650033  0.04772901  0.03662893  0.02018693] 1.0 False {} 0
2 [-0.02554575  0.24230704  0.03703267 -0.26071784] 1.0 False {} 1
3 [-0.02069961  0.04667655  0.03181832  0.04341182] 1.0 False {} 0
4 [-0.01976608  0.24132813  0.03268655 -0.23906476] 1.0 False {} 1
5 [-0.01493951  0.43596826  0.02790526 -0.52126086] 1.0 False {} 1
6 [-0.00622015  0.63068652  0.01748004 -0.80502146] 1.0 False {} 1
7 [ 0.00639358  0.43532935  0.00137961 -0.5068916 ] 1.0 False {} 0
8 [ 0.01510017  0.24018798 -0.00875822 -0.21377423] 1.0 False {} 0
9 [ 0.01990393  0.04519233 -0.01303371  0.07613314] 1.0 False {} 0
10 [ 0.02080777 -0.14974037 -0.01151104  0.36467556] 1.0 False {} 0
11 [ 0.01781297 -0.34469685 -0.00421753  0.65370674] 1.0 False {} 0
12 [ 0.01091903 -0.14951643  0.0088566   0.35969875] 1.0 False {} 1
13 [ 0.0079287  -0.34476315  0.01605058  0.65516116] 1.0 False {} 0
14 [ 0.00103344 -0.1498683   0.0291538   0.36757521] 1.0 F

5 [-2.24177575e-04  3.60756918e-01  2.77236283e-02 -4.67932306e-01] 1.0 False {} 1
6 [ 0.00699096  0.55547645  0.01836498 -0.75174966] 1.0 False {} 1
7 [ 0.01810049  0.75034039  0.00332999 -1.03859729] 1.0 False {} 1
8 [ 0.0331073   0.94541794 -0.01744196 -1.33023296] 1.0 False {} 1
9 [ 0.05201566  1.14075552 -0.04404662 -1.62832245] 1.0 False {} 1
10 [ 0.07483077  1.3363667  -0.07661307 -1.93440015] 1.0 False {} 1
11 [ 0.1015581   1.14214303 -0.11530107 -1.66642145] 1.0 False {} 0
12 [ 0.12440096  0.94853587 -0.1486295  -1.41176243] 1.0 False {} 0
13 [ 0.14337168  1.14515471 -0.17686475 -1.74697546] 1.0 False {} 1
14 [ 0.16627477  0.95242936 -0.21180425 -1.51412473] 1.0 True {} 0
0 [-0.02635413  0.17009048  0.04691558 -0.28134939] 1.0 False {} 1
1 [-0.02295233  0.36451291  0.0412886  -0.55887402] 1.0 False {} 1
2 [-0.01566207  0.55903173  0.03011112 -0.83826817] 1.0 False {} 1
3 [-0.00448143  0.75372983  0.01334575 -1.12133161] 1.0 False {} 1
4 [ 0.01059316  0.94867424 -0.00908088 -1.

In [24]:
%%writefile cartpole_random.py
import gym
import random

env = gym.make("CartPole-v1")

def get_random_action(env, state):
    return env.action_space.sample()

def get_balancing_action(env, state):
    return int(state[2] > 0)

def get_tricky_balancing_action(env, state):
    next_action = int(state[2] > 0)
    if (abs(state[3]) > 1.0 and state[2] * state [3] < 0):
        next_action = 1 - next_action
    return next_action

def simulate_games(action_function):
    for episode in range(10):
        state = env.reset()
        
        for t in range(500):
            env.render()
            
            action = action_function(env, state)
            state, reward, done, info = env.step(action)
            
            print(t, state, reward, done, info, action)
            if done:
                break
                
simulate_games(get_random_action)

Overwriting cartpole_random.py


### Обучаем решение с помощью Q-learning

В этой секции мы решим задачу с помощью Q-learning, применив для оценки Q-функции двухслойную нейросеть. Строго говоря, в такой конфигурации это немного громко будет называть "Deep Q-learning Network", но основная идея передана верно.

*Примечания*

1. Для того, чтобы пример работал, потребуются tensorflow 2, keras и ffmpeg (для записи видео с симуляцией). Первое и второе ставим через pip install, второе - через upt-get install в Linux, brew install на Mac. Если у вас Windows и установлена Anaconda, можно воспользоваться conda install в Anaconda promt.

2. Ссылка на источник изначальной версии кода: http://www.100byte.ru/python/cartPole/cartPole.html 

In [3]:
%%writefile cartpole_dqnn.py
import gym
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import sys
import os
import random

# Агент с Q-обучением
class DQNAgent():
    def __init__(self, observation_space, action_space):
        self.state_size = observation_space
        self.action_size = action_space
        self.memory = deque(maxlen = 20000) # Тип collections.deque
        self.alpha = 1.0 # Скорость обучения агента
        self.gamma = 0.95 # Коэффициент уменьшения вознаграждения агента
        # Уровень обучения повышается с коэффициентом exploration_decay
        # Влияет на выбор действия action (0 или 1)
        self.exploration_rate = 1.0
        self.exploration_min = 0.01
        self.exploration_decay = 0.995
        self.learning_rate = 0.001 # Скорость обучения сети
        self.model = self.build_model()
        print(self.model.summary())

    # Создает модель сети
    def build_model(self):
        model = Sequential()
        model.add(Dense(24, input_dim = self.state_size, activation = 'relu'))
        model.add(Dense(24, activation = 'relu'))
        model.add(Dense(2, activation = 'linear'))
        model.compile(loss = 'mse', optimizer = Adam(lr = self.learning_rate))
        return model

    # Запоминаем историю
    def remember(self, state, action, reward, state_next, done):
        self.memory.append((state, action, reward, state_next, done))
    #
    # Определяет и возвращает действие
    def findAction(self, state):
        # Случайный выбор действия - 0 или 1
        if np.random.rand() <= self.exploration_rate:
            return random.randrange(self.action_size) # или random.randint(0, 1)
        # Выбор действия по состоянию объекта
        q_values = self.model.predict(state)
        return np.argmax(q_values[0]) # Возвращает действие
    #
    def replay(self, batch_size):
        if len(self.memory) < batch_size: return
        # Обучение агента
        
        # Случайная выборка batch_size элементов для обучения агента
        minibatch = random.sample(self.memory, batch_size)
        
        for state, action, reward, state_next, done in minibatch:
            
            # Пример (done = False):
            # state: [[-0.00626842 0.41118423 -0.07340495 -0.77232979]]
            # q_values (до корректировки): [[0.052909 0.05275263]] - numpy.ndarray
            # state_next: [[ 0.00195526 0.21714493 -0.08885155 -0.50361631]]
            # q_values_next: [[0.03970249 0.02732118]]
            # Qsa = 1.0377173654735088
            # reward = 1.0
            # action = 0
            # q_values (после корректировки): [[1.0377173 0.05275263]]
            # q_values (после обучения НС): [[0.07063997 0.04742151]]
            
            q_values = self.model.predict(state)
            
            if done:
                Qsa = reward
                
            else:
                q_values_next = self.model.predict(state_next)[0]
                # Текущая оценка полезности действия action
                Qsa = q_values[0][action]
                
                # Уточненная оценка полезности действия action
                Qsa = Qsa + self.alpha * (reward + self.gamma * np.amax(q_values_next) - Qsa)
                
            # Формируем цель обучения сети
            q_values[0][action] = Qsa
            
            # Обучение сети
            self.model.fit(state, q_values, epochs = 1, verbose = 0)
            
        if self.exploration_rate > self.exploration_min: 
            self.exploration_rate *= self.exploration_decay
            
def make_video(env, agent):
    env = gym.wrappers.Monitor(env, os.path.join(os.getcwd(), "cartpole_videos"), force=True)
    rewards = 0
    steps = 0
    done = False
    state = env.reset()
    while not done:
        env.render()
        state = np.reshape(state, (1, env.observation_space.shape[0]))
        action = agent.findAction(state)
        state, reward, done, _ = env.step(action)
        steps += 1
        rewards += reward
    print("Testing steps: {} rewards {}: ".format(steps, rewards))

        
if __name__ == "__main__":
    env = gym.make('CartPole-v1') # Создаем среду
    observation_space = env.observation_space.shape[0] # 4
    action_space = env.action_space.n # 2
    
    # DQN - глубокая Q-нейронная сеть
    dqn_agent = DQNAgent(observation_space, action_space) # Создаем агента
    episodes = 1001 # Число игровых эпизодов + 1
    
    # scores - хранит длительность игры в последних 100 эпизодах
    # После достижения maxlen новые значения, добавляемые в scores, будут вытеснять прежние
    scores = deque(maxlen = 100) # Тип collections.deque.
    fail = True
    seed = 2
    np.random.seed(seed)
    random.seed(seed)
    env.seed(seed)
    
    for e in range(episodes):
        # Получаем начальное состояние объекта перед началом каждой игры (каждого эпизода)
        state = env.reset() # Как вариант: state = [0.0364131 -0.02130403 -0.03887796 -0.01044108]
        # state[0] - позиция тележки
        # state[1] - скорость тележки
        # state[2] - угол отклонения шеста от вертикали в радианах
        # state[3] - скорость изменения угла наклона шеста
        state = np.reshape(state, (1, observation_space))
        
        # Начинаем игру
        # frame - текущий кадр (момент) игры
        # Цель - как можно дольше не допустить падения шеста
        frames = 0
        while True:
            frames += 1
            action = dqn_agent.findAction(state) # Определяем очередное действие
            
            # Получаем от среды, в которой выполнено действие action, состояние объекта, награду и значение флага завершения игры
            # В каждый момент игры, пока не наступило одно из условий ее прекращения, награда равна 1
            state_next, reward, done, info = env.step(action)
            state_next = np.reshape(state_next, (1, observation_space))
            
            reward = reward if not done else -reward
            #reward = -100 * (abs(state_next[0, 2]) - abs(state[0, 2]))
            
            # Запоминаем предыдущее состояние объекта, действие, награду за это действие, текущее состояние и значение done
            dqn_agent.remember(state, action, reward, state_next, done)
            state = state_next # Обновляем текущее состояние
            
            # done становится равным True, когда завершается игра, например, отклонение угла превысило допустимое значение
            if done:
                # Печатаем продолжительность игры и покидаем внутренний цикл while
                print("Эпизод: {}/{}, продолжительность игры в кадрах: {}".format(e, episodes - 1, frames))
                break
        scores.append(frames)
        
        if e > 100:
            score_mean = np.mean(scores)
            if score_mean > 1950:
                print('Цель достигнута. Средняя продолжительность игры: ', score_mean)
                fail = False
                break
        # Продолжаем обучать агента
        dqn_agent.replay(24)
    if fail:
        print('Задача не решена ')
        
    make_video(env, dqn_agent)

Writing cartpole_dqnn.py


Также существуют библиотеки с готовыми бейзлайнами для агентов:
https://github.com/openai/baselines - вариант непосредственно от Open AI
https://stable-baselines.readthedocs.io/en/master/ - несколько расширенный форк

Есть сложность, что большинство решений реализовано для tensorflow 1.x. 
У stable baselines есть экспериментальная версия для tf 2: https://github.com/Stable-Baselines-Team/stable-baselines-tf2
А если вы запускаете код в Google Colab, то для использования первой версии tf достаточно применить соответствующий jupyter notebook magic:
%tensorflow_version 1.x
(ссылка: https://colab.research.google.com/notebooks/tensorflow_version.ipynb )


## Опциональное задание
1. Поэкспериментируйте с custom reward в приведенном решении, попробовав ускорить за счет его выбора сходимость
1. Попробуйте существенно ускорить сходимость решения (любые подходы приемлемы)
1. Примените в этой задаче PG и PPO2, сравните сходимость с нашим Q-learning
1. Попробуйте запустить и отрендерить симуляцию в Google Collab
1. Поэкспериментируйте с архитектурой сети и добавлением дополнительных признаков
1. Попытайтесь решить задачу без применения нейросетей - приближая Q-функцию другими моделями или вовсе применив табличные policy/value iteration после дискретизации пространства состояний