In [1]:
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

In [2]:
# namedtupleを生成
from collections import namedtuple

Transition = namedtuple(
    'Transition', ('state', 'action', 'next_state', 'reward'))

In [3]:
ENV = 'CartPole-v0'  # 使用する課題名
GAMMA = 0.99  # 時間割引率
MAX_STEPS = 200  # 1試行のstep数
NUM_EPISODE = 500  # 最大試行回数

BATCH_SIZE = 32
CAPACITY = 10000

num_actions = 2
num_states = 4

In [4]:
class ReplayMemory:
    
    def __init__(self, CAPACITY):
        self.capacity = CAPACITY
        self.memory = []
        self.index = 0
        
    def push(self, state, action, state_next, reward):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        
        self.memory[self.index] = Transition(state, action, state_next, reward)
        self.index = (self.index + 1) % self.capacity
        
    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)
    
    def __len__(self):
        return len(self.memory)

In [10]:
class Net(nn.Module):
    def __init__(self, n_in, n_mid, n_out):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(n_in, n_mid)
        self.fc2 = nn.Linear(n_mid, n_mid)
        self.fc3 = nn.Linear(n_mid, n_mid)
        self.output = nn.Linear(n_mid, n_out)
        
    def forward(self, x):
        h1 = F.relu(self.fc1(x))
        h2 = F.relu(self.fc2(h1))
        output = self.fc3(h2)
        return output

