In [None]:
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
!pip install colabgymrender
!pip install imageio==2.4.1

In [None]:
# Install/upgrade gym together with its Atari extras and the ROM-license acceptance extra.
# NOTE(review): -U pulls the newest gym release, while the cells below unpack env.step() into
# four values and use env.reset() as a bare observation — confirm the installed version matches.
%pip install -U gym[atari,accept-rom-license]

In [None]:
import gym
import numpy as np
from colabgymrender.recorder import Recorder

In [None]:
# Deterministic (non-slippery) FrozenLake wrapped in colabgymrender's Recorder;
# later cells call env.play() on this wrapper to show the recorded episode.
directory = './video'
env = Recorder(gym.make("FrozenLake-v1", is_slippery=False), directory)

In [None]:
# Sizes of the discrete observation and action spaces of the current environment.
NUM_STATES, NUM_ACTIONS = env.observation_space.n, env.action_space.n

In [None]:
# Q-table with one row per state and one column per action, filled with random values.
Q_table = np.random.rand(NUM_STATES, NUM_ACTIONS)

0: LEFT

1: DOWN

2: RIGHT

3: UP

In [None]:
# Hand-crafted policy: mark the preferred action in each state along the intended path.
# States 0 and 1 -> RIGHT (2); states 2, 6 and 10 -> DOWN (1); state 14 -> RIGHT (2).
# The rows and columns below are paired element-wise, i.e. (0,2), (1,2), (2,1), (6,1), (10,1), (14,2).
Q_table[[0, 1, 2, 6, 10, 14], [2, 2, 1, 1, 1, 2]] = 1

In [None]:
# Roll out the greedy policy stored in Q_table for one episode, then replay the recording.
observation = env.reset()
done = False

# Loop on the flag that env.step() actually updates; the previous `terminal` flag was
# never changed, and the table was referenced under a name that did not exist yet.
while not done:
    action = np.argmax(Q_table[observation, :])  # greedy action for the current state
    observation, reward, done, info = env.step(action)

print(f'Final reward = {reward}')

env.play()
env.close()

In [None]:
# Atari Pong wrapped in colabgymrender's Recorder so the rollout can be replayed with env.play().
directory = './video'
env = Recorder(gym.make("ALE/Pong-v5"), directory)

In [None]:
def policy(s):
    """Random policy: ignore the observation `s` and sample an action from env.action_space."""
    return env.action_space.sample()

In [None]:
# Play Pong with the random policy for at most 1000 steps, reporting every non-zero reward.
obs = env.reset()
totalReward = 0

for _ in range(1000):
    action = policy(obs)  # random policy
    obs, reward, done, _ = env.step(action)
    totalReward += reward
    if reward != 0:
        print(f'New reward = {reward}')
    if done:
        break

# Replay the recorded frames and release the environment.
env.play()
env.close()

print(f'Total reward = {totalReward}')

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# Tabular Q-learning setup.
# The environment is created here because, at this point of a top-to-bottom run, `env` is
# still the closed Pong recorder, whose image observation space has no `.n`.
env = gym.make("FrozenLake-v1", is_slippery=False)

NUM_STATES = env.observation_space.n # number of states
NUM_ACTIONS = env.action_space.n # number of actions

lr = 0.3 # learning rate
gamma = 0.6 # discount factor

NUM_EPISODES = 1500 # number of training episodes

In [None]:
# Tabular Q-learning with a purely greedy behaviour policy.
pathLenList = []      # episode lengths
totalRewardList = []  # episode returns

# Randomly initialised Q-table
Q = np.random.rand(NUM_STATES, NUM_ACTIONS)

for i in range(1, NUM_EPISODES+1):
    s = env.reset()
    totalReward, step, done = 0, 0, False

    while not done:
        step += 1

        # Act greedily with respect to the current table and advance the environment.
        a = np.argmax(Q[s,:])
        s1, r, done, _ = env.step(a)

        # Target: the bare reward on the final step, otherwise reward plus discounted best next value.
        Q_target = r if done else r + gamma * np.max(Q[s1,:])
        # Blend the old estimate with the target.
        Q[s,a] = (1-lr) * Q[s,a] + lr * Q_target

        totalReward += r
        s = s1

    pathLenList.append(step)
    totalRewardList.append(totalReward)
    if i % 100 == 0:
        print('Episode {}: Total reward = {}'.format(i, totalReward))

