初回インストール物

In [1]:
!pip install torch==1.5.1+cpu torchvision==0.6.1+cpu -f https://download.pytorch.org/whl/torch_stable.html

Looking in links: https://download.pytorch.org/whl/torch_stable.html
Note: you may need to restart the kernel to use updated packages.


コード

In [1]:
%matplotlib inline
from IPython import display
import matplotlib.pyplot as plt

import gym
import numpy as np
import time

display_notebook = False # サーバ上でやる場合はTrueにすること（フレームレート下がるので不要ならFalse）

強化学習

ネットワーク

In [2]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.optim import Adam

In [3]:
def weights_init_xavier(m):
    if isinstance(m, nn.Linear)\
            or isinstance(m, nn.Conv2d)\
            or isinstance(m, nn.ConvTranspose2d):
        torch.nn.init.xavier_uniform_(m.weight, gain=1)
        if m.bias is not None:
            torch.nn.init.constant_(m.bias, 0)


def create_linear_network(input_dim, output_dim, hidden_units=[256, 256],
                          hidden_activation=nn.ReLU(), output_activation=None,
                          initializer=weights_init_xavier):
    model = []
    units = input_dim
    for next_units in hidden_units:
        model.append(nn.Linear(units, next_units, bias=False))
        model.append(hidden_activation)
        units = next_units

    model.append(nn.Linear(units, output_dim))
    if output_activation is not None:
        model.append(output_activation)

    return nn.Sequential(*model).apply(initializer)

In [4]:
class QNetwork(nn.Module):
    def __init__(self, num_inputs, num_actions, hidden_units=[256, 256], initializer=weights_init_xavier):
        super(QNetwork, self).__init__()
        self.num_actions = num_actions
        self.Q = create_linear_network(
            num_inputs, num_actions, hidden_units=hidden_units,
            initializer=initializer)
        

    def forward(self, states):
        states = torch.Tensor(states)
        q = self.Q(states)
        
        return q

class Agent:
    def __init__(self, epsilon_start=0.3, epsilon_end=0.1, learning_rate = 0.001, hidden_units=[256, 256], tau=0.2):
        env = gym.make(env_name)
        num_inputs = env.observation_space.shape[0]
        self.num_actions = env.action_space.n
        self.model = QNetwork(num_inputs, self.num_actions, hidden_units=hidden_units)
        self.learner_model = QNetwork(num_inputs, self.num_actions, hidden_units=hidden_units)
        hard_update(self.model, self.learner_model)
        self.optim = Adam(self.learner_model.parameters(), lr=learning_rate)
        
        self.epsilon_start = epsilon_start
        self.epsilon_end = epsilon_end
        self.a = (epsilon_end - epsilon_start)
        self.b = epsilon_start
        
    def get_action_greedy(self, observation):
        q = self.model(observation).detach().numpy()
        action = np.argmax(q)
        
        return action

    def get_action(self, observation, step):
        epsilon = self.a*step + self.b
        if np.random.rand() < epsilon:
            action = np.random.randint(self.num_actions)
        else:
            action = self.get_action_greedy(observation)
        return action

学習

In [5]:
def soft_update(target, source, tau):
    for t, s in zip(target.parameters(), source.parameters()):
        t.data.copy_(t.data * (1.0 - tau) + s.data * tau)
        
def hard_update(target, source):
    target.load_state_dict(source.state_dict())
        
def update_params(optim, network, loss, grad_clip=40, retain_graph=False):
    optim.zero_grad()
    loss.backward(retain_graph=retain_graph)
    if grad_clip is not None:
        torch.nn.utils.clip_grad_norm_(network.parameters(), grad_clip)
    optim.step()

In [6]:
def calc_current_q(model, states, actions):
    curr_q = model(states)
    curr_q = curr_q.gather(1, actions.unsqueeze(1).long())
    
    return curr_q

def calc_target_q(model, rewards, next_states, dones, gamma_n=0.99):
    with torch.no_grad():
        next_q = model(next_states)
        #next_v = next_q.max(dim=1, keepdim=True).value()
        next_v, index = torch.max(next_q, axis=1, keepdim=True)
    
    target_q = rewards.view_as(next_v) + (1.0 - dones.view_as(next_v)) * gamma_n * next_v
    
    
    return target_q
    
def calc_critic_loss(model, states, actions, rewards, dones, next_states, gamma):
    curr_q = calc_current_q(model, states, actions)
    target_q = calc_target_q(model, rewards, next_states, dones, gamma_n=gamma)

    # TD errors for updating priority weights
    errors = torch.abs(curr_q.detach() - target_q)
    # We log means of Q to monitor training.
    mean_q = curr_q.detach().mean().item()
    
    # Critic loss is mean squared TD errors with priority weights.
    q_loss = torch.mean((curr_q - target_q).pow(2))

    return q_loss, errors, mean_q