In [6]:
class Brain:
    def __init__(self, num_states, num_actions):
        self.num_actions = num_actions
        self.memory = ReplayMemory(CAPACITY)
        
        n_in, n_mid, n_out = num_states, 32, num_actions
        self.main_q_network = Net(n_in, n_mid, n_out)
        self.target_q_network = Net(n_in, n_mid, n_out)
        print(self.main_q_network)
        
        self.optimizer = optim.Adam(self.main_q_network.parameters(), lr=0.0001)
        
    def replay(self):
        #メモリサイズを確認し、BATCH_SIZEよりも小さければ何も返さない.
        if len(self.memory) < BATCH_SIZE:
            return 
        
        #ミニバッチを作成する
        self.batch, self.state_batch, self.action_batch, self.reward_batch,
        self.non_final_next_states = self.make_minibatch()
    
        #教師信号となるQ(s_t, a_t)を求める
        self.expected_state_action_values = self.get_expected_state_action_values()
    
        #mainのネットワークを更新する。
        self.updates_main_q_network()
    
    def decide_action(self, state, episode):
        #ε-greedy法による行動選択
        epsilon = 0.5 * (1 / (episode + 1))
        
        if epsilon <= np.random.uniform(0, 1):
            self.main_q_network.eval()
            with torch.no_grad():
                action = self.main_q_network(state).max(1)[1].view(1, 1)
        
        else:
            action = torch.LongTensor([[random.randrange(self.num_actions)]])
        
        return action
    
    def make_minibatch(self):
        transitions = self.memory.sample(BATCH_SIZE)
        batch = Transition(*zip(*transitions))
        
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
        
        return batch, state_batch, action_batch, reward_batch, non_final_next_states
    
    def get_expected_state_action_values(self):
        self.main_q_network.eval()
        self.target_q_network.eval()
        
        #右辺はstateをnetに放り込んでいるので、出力がactionになっており、それをランダムサンプリングしたactionで抜き出している。
        self.state_action_values = self.main_q_network(self.state_batch).gather(1, self.action_batch)
        
        # batch.next_stateの各要素にNoneが含まれていれば、その箇所をFalseと入力しtupleで格納して、tensorとして出力する。これがマスクになる。
        non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None, self.batch.next_state)))
        
        #次の状態を0とする。
        next_state_values = torch.zeros(BATCH_SIZE)
        
        #Long型のtensorに変換する。
        a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)
        
        a_m[non_final_mask] = self.main_q_network(self.non_final_next_states).detach().max(1)[1]
        a_m_non_final_next_states = a_m[non_final_mask].view(-1,1)
        
        #target Netで取得したQ値から次の状態がNoneではないものを取得し, squeezeで形を整える。
        next_state_values[non_final_mask] = self.target_q_network(self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()
        
        #期待されるQ値をQ学習の更新式から求める
        expected_state_action_values = self.reward_batch + GAMMA*next_state_values
        return expected_state_action_values
    
    def update_main_q_network(self):
        
        self.main_q_network.train()
        
        #expected_state_action_valuesはサイズが[minibatch]になっているからunsqueezeを用いて[minibatch ✖︎ 1]にする。
        loss = F.smooth_l1_loss(self.state_action_values, self.expected_state_action_values.unsqueeze(1))
        
        #勾配をリセットにする。
        self.optimizer.zero_grad()
        
        #損失を逆伝播で計算する。
        loss.backward()
        
        #結合パラメータを計算する。
        self.optimizer.step()
        
    def update_target_q_network(self):
        self.target_q_network.load_state_dict(self.main_q_network.state_dict())        

In [7]:
class Agent:
    def __init__(self, num_states, num_actions):
        '''課題の状態と行動の数を設定する'''
        self.brain = Brain(num_states, num_actions)  # エージェントが行動を決定するための頭脳を生成

    def update_q_function(self):
        '''Q関数を更新する'''
        self.brain.replay()

    def get_action(self, state, episode):
        '''行動を決定する'''
        action = self.brain.decide_action(state, episode)
        return action

    def memorize(self, state, action, state_next, reward):
        '''memoryオブジェクトに、state, action, state_next, rewardの内容を保存する'''
        self.brain.memory.push(state, action, state_next, reward)

    def update_target_q_function(self):
        '''Target Q-NetworkをMain Q-Networkと同じに更新'''
        self.brain.update_target_q_network()

In [45]:
class Environment:
    def __init__(self):
        self.env = gym.make(ENV)
        
        #self.envにはstate, action, reward, done, next_stateが格納されており、indexが0の時はstateが格納されている。
        num_states = self.env.observation_space.shape[0]
        
        #self.env.action_spaceにはDescrete(2)が格納されており、indexだけ取り出す。つまり出力は2になる。
        num_action = self.env.action_space.n
        
        #環境内で行動するAgentを生成。
        self.agent = Agent(num_states, num_actions)
    
    def run(self):
        
        episode_10_list = np.zeros(10)
        
        #195step以上連続でたち続けた試行数
        complete_episodes = 0
        
        #最後の試行フラグ
        episode_final = False
        
        for episode in range(NUM_EPISODE):
            
            #observationを初期化し、stateへ代入する。
            observation = self.env.reset()
            state = observation #array配列になっている.
            
            #tensorに変換する。
            state = torch.from_numpy(state).type(torch.FloatTensor)
            
            #stateを 4から1× 4に変換する。
            state = torch.unsqueeze(state, 0)
            
            for step in range(MAX_STEPS):
                #現在のepisodeとstateから行動選択をする。
                action = self.agent.get_action(state, episode)
                
                #上で取得したactionを用いて次の状態、報酬、doneを取得する。
                observation_next, _, done, _ = self.env.step(action.item())
                
                if done:
                    state_next = None
                    
                    #直近10episodeの立てたstep数をリストに追加する。
                    episode_10_list = np.hstack(episode_10_list[1:], step + 1)
                    
                    if step < 195:
                        #途中で倒れたら-1を報酬に与える。
                        reward = torch.FloatTensor([-1.0])
                        #連続成功記録を0にする
                        complete_episodes = 0
                    
                    else:
                        #報酬に1.0を加える。
                        reward = torch.FloatTensor([1.0])
                        
                        #今までのエピソード数を更新する。
                        complete_episode = cpmplete_episode + 1
                
                #倒れていない時
                else:
                    reward = torch.FloatTensor([0.0])
                    state_next = observation_next
                    state_next = torch.from_numpy(state_next).type(torch.FloatTensor)
                    state_next = torch.unsqueeze(state_next, 0)
                
                #倒れていようがいまいが、メモリに追加する。
                self.agent.memorize(state, action, state_next, reward)
                
                #Experience replayでQ値を更新する。
                self.agent.update_q_function()
                
                #stateの更新
                state = state_next
                
                # 終了時の処理
                if done:
                    print('%d Episode: Finished after %d steps: 10試行の平均step数 = %.1lf' % (episode, step + 1, episode_10_list.mean()))
                    
                    #2回に1回target networkをmain netで更新する。
                    if(episode % 2 == 0):
                        self.agent.update_target_q_function()
                        break
                    
                    if episode_final is True:
                        break
                    
                    if complete_episodes >= 10:
                        print('10回連続成功')
                        episode_final = True

In [46]:
# main クラス
cartpole_env = Environment()

Net(
  (fc1): Linear(in_features=4, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=32, bias=True)
  (fc3): Linear(in_features=32, out_features=32, bias=True)
  (output): Linear(in_features=32, out_features=2, bias=True)
)


In [47]:
cartpole_env.run()

AssertionError: 4 (<class 'int'>) invalid