In [None]:
# Episode length over the course of training; labelled so the figure stands on its own.
plt.plot(pathLenList)
plt.xlabel('Episode')
plt.ylabel('Steps in episode')
plt.title('Trajectory length per episode')
plt.grid()

In [None]:
# Episode return over the course of training; labelled so the figure stands on its own.
plt.plot(totalRewardList)
plt.xlabel('Episode')
plt.ylabel('Total reward')
plt.title('Total reward per episode')
plt.grid()

In [None]:
# Fresh deterministic FrozenLake for the second Q-learning run.
# Uses the same environment id ("FrozenLake-v1") as the rest of the notebook and wraps the
# env in Recorder, because the evaluation cell of this section calls env.play().
directory = './video'
env = Recorder(gym.make("FrozenLake-v1", is_slippery=False), directory)

NUM_STATES = env.observation_space.n
NUM_ACTIONS = env.action_space.n

In [None]:
# Settings for the second tabular Q-learning run.
NUM_EPISODES = 50  # number of training episodes
MAX_STEPS = 100    # maximum number of steps per episode

lr = 0.8      # learning rate
gamma = 0.95  # discount factor

In [None]:
# Tabular Q-learning with a greedy behaviour policy and a per-episode step cap.
pathLenList = []      # episode lengths
totalRewardList = []  # episode returns

# Randomly initialised Q-table
Q = np.random.rand(NUM_STATES, NUM_ACTIONS)

for i in range(NUM_EPISODES):
    s = env.reset()
    totalReward = 0
    step = 0  # stays 0 if the loop below never runs

    for step in range(1, MAX_STEPS + 1):
        # Greedy action under the current table, then one environment step.
        a = np.argmax(Q[s,:])
        s1, r, done, _ = env.step(a)

        # Target: the bare reward on the final step, otherwise reward plus discounted best next value.
        Q_target = r if done else r + gamma * np.max(Q[s1,:])

        # Blend the old estimate with the target.
        Q[s,a] = (1-lr) * Q[s,a] + lr * Q_target

        totalReward += r
        s = s1

        # Stop as soon as the episode has ended.
        if done:
            break

    pathLenList.append(step)
    totalRewardList.append(totalReward)
    print('Episode {}: Total reward = {}'.format(i, totalReward))

In [None]:
# Run one episode greedily with the learned Q-table and report the last reward.
s, done = env.reset(), False

while not done:
    a = np.argmax(Q[s])  # row s of the table: one value per action
    s, r, done, _ = env.step(a)

print(f'Final reward = {r}')

# NOTE(review): play() is called on whatever `env` currently is — confirm that object provides it.
env.play()
env.close()

In [None]:
import random
import tensorflow as tf
from keras.optimizers import Adam
from keras.layers import Dense
from keras.models import Sequential
from collections import deque

In [None]:
# Deterministic FrozenLake for the DQN section, wrapped in Recorder so env.play() can replay it.
directory = './video'
env = Recorder(gym.make("FrozenLake-v1", is_slippery=False), directory)

In [None]:
# Settings for the DQN section.
NUM_STATES, NUM_ACTIONS = env.observation_space.n, env.action_space.n

NUM_EPISODES = 4000  # number of training episodes
MAX_STEPS = 300      # step cap per episode
batch_size = 32      # transitions sampled per replay call

# Not read by the Agent class, which keeps its own learning_rate and gamma attributes.
lr = 0.1      # learning rate
gamma = 0.99  # discount factor

