### 정책 기반 에이전트 - REINFORCE 구현

In [None]:
# REINFORCE algorithm for CartPole 동작 과정 요약 (On-policy)

for n_epi in range(10000):

    while not done: # 한 episode 동안 ..
        # 각 state에서, 각 action에 대한 prob를 계산
        # prob에 비례하게 action을 선택
        # 해당 action에 대한 reward와 prob를 저장(put_data)
        # transition

    pi.train_net() 
    # 하나의 episode동안 모은 데이터를 이용해 parameter update를 실시
    # 이때, G_t를 구해야 하기 때문에, episode의 뒤부터 훑으면서 state별 gradient를 계산
    
env.close()

##### 라이브러리 Import 및 하이퍼 파라미터 정의

In [12]:
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# Hyperparameters
learning_rate = 0.0002
gamma = 0.98

##### 정책 네트워크 클래스

In [41]:
class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.data = []

        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 2) # 취할 수 있는 Action의 개수가 2개이므로
        self.optimizer = optim.Adam(self.parameters(), lr = learning_rate)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.softmax(self.fc2(x), dim = 0)
        return x # 2개의 action에 대한 확률값 return
    
    def put_data(self, item): # REINFORCE 알고리즘에서는 pi_theta(s_t, a_t)와 G_t만 있으면 loss를 계산할 수 있기 때문에 확률값 prob[a]와 보상 r을 데이터에 저장
        self.data.append(item)

    def train_net(self):
        R = 0
        self.optimizer.zero_grad() # gradient 초기화(pytorch에서는 step을 거치며 이전 gradient를 계속 누적시키기 때문에)
        for r, prob in self.data[::-1]:
            R = r + gamma * R # Episode의 맨 뒤의 데이터부터 보면서 reward를 누적 (따라서 R은 G_t)
            loss = - R * torch.log(prob) # Gradient Ascent를 하기 위해서 -를 붙임 / 더 좋은 액션을 강화하는 형태
            loss.backward() # loss를 계산한 후 각 파라미터에 대해 미분값을 계산 (누적)
            # print("n_epi : ", n_epi, "Loss : ", loss)
        self.optimizer.step() # parameter 업데이트
        self.data = [] # 데이터 초기화

##### 메인 함수

In [42]:
import time

def main():
    env = gym.make('CartPole-v1')
    pi = Policy()
    score = 0.0
    print_interval = 20
    br = True

    for n_epi in range(10000):
        start_time = time.time()
        s, _ = env.reset() # s = 길이 4 벡터
        done = False

        while not done:
            if time.time() - start_time > 600 : # 강제 종료 조건 삽입
                br = False
                break
            prob = pi(torch.from_numpy(s).float()) # prob : tensor([0.4491, 0.5509], grad_fn=<SoftmaxBackward0>)
            m = Categorical(prob)
            a = m.sample() # a: tensor(1) /  각 Category에 대한 확률을 계산하고, 그 중 하나를 뽑는다.(type : tensor) / a = 0 or 1
            # prob가 높은 action은 더 자주, prob가 낮은 action은 덜 뽑히게 된다.
            s_prime, r, done, truncat1ed, info = env.step(a.item()) # a.item() : tensor를 int로 바꿔준다.
            # 이때 r = 1. 따라서 최대한 오랜 시간 동안 CartPole을 유지하도록 학습
            pi.put_data((r, prob[a])) # r : 1 / prob[a] : tensor(0.5509)
            s = s_prime
            score += r
        
        if br == False:
            break

        pi.train_net() # episode가 끝나면 하나의 episode동안 모은 데이터를 이용해 Parameter(theta) Update를 진행
        
        if n_epi % print_interval == 0 and n_epi != 0: # 20번의 episode마다 avg score 출력
            print("# of episode : {}, avg score : {}".format(n_epi, score / print_interval))
            score = 0.0
            
    env.close()

if __name__ == '__main__':
    main()

# of episode : 20, avg score : 35.15
# of episode : 40, avg score : 24.95
# of episode : 60, avg score : 30.65
# of episode : 80, avg score : 33.1
# of episode : 100, avg score : 32.55
# of episode : 120, avg score : 26.7
# of episode : 140, avg score : 35.25
# of episode : 160, avg score : 28.15
# of episode : 180, avg score : 35.3
# of episode : 200, avg score : 33.3
# of episode : 220, avg score : 28.7
# of episode : 240, avg score : 33.35
# of episode : 260, avg score : 42.6
# of episode : 280, avg score : 47.55
# of episode : 300, avg score : 36.2
# of episode : 320, avg score : 45.55
# of episode : 340, avg score : 40.15
# of episode : 360, avg score : 37.15
# of episode : 380, avg score : 42.7
# of episode : 400, avg score : 42.3
# of episode : 420, avg score : 37.9
# of episode : 440, avg score : 39.15
# of episode : 460, avg score : 36.85
# of episode : 480, avg score : 44.75
# of episode : 500, avg score : 44.65
# of episode : 520, avg score : 46.85
# of episode : 540, avg sc