In [7]:
def update(agent, observation_buffer, action_buffer, reward_buffer, done_buffer, gamma=0.99):
    observations = torch.Tensor(observation_buffer)
    states = observations[1:]
    next_states = observations[:-1]
    actions = torch.Tensor(action_buffer)
    rewards = torch.Tensor(reward_buffer)
    dones = torch.Tensor(done_buffer)
    
    q_loss, errors, mean_q = calc_critic_loss(agent.learner_model.Q, states, actions, rewards, dones, next_states, gamma)
    
    update_params(agent.optim, agent.learner_model, q_loss)
    
    return q_loss
    

訓練

In [65]:

def train(agent, n_train_steps, env_name, tau=0.005, seed=None):
    start_time = time.time()
    
    env = gym.make(env_name)
    if seed != None:
        env.seed(seed)
    
    last_soft_update = 0
    t = 0
    while(True):
        observation_buffer = []
        action_buffer = []
        reward_buffer = []
        done_buffer = []
        observation = env.reset()
        observation_buffer.append(observation)
        
        total_reward = 0
        while(True):
            
            action = agent.get_action(observation, t/n_train_steps)
            observation, reward, done, info = env.step(action) # 行動を環境に反映させる
            total_reward += reward
            
            observation_buffer.append(observation)
            reward_buffer.append(reward)
            action_buffer.append(action)
            if done:
                done_buffer.append(1)
            else:
                done_buffer.append(0)
            t += 1
            if done:
                #print("done")
                display.clear_output(wait=True)
                q_loss = update(agent, observation_buffer, action_buffer, reward_buffer, done_buffer, gamma=0.99)
                
                soft_update(agent.model, agent.learner_model, tau)
                
                print("step : "+str(t)+" / "+str(n_train_steps))
                print("sps : "+str(t/(time.time()-start_time)))
                print("total reward : "+str(total_reward))
                print("loss : "+str(q_loss.item()))
                # エピソードが終了したら、環境をリセットする
                break
        if t >= n_train_steps:
            break
    

In [66]:
def test(model, env_name, n_episode=1, seed=None):
    env = gym.make(env_name)
    if seed != None:
        env.seed(seed)
    
    sum_total_reward = 0
    for ep in range(n_episode):
        observation = env.reset()
        total_reward = 0
        while(True):
            img = env.render("rgb_array") # 画面の表示
            if display_notebook:
                plt.imshow()
                display.clear_output(wait=True)
                display.display(plt.gcf())

            action = model.get_action_greedy(observation) # ランダムな行動をとる
            observation, reward, done, info = env.step(action) # 行動を環境に反映させる
            total_reward += reward
            if done:
                print("total reward : "+str(total_reward))
                sum_total_reward += total_reward
                break
    print("")
    print("average reward : "+str(sum_total_reward/n_episode))
    env.close()  

In [67]:
#env_name = 'MountainCar-v0'

#env_name = "CartPole-v0" # 扱う環境の指定
env_name = "Acrobot-v1"

if env_name == 'MountainCar-v0':
    # 単純なDQNではうまく学習できず
    n_train_steps = 100_000
    epsilon_start = 1
    epsilon_end = 0.01
    lr=0.001
    hidden_units=[24, 48]
    tau=0.1

if env_name == "Acrobot-v1":
    n_train_steps = 20_000
    epsilon_start = 0.3
    epsilon_end = 0.01
    lr=0.003
    hidden_units=[128, 128]
    tau=0.05
if env_name == "CartPole-v0":
    n_train_steps = 8_000
    epsilon_start = 0.2
    epsilon_end = 0.01
    lr=0.001
    hidden_units=[128, 128]
    tau=0.05

seed=0
torch.manual_seed(seed) 
np.random.seed(seed)  

model = Agent(epsilon_start=epsilon_start, epsilon_end=epsilon_end, learning_rate=lr, hidden_units=hidden_units)

train(model, n_train_steps, env_name, tau=tau, seed=seed)

print("\ntest")
# Greedyな行動のみでテスト
test(model, env_name,1)

step : 20031 / 20000
sps : 2643.4876217718074
total reward : -108.0
loss : 2.7201170921325684

test
total reward : -127.0

average reward : -127.0


In [68]:
test(model, env_name,3)

total reward : -93.0
total reward : -124.0
total reward : -117.0

average reward : -111.33333333333333


学習済みモデルの保存

In [13]:
import os
folder = "./save_data/" + env_name
os.makedirs(folder, exist_ok=True)
checkpointpath = os.path.expandvars(
    os.path.expanduser(folder+"/model.tar")
)


In [14]:
torch.save(
    {
        "model_state_dict": agenta.model.state_dict(),
        "learner_model_state_dict": agenta.learner_model.state_dict(),
        "optimizer_state_dict": agenta.optim.state_dict(),
    },
    checkpointpath,
)