In [None]:
class Agent:
    """DQN-style agent: a small dense Q-network plus a bounded replay memory.

    States are passed around as row vectors of shape (1, state_size); the training
    loop in this notebook feeds one-hot encodings of the discrete state.
    """

    def __init__(self, state_size, action_size):
        """Set hyperparameters and build the Q-network.

        Args:
            state_size: length of the state vector (network input width).
            action_size: number of discrete actions (network output width).
        """
        self.memory = deque(maxlen=2500)   # replay buffer; oldest transitions fall out first
        self.learning_rate = 0.001         # Adam step size
        self.epsilon = 1                   # current exploration probability
        self.max_eps = 1
        self.min_eps = 0.01
        self.eps_decay = 0.001/3           # exponential decay rate per episode
        self.gamma = 0.9                   # discount factor
        self.state_size = state_size
        self.action_size = action_size
        self.epsilon_lst = []              # epsilon after every replay() call
        self.replay_calls = 0              # fallback schedule index for replay()
        self.model = self.buildmodel()

    def buildmodel(self):
        """Build and compile the Q-network: one hidden ReLU layer, linear output per action."""
        model = Sequential()
        model.add(Dense(10, input_dim=self.state_size, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def add_memory(self, new_state, reward, done, state, action):
        """Store one transition in the replay buffer."""
        self.memory.append((new_state, reward, done, state, action))

    def action(self, state):
        """Epsilon-greedy action selection.

        With probability `epsilon` a uniformly random action is returned, otherwise the
        greedy one. (The comparison used to be inverted, so the agent was greedy while
        epsilon was high and nearly random once epsilon had decayed.)
        """
        if np.random.rand() < self.epsilon:
            # Sample over the real action count instead of a hard-coded 4.
            return np.random.randint(0, self.action_size)
        return self.pred(state)

    def pred(self, state):
        """Return the greedy action (argmax of the network output) for `state`."""
        return np.argmax(self.model(state))

    def replay(self, batch_size, episode=None):
        """Fit the network on a random minibatch from memory and decay epsilon.

        Args:
            batch_size: number of stored transitions to sample; must not exceed len(memory).
            episode: index used in the exponential epsilon schedule. When omitted, the
                number of previous replay() calls is used, so the method no longer
                depends on a notebook-global `episode` variable.
        """
        minibatch = random.sample(self.memory, batch_size)
        for new_state, reward, done, state, action in minibatch:
            target = reward
            if not done:
                target = reward + self.gamma * np.amax(self.model(new_state))
            # Calling the model returns an immutable tensor; copy it into a NumPy array
            # before overwriting the entry of the action that was taken.
            target_f = np.array(self.model(state))
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)

        if episode is None:
            episode = self.replay_calls
        self.replay_calls += 1

        if self.epsilon > self.min_eps:
            self.epsilon = (self.max_eps - self.min_eps) * np.exp(-self.eps_decay * episode) + self.min_eps

        self.epsilon_lst.append(self.epsilon)

    def load(self, name):
        """Load network weights from `name`."""
        self.model.load_weights(name)

    def save(self, name):
        """Save network weights to `name`."""
        self.model.save_weights(name)

# Agent sized to the current environment's state and action counts.
agent = Agent(state_size=NUM_STATES, action_size=NUM_ACTIONS)

In [None]:
# Train the agent: play one episode, store every transition, then run one replay step
# once the memory holds more than batch_size transitions.
reward_lst = []
for episode in range(NUM_EPISODES):
    # One-hot encode the start state as a (1, NUM_STATES) row vector.
    state = np.zeros((1, NUM_STATES))
    state[0, env.reset()] = 1
    reward, done, t = 0, False, 0

    for t in range(MAX_STEPS):
        action = agent.action(state)
        next_index, reward, done, info = env.step(action)

        # One-hot encode the successor state the same way.
        new_state = np.zeros((1, NUM_STATES))
        new_state[0, next_index] = 1

        agent.add_memory(new_state, reward, done, state, action)
        state = new_state

        if done:
            break

    # Reward of the last step taken in this episode.
    reward_lst.append(reward)

    if len(agent.memory) > batch_size:
        print(f'Episode: {episode:4}/{NUM_EPISODES} and step: {t:4}. Eps: {float(agent.epsilon):.2}, reward {reward}')
        agent.replay(batch_size)

print(' Train mean % score= ', round(100*np.mean(reward_lst),1))

In [None]:
# Persist the trained network weights (Agent.save forwards to model.save_weights).
# NOTE(review): the name has no file extension; some Keras versions may require one — confirm.
agent.save(name='DQN_FrozenLake-v1_4x4')

In [None]:
# Run one episode with the trained network acting greedily, then replay the recording.
s = np.zeros((1, NUM_STATES))  # one-hot row vector for the start state
s[0, env.reset()] = 1
done = False

while not done:
    a = agent.pred(s)
    next_index, r, done, _ = env.step(a)
    # Re-encode the integer state returned by the environment as a one-hot row vector.
    s = np.zeros((1, NUM_STATES))
    s[0, next_index] = 1

print(f'Final reward = {r}')

env.play()
env.close()