### 정책 기반 에이전트 - TD Actor - Critic 구현

In [None]:
# TD Actor-Critic algorithm for CartPole 동작 과정 요약 (On-policy)

##### 라이브러리 Import 및 하이퍼 파라미터 정의

In [None]:
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.distributions import Categorical

# Hyperparameters
learning_rate = 0.0002
gamma = 0.98
n_rollout = 10 # 몇 틱의 데이터를 쌓아서 업데이트 할지를 나타냄(여기서는 10번의 transition)

##### 액터 크리틱 클래스

In [None]:
class ActorCritic(nn.Module):
    def __init__(self):
        super(ActorCritic, self).__init__()
        self.data = []

        self.fc1 = nn.Linear(4, 256)
        # 256개의 노드로 이루어진 첫 번째 Layer는 Actor와 Critic의 공통 Layer(정책 네트워크 업데이트 때도, 밸류 네트워크 업데이트에서도 항상 업데이트됨)
        self.fc_pi = nn.Linear(256, 2)
        self.fc_v = nn.Linear(256, 1)
        self.optimizer = optim.Adam(self.parameters(), lr = learning_rate)

    def pi(self, x, softmax_dim = 0):
        x = F.relu(self.fc1(x))
        x = self.fc_pi(x)
        prob = F.softmax(x, dim = softmax_dim)
        return prob
    
    def v(self, x):
        x = F.relu(self.fc1(x))
        v = self.fc_v(x)
        return v
    
    def put_data(self, transition):
        self.data.append(transition)

    def make_batch(self): # n_rollout만큼의 data들을 분리해서 mini batch를 만들어줌
        s_lst, a_lst, r_lst, s_prime_lst, done_lst = [], [], [], [], []
        for transition in self.data:
            s, a, r, s_prime, done = transition
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r / 100.0])
            s_prime_lst.append(s_prime)
            done_mask = 0.0 if done else 1.0
            done_lst.append([done_mask])

        s_batch, a_batch, r_batch, s_prime_batch, done_batch = torch.tensor(s_lst, dtype = torch.float), torch.tensor(a_lst), \
            torch.tensor(r_lst, dtype = torch.float), torch.tensor(s_prime_lst, dtype = torch.float), torch.tensor(done_lst, dtype = torch.float)
        self.data = []
        return s_batch, a_batch, r_batch, s_prime_batch, done_batch
    
    def train_net(self):
        s, a, r, s_prime, done = self.make_batch() # 각각 n_rollout만큼의 데이터를 가지고 있음
        td_target = r + gamma * self.v(s_prime) * done
        delta = td_target - self.v(s) # baseline을 빼줌 (value network의 loss)

        pi = self.pi(s, softmax_dim = 1)
        pi_a = pi.gather(1, a)
        loss = -torch.log(pi_a) * delta.detach() + F.smooth_l1_loss(self.v(s), td_target.detach()) # 정책 네트워크의 손실 함수와 밸류 네트워크의 손실 함수를 더하여 한 번에 업데이트를 진행

        self.optimizer.zero_grad()
        loss.mean().backward()
        self.optimizer.step()

##### 메인 함수

In [None]:
def main():
    env = gym.make('CartPole-v1')
    model = ActorCritic()
    print_interval = 20
    score = 0.0

    for n_epi in range(10000):
        done = False
        s = env.reset()

        while not done:
            for t in range(n_rollout): # n_rollout : 10
                prob = model.pi(torch.from_numpy(s).float()) # action을 선택(이때, prob는 2개의 action에 대한 확률값)
                m = Categorical(prob)
                a = m.sample().item()
                s_prime, r, done, info = env.step(a)
                model.put_data((s, a, r, s_prime, done)) # n_rollout만큼의 transition을 저장

                s = s_prime
                score += r

                if done : break

            model.train_net()

        if n_epi % print_interval == 0 and n_epi != 0:
            print("# of episode : {}, avg score : {}".format(n_epi, score / print_interval))
            score = 0.0

    env.close()