<a href="https://colab.research.google.com/github/SuperCrabLover/DQN_Cart-Pole/blob/main/DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import gym
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cpu


In [None]:
from numpy.random.mtrand import rand
class Logger():
    """Bounded buffer of transition records with uniform random sampling.

    A record is any indexable object with at least five fields;
    ``sample_logs`` returns fields 0..4 as five separate arrays. When the
    buffer is full, the oldest half is discarded before a new record is
    stored.
    """

    def __init__(self, size):
        """Create an empty buffer holding at most ``size`` records."""
        self._len = 0
        self._logs = []
        self._maxlen = size

    def add_log(self, log):
        """Store ``log``, first dropping the oldest half if the buffer is full."""
        if self._len + 1 > self._maxlen:
            # Always drop at least one record: with a single stored record
            # "half" rounds down to zero and the buffer would otherwise grow
            # past its capacity.
            drop = max(1, self._len // 2)
            self._logs = self._logs[drop:]
            self._len = len(self._logs)
        self._logs.append(log)
        self._len += 1

    def sample_logs(self, batch_size):
        """Draw ``batch_size`` distinct records uniformly at random.

        Returns:
            A 5-tuple of ``np.ndarray``; the k-th array stacks field ``k`` of
            every sampled record, and all five arrays share the same order.

        Raises:
            ValueError: if fewer than ``batch_size`` records are stored.
        """
        if batch_size > self._len:
            raise ValueError(
                "cannot sample {} logs from a buffer holding {}".format(
                    batch_size, self._len
                )
            )
        rand_log_inds = random.sample(range(self._len), batch_size)
        # Pick only the sampled records instead of converting the whole
        # buffer to an array on every call; this also works for plain
        # tuple/list records whose fields have different shapes.
        samples = [self._logs[ind] for ind in rand_log_inds]
        return tuple(
            np.array([sample[field] for sample in samples]) for field in range(5)
        )

    def is_ready(self, batch_size):
        """Return True when at least ``batch_size`` records are stored."""
        return self._len >= batch_size

In [None]:
class QModel(nn.Module):
    """Fully connected network mapping a state vector to one value per action.

    Layout: Linear -> Tanh -> Linear -> Tanh -> Linear -> ReLU.

    Args:
        state_dim: size of the input state vector.
        action_dim: number of outputs (one per action).
        hidden: width of both hidden layers.
    """

    def __init__(self, state_dim, action_dim, hidden):
        super().__init__()

        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden),
            nn.Tanh(),
            nn.Linear(hidden, hidden),
            nn.Tanh(),
            nn.Linear(hidden, action_dim),
            # NOTE(review): the output ReLU clamps every value to >= 0 and
            # passes no gradient for clamped outputs; kept to preserve the
            # current behaviour - confirm it is intended.
            nn.ReLU()
        )

    def forward(self, x):
        """Return the network output for ``x``, moving ``x`` to the model's device."""
        # Follow the module's own parameters instead of a notebook-level
        # global so the model works wherever it has been moved.
        x = x.to(next(self.parameters()).device)
        return self.net(x)

In [None]:
def select_action_eps_greedy(network, state, epsilon):
    """Choose an action epsilon-greedily from the network's output for ``state``.

    Args:
        network: callable mapping a float32 state tensor to a tensor of
            per-action values.
        state: a ``torch.Tensor`` or anything ``torch.tensor`` can convert.
        epsilon: probability of picking a uniformly random action.

    Returns:
        The chosen action index as a Python ``int``.
    """
    if not isinstance(state, torch.Tensor):
        state = torch.tensor(state, dtype=torch.float32)
    # Action selection never needs gradients.
    with torch.no_grad():
        q_values = network(state)
    # .cpu() is required before .numpy() when the network lives on a GPU.
    q_values = q_values.detach().cpu().numpy()

    # rand() is drawn from [0, 1): strict "<" makes epsilon=0 purely greedy
    # and epsilon=1 purely random.
    if np.random.rand() < epsilon:
        # The last axis indexes actions, with or without a batch axis.
        action = np.random.randint(0, q_values.shape[-1])
    else:
        action = np.argmax(q_values)
    return int(action)

