# REINFORCE

코드 구현에 필요한 패키지 불러오기

In [None]:
# Import packages 

import gym
import numpy as np
from itertools import count
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

사용할 게임 환경 설정 & Hyper parameter 정의하기

게임 환경 : CartPole-v1 

In [None]:
# Hyperparameters

LR = 0.01          # learning rate handed to the optimizer
GAMMA = 0.99       # discount factor applied to future rewards
LOG_INTERVAL = 10  # number of episodes between training status logs

# Build the CartPole environment, keep its `.unwrapped` form, and read the
# problem dimensions from its spaces.
env = gym.make('CartPole-v1').unwrapped
N_STATES = env.observation_space.shape[0]
N_ACTIONS = env.action_space.n

Fully Connected Neural Network 구현

모든 노드 간에 연결 되어있는 모델

입력(Input) : 게임의 상태 정보
출력(Output) : 행동에 대한 확률 값

행동 선택 : 환경의 상태를 Network에 입력하여 각 행동에 대한 확률 값을 얻음. 이후 이 확률 값으로 범주형(Categorical) 분포를 만들고, 그 분포에서 행동을 샘플링하여 선택

학습 : 각 타임 스텝에서 선택한 행동의 로그 확률에 음수를 취한 값(크로스 엔트로피 형태)에 정규화된 할인 보상(return)을 곱하고, 이를 모두 더한 값을 정책 손실로 하여 가중치를 학습

In [None]:
# Create networks

class Policy(nn.Module):
    """Two-layer softmax policy network trained with REINFORCE.

    The network maps a state vector to one probability per discrete action.
    ``select_action`` samples from that distribution and stores the
    log-probability of the sampled action; the caller appends the matching
    reward to ``rewards``; ``finish_episode`` then performs a single
    policy-gradient update from the stored episode.

    ``optimizer`` must be assigned by the caller before ``finish_episode`` is
    used.  ``eps`` may be left as ``None``, in which case float32 machine
    epsilon is used when normalizing returns.

    Args:
        n_states: Length of the state vector.  Defaults to the module-level
            ``N_STATES``.
        n_actions: Number of discrete actions.  Defaults to the module-level
            ``N_ACTIONS``.
    """

    def __init__(self, n_states=None, n_actions=None):
        super().__init__()
        # Fall back to the module-level sizes only when the caller does not
        # provide them, so the class can also be used on its own.
        if n_states is None:
            n_states = N_STATES
        if n_actions is None:
            n_actions = N_ACTIONS

        self.eps = None        # stabilizer added to the std of the returns
        self.optimizer = None  # assigned by the caller after construction

        self.fc1 = nn.Linear(n_states, 50)
        self.fc1.weight.data.normal_(0, 0.1)  # Initialization
        self.out = nn.Linear(50, n_actions)
        self.out.weight.data.normal_(0, 0.1)  # Initialization

        # Per-episode buffers; both are emptied by finish_episode().
        self.saved_log_probs = []
        self.rewards = []

    def forward(self, x):
        """Return action probabilities for ``x`` (softmax over the last dim)."""
        x = self.fc1(x)
        x = F.relu(x)
        actions_value = self.out(x)
        # dim=-1 equals dim=1 for the batched (1, n_states) input produced by
        # select_action, and additionally accepts an unbatched state vector.
        actions_policy = F.softmax(actions_value, dim=-1)
        return actions_policy

    def select_action(self, x):
        """Sample an action for the NumPy state ``x`` and record its log-prob.

        Returns:
            The sampled action index as a Python int.
        """
        state = torch.from_numpy(x).float().unsqueeze(0)
        probs = self.forward(state)
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs.append(m.log_prob(action))
        return action.item()

    def finish_episode(self, gamma=None):
        """Run one REINFORCE update from the stored episode, then clear it.

        Args:
            gamma: Reward discount factor.  Defaults to the module-level
                ``GAMMA``.
        """
        # Nothing was recorded: there is no gradient to take, and
        # torch.cat() on an empty list would raise.
        if not self.rewards or not self.saved_log_probs:
            del self.rewards[:]
            del self.saved_log_probs[:]
            return

        if gamma is None:
            gamma = GAMMA

        # Discounted return for every step, accumulated from the last step
        # backwards.  Append + reverse keeps this linear in episode length
        # (list.insert(0, ...) is quadratic).
        discounted = 0.0
        returns = []
        for reward in reversed(self.rewards):
            discounted = reward + gamma * discounted
            returns.append(discounted)
        returns.reverse()
        returns = torch.tensor(returns, dtype=torch.float32)

        # Standardize the returns.  The sample std of a single value is NaN,
        # which would turn every weight into NaN after the optimizer step, so
        # a one-step episode keeps its raw return.
        if returns.numel() > 1:
            eps = torch.finfo(torch.float32).eps if self.eps is None else self.eps
            returns = (returns - returns.mean()) / (returns.std() + eps)

        policy_loss = torch.cat([
            -log_prob * ret
            for log_prob, ret in zip(self.saved_log_probs, returns)
        ]).sum()

        self.optimizer.zero_grad()
        policy_loss.backward()
        self.optimizer.step()

        del self.rewards[:]
        del self.saved_log_probs[:]
        

학습

한 에피소드에서 지정한 최대 타임 스텝 이내에 얻은 보상을 모두 더한 후 학습을 진행

에피소드 단위로 학습을 진행하며, running_reward가 환경의 보상 임계치를 초과하면 문제를 해결한 것으로 보고 학습을 종료

In [None]:
def _reset_env():
    """Reset the global environment and return only the observation."""
    result = env.reset()
    # Newer Gym releases return (observation, info) from reset(); older ones
    # return the observation alone.
    if isinstance(result, tuple):
        return result[0]
    return result


def _step_env(action):
    """Advance the global environment; return (state, reward, done)."""
    result = env.step(action)
    # Newer Gym releases return five values and split the end-of-episode flag
    # into `terminated` and `truncated`; older ones return four values.
    if len(result) == 5:
        state, reward, terminated, truncated, _ = result
        return state, reward, terminated or truncated
    state, reward, done, _ = result
    return state, reward, done


def main():
    """Train a Policy with REINFORCE, then plot the running reward.

    One policy update is performed per episode.  Training continues until the
    exponentially smoothed episode reward exceeds
    ``env.spec.reward_threshold``; there is no cap on the number of episodes.
    The environment is closed even if training raises or is interrupted.
    """
    policy = Policy()
    policy.optimizer = optim.Adam(policy.parameters(), lr=LR)
    policy.eps = np.finfo(np.float32).eps.item()

    episode = []
    running_rewards = []
    running_reward = 10
    try:
        for i_episode in count(1):
            state = _reset_env()
            ep_reward = 0
            for t in range(1, 10000):  # Don't infinite loop while learning
                action = policy.select_action(state)
                state, reward, done = _step_env(action)
                env.render()

                policy.rewards.append(reward)
                ep_reward += reward
                if done:
                    break

            episode.append(i_episode)

            # Exponential moving average of the episode reward.
            running_reward = 0.05 * ep_reward + (1 - 0.05) * running_reward
            running_rewards.append(running_reward)

            policy.finish_episode()
            if i_episode % LOG_INTERVAL == 0:
                print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
                      i_episode, ep_reward, running_reward))

            if running_reward > env.spec.reward_threshold:
                print("Solved! Running reward is now {} and "
                      "the last episode runs to {} time steps!".format(running_reward, t))
                break
    finally:
        # Release the environment (and any render window) on every exit path.
        env.close()

    # The episode count is not fixed in advance, so report the actual number.
    plt.title("CartPole running reward over {} episodes".format(len(episode)))
    plt.plot(episode, running_rewards)
    plt.xlabel("episode")
    plt.ylabel("avg_ep_reward")
    plt.show()

In [None]:
# Start training only when this file is executed directly, not when imported.
if __name__ == '__main__':
    main()