In [None]:
def train_model(states, actions, rewards, next_states, done, optimizer, gamma=0.99,
                policy_net=None, target_net=None):
    """Run one optimisation step of the Q-network on a batch of transitions.

    The loss is the mean squared error between the policy network's value for
    the taken action and the one-step target ``r + gamma * max_a' Q_target(s', a')``
    (just ``r`` for terminal transitions), plus a penalty of ``0.1`` times the
    mean predicted value.

    Args:
        states: array-like, shape [batch_size, state_size].
        actions: array-like of integer action indices, shape [batch_size].
        rewards: array-like, shape [batch_size].
        next_states: array-like, shape [batch_size, state_size].
        done: array-like of booleans, shape [batch_size].
        optimizer: optimiser over the policy network's parameters.
        gamma: discount factor.
        policy_net: network being trained; defaults to the module-level
            ``policy_model``.
        target_net: network used for the bootstrap target; defaults to the
            module-level ``target_model``.
    """
    # Fall back to the notebook-level networks when none are passed in.
    if policy_net is None:
        policy_net = policy_model
    if target_net is None:
        target_net = target_model

    # Build the batch on the device the network lives on; mixing CPU batch
    # tensors with GPU network outputs raises a device-mismatch error.
    net_device = next(policy_net.parameters()).device

    states_t = torch.tensor(states, dtype=torch.float32, device=net_device)            # [batch_size, state_size]
    actions_t = torch.tensor(actions, dtype=torch.long, device=net_device)             # [batch_size]
    rewards_t = torch.tensor(rewards, dtype=torch.float32, device=net_device)          # [batch_size]
    next_states_t = torch.tensor(next_states, dtype=torch.float32, device=net_device)  # [batch_size, state_size]
    done_t = torch.tensor(done, dtype=torch.bool, device=net_device)                   # [batch_size]

    predicted_qvalues = policy_net(states_t)
    predicted_qvalues_for_actions = predicted_qvalues.gather(1, actions_t.unsqueeze(1)).squeeze(1)

    # The target is a constant with respect to the optimised parameters, so
    # no autograd graph is needed for the target network.
    with torch.no_grad():
        next_state_values = torch.max(target_net(next_states_t), 1)[0]
        target_qvalues_for_actions = rewards_t + gamma * next_state_values
        # Terminal transitions have no future value.
        target_qvalues_for_actions = torch.where(done_t, rewards_t, target_qvalues_for_actions)

    loss = torch.mean((predicted_qvalues_for_actions - target_qvalues_for_actions) ** 2)
    # Add regularisation on the Q-values.
    loss = loss + 0.1 * predicted_qvalues_for_actions.mean()

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

In [None]:
def generate_session(env, opt, logger, batch_size, policy_model, target_model, target_update, t_max=1000, epsilon=0, train=False):
    """Play one episode, logging every transition and optionally training.

    Args:
        env: environment whose ``reset()`` returns a state and whose
            ``step(a)`` returns ``(next_state, reward, done, info)``.
        opt: optimiser forwarded to ``train_model``.
        logger: transition buffer providing ``add_log``, ``is_ready`` and
            ``sample_logs``.
        batch_size: number of buffered transitions per training batch.
        policy_model: network used to select actions.
        target_model: network refreshed from ``policy_model`` during training.
        target_update: refresh ``target_model`` on steps where
            ``t % target_update == 0``.
        t_max: maximum number of steps in the episode.
        epsilon: exploration probability for action selection.
        train: when True, run training steps after every environment step.

    Returns:
        The sum of rewards collected during the episode.
    """
    total_reward = 0
    s = env.reset()
    for t in range(t_max):
        a = select_action_eps_greedy(policy_model, s, epsilon=epsilon)
        next_s, r, done, _ = env.step(a)
        # dtype=object is required: the record mixes arrays with scalars,
        # and NumPy refuses to build such a ragged array implicitly.
        logger.add_log(np.array([s, a, r, next_s, done], dtype=object))
        if train:
            # The newest transition is always trained on by itself first...
            # NOTE(review): train_model is called without the networks, so it
            # does not receive the policy_model/target_model arguments of
            # this function - confirm they are the same objects.
            train_model(np.array([s]), np.array([a]), np.array([r]), np.array([next_s]), np.array([done]), opt, gamma=0.99)
            # ...followed by a sampled batch once enough transitions exist.
            if logger.is_ready(batch_size):
                states, actions, rewards, next_ss, dones = logger.sample_logs(batch_size)
                train_model(states, actions, rewards, next_ss, dones, opt, gamma=0.99)
            if t % target_update == 0:
                target_model.load_state_dict(policy_model.state_dict())

        total_reward += r
        s = next_s
        if done:
            break

    return total_reward

In [None]:
# Training environment.
env = gym.make('CartPole-v1').unwrapped

state_dim = env.observation_space.shape
n_actions = env.action_space.n

HIDDEN = 64

# Two identically shaped networks, built in the order policy -> target;
# the target then starts as an exact copy of the policy and is put in
# eval mode.
policy_model, target_model = (
    QModel(state_dim[0], n_actions, HIDDEN).to(device) for _ in range(2)
)
target_model.load_state_dict(policy_model.state_dict())
target_model.eval()

QModel(
  (net): Sequential(
    (0): Linear(in_features=2, out_features=64, bias=True)
    (1): Tanh()
    (2): Linear(in_features=64, out_features=64, bias=True)
    (3): Tanh()
    (4): Linear(in_features=64, out_features=3, bias=True)
    (5): ReLU()
  )
)

In [None]:
# Replay buffer: capacity and the number of transitions per training batch.
LOGGER_SIZE = 1024
BATCH_SIZE = 128

# Episode / learning schedule.
T_MAX = 500 #5000
TARGET_UPDATE = 10
# NOTE(review): GAMMA is not referenced by the visible training code, which
# passes gamma=0.99 literally - confirm whether it should be wired in.
GAMMA = 0.99

# Exploration: starting epsilon and its per-epoch multiplicative decay.
EPSILON = 0.7
EPSILON_DECAY = 0.9

opt = optim.Adam(policy_model.parameters(), lr=1e-4)
logger = Logger(LOGGER_SIZE)

In [None]:
# Up to 150 epochs of 100 training episodes each; stop early once the mean
# episode reward of an epoch reaches 250.
for i in range(150):
    session_rewards = [
        generate_session(
            env, opt, logger, BATCH_SIZE, policy_model, target_model,
            TARGET_UPDATE, t_max=T_MAX, epsilon=EPSILON, train=True,
        )
        for _ in range(100)
    ]
    mean_reward = np.mean(session_rewards)
    print("Epoch: #{}\tmean reward = {:.3f}\tepsilon = {:.3f}".format(i, mean_reward, EPSILON))

    # Decay exploration; once it has effectively vanished, restart it at 0.5.
    EPSILON *= EPSILON_DECAY
    if EPSILON <= 1e-4:
        EPSILON = 0.5

    if mean_reward >= 250.:
        print("Принято!")
        break

NameError: ignored

In [None]:
try:
    import colab
    COLAB = True
except ModuleNotFoundError:
    COLAB = False
    pass

if COLAB:
    !wget https://gist.githubusercontent.com/Tviskaron/4d35eabce2e057dd2ea49a00b00aaa41/raw/f1e25fc6ac6d8f11cb585559ce8b2ab9ffefd67b/colab_render.sh -O colab_render.sh -q
    !sh colab_render.sh
    !wget https://gist.githubusercontent.com/Tviskaron/d91decc1ca5f1b09af2f9f080011a925/raw/0d3474f65b4aea533996ee00edf99a37e4da5561/colab_render.py -O colab_render.py -q 
    import colab_render

In [None]:
# библиотеки и функции, которые потребуются для показа видео

import glob
import io
import base64
from IPython import display as ipythondisplay
from IPython.display import HTML
from gym.envs.classic_control import rendering
import matplotlib.pyplot as plt

%matplotlib inline


# Patch ``Viewer.__init__`` at most once. Without this guard, re-running the
# cell rebinds ``org_constructor`` to the already-patched ``constructor``,
# which then calls itself through that global until RecursionError.
if not getattr(rendering.Viewer.__init__, "_window_hidden_patch", False):
    org_constructor = rendering.Viewer.__init__

    def constructor(self, *args, **kwargs):
        """Run the original Viewer constructor, then hide its window."""
        org_constructor(self, *args, **kwargs)
        self.window.set_visible(visible=False)

    # Marker checked by the guard above on later runs of this cell.
    constructor._window_hidden_patch = True
    rendering.Viewer.__init__ = constructor


def show_video(folder="./video"):
    """Display one ``.mp4`` from ``folder`` inline as a base64-embedded video.

    Among the ``*.mp4`` files in ``folder``, the one whose last 15 path
    characters sort highest is shown. Prints a message and returns when the
    folder contains no ``.mp4`` file.
    """
    mp4list = glob.glob(folder + '/*.mp4')
    if not mp4list:
        print("Could not find video")
        return

    mp4 = max(mp4list, key=lambda path: path[-15:])
    # Read-only binary mode, and the handle is closed as soon as the bytes
    # have been read.
    with open(mp4, 'rb') as video_file:
        encoded = base64.b64encode(video_file.read())
    ipythondisplay.display(HTML(data='''<video alt="test" autoplay 
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

In [None]:
# Record one non-training episode into ./video, then display the recording.
env = gym.wrappers.Monitor(gym.make("CartPole-v1"), "./video", force=True)

generate_session(
    env, opt, logger, BATCH_SIZE, policy_model, target_model, TARGET_UPDATE,
    epsilon=EPSILON, train=False,
)

env.close()
show_video()

RecursionError: ignored