In [None]:
import gym
import torch
import torch.nn as nn                      # torch내부의 매서드를 정의해줘서 코드를 짧게 짤 수 있게 해준다.
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# 하이퍼 파라미터
learning_rate = 0.0002
gamma = 0.98
epoch = 1000

### 메인 함수


In [None]:
def main():
    env = gym.make('CartPole-v1') #카트폴 환경을 env로 설정
    pi = Policy()   # 정책 클래스를 호출해주면 __call__에 의해 forward가 자동 실행된다.
    score = 0.0     # 리턴이 곧 score
    print_interval = 20

    for n_epi in range(epoch):
        s = env.reset()   # 초기 state로 리셋해주고 시작
        done = False

        while not done:
            prob = pi(torch.from_numpy(s).float())
            # 아마도 확률을 관측값에 따라 변동시켜주는 것, 다시말해 정책 π_θ에 해당하는 것 같다.
            m = Categorical(prob) # 행동의 경우의 수가 적은경우 categorical을 쓴다고함
            # 샘플이 쌓일때마다 확률이 높은 액션은 자주, 낮은 액션은 덜 뽑히게 됨, 
            a = m.sample()
            s_prime, r, done, info = env.step(a.item())
            pi.put_data((r, prob[a]))
            s = s_prime
            score += r

        pi.train_net()

        if n_epi%print_interval==0 and n_epi!=0: #인터벌로 나눴을 때 0이고, 첫 에피소드가 아니면
            print("# of epidode :{}, avg score : {}".format(n_epi, score/print_interval))
            score = 0.0
    env.close()


### 정책 네트워크 클래스

In [None]:
class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.data = []

        self.fc1 = nn.Linear(4, 128) #fc없는 단순 모델
        self.fc2 = nn.Linear(128, 2)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.softmax(self.fc2(x), dim=0)
        #아마도? 정책은 확률값으로 나타나니까 소프트맥스를 써준것 같다.
        return x

    def put_data(self, item):
        self.data.append(item)

    def train_net(self):
        R = 0
        self.optimizer.zero_grad() # gradient 계산후 변수가 남아있어서 일단 매번 리셋해준다고 함
        for r, prob in self.data[::-1]:
            R = r + gamma * R #리턴 식
            loss = -R * torch.log(prob) # 9.2에서 손실함수, 정리 노트 참조
            loss.backward() # 그래디언트 계산 후
        self.optimizer.step() # 그래디언트 방향으로 lr만큼 θ를 조정
        self.data = []

In [None]:
main()

# of epidode :20, avg score : 29.15
# of epidode :40, avg score : 19.85
# of epidode :60, avg score : 23.2
# of epidode :80, avg score : 25.1
# of epidode :100, avg score : 23.95
# of epidode :120, avg score : 24.8
# of epidode :140, avg score : 28.75
# of epidode :160, avg score : 28.35
# of epidode :180, avg score : 27.75
# of epidode :200, avg score : 35.35
# of epidode :220, avg score : 35.7
# of epidode :240, avg score : 35.15
# of epidode :260, avg score : 39.6
# of epidode :280, avg score : 32.65
# of epidode :300, avg score : 35.35
# of epidode :320, avg score : 42.35
# of epidode :340, avg score : 50.6
# of epidode :360, avg score : 39.45
# of epidode :380, avg score : 39.25
# of epidode :400, avg score : 36.2
# of epidode :420, avg score : 62.9
# of epidode :440, avg score : 48.85
# of epidode :460, avg score : 52.75
# of epidode :480, avg score : 53.9
# of epidode :500, avg score : 48.15
# of epidode :520, avg score : 54.7
# of epidode :540, avg score : 61.4
# of epidode :56

## TD Actor-Critic 구현

### 라이브러리 import 및 하이퍼 파라미터 정의


In [None]:
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# 하이퍼 파라미터
learning_rate = 0.0002
gamma = 0.98
epoch = 1000
n_rollout = 10   # 10개의 상태전이를 모아서 업데이트함

### 메인 함수

In [None]:
def main():
    env = gym.make('CartPole-v1')
    model = ActorCritic()
    print_interval = 20 #20회마다 에피소드 수랑 스코어 출력
    score = 0

    for n_epi in range(epoch):
        done = False
        s = env.reset()

        while not done:
            for t in range(n_rollout): #10번을 채울때까지 put_data에 데이터를 일단 모아두자
                prob = model.pi(torch.from_numpy(s).float()) #정책 : 행동확률을 결정
                m = Categorical(prob) #확률에 따른 행동 분포를 만들어 주는듯 (coin)대신 선택하도록
                a = m.sample().item() #확률에 따라 액션 선택!
                s_prime, r, done, info = env.step(a) #step에 따른 데이터를 받아서
                model.put_data((s,a,r,s_prime,done)) # 데이터를 저장 후

                s = s_prime
                score += r

                if done:
                    break

            model.train_net() #한번에 트레인

        if n_epi%print_interval==0 and n_epi!=0: 
            print("# 에피소드수 :{}, 평균 스코어 : {}".format(n_epi, score/print_interval))
            score = 0.0
    env.close()

### 액터 크리틱 클래스


In [None]:
class ActorCritic(nn.Module):
    def __init__(self):
        super(ActorCritic, self).__init__()
        self.data = []

        self.fc1 = nn.Linear(4, 256)
        self.fc_pi = nn.Linear(256, 2) # π를 업데이트하는 뉴럴넷 (정책 함수 파라미터)
        self.fc_v = nn.Linear(256, 1) # Φ를 업데이느 하는 뉴럴넷 (상태 가치함수 파라미터)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def pi(self, x, softmax_dim = 0): # pi함수 fc1 -> relu -> fcpi -> softmax ->prob (어떤 행동을 할지에 대한 확률 정책)
        x = F.relu(self.fc1(x)) 
        x = self.fc_pi(x)
        prob = F.softmax(x, dim=softmax_dim) # 취할수 있는 차량의 행동은 좌, 우 두개이므로 2개의 아웃풋
        return prob

    def v(self, x): # v함수 fc1->relu -> fc_v -> 하나의 결과값 (입력 s(4개 요소)에 대한 상태 가치를 리턴)
        x = F.relu(self.fc1(x))
        v = self.fc_v(x)
        return v

    def put_data(self, transition): #rollout까지 transition 들을 업데이트
        self.data.append(transition)

    def make_batch(self):
        s_lst, a_lst, r_lst, s_prime_lst, done_lst = [], [], [], [], [] #각각의 데이터를 리스트에 넣는다
        for transition in self.data: #data에서 batch를 꺼내서 넣어줌
            s, a, r, s_prime, done = transition
            s_lst.append(s)
            a_lst.append([a]) #액션은 복수라 리스트인듯?
            r_lst.append([r/100.0]) # 액션에따른 리턴이라 리스트 (값이 커질가능성이 있어 100으로 나눠준다고함)
            s_prime_lst.append(s_prime)
            done_mask = 0.0 if done else 1.0 # done여부 원핫코딩
            done_lst.append([done_mask])

        s_batch, a_batch, r_batch, s_prime_batch, done_batch = torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), torch.tensor(r_lst,  dtype=torch.float), torch.tensor(s_prime_lst,  dtype=torch.float), torch.tensor(done_lst,  dtype=torch.float)
        #각 값을 토치 텐서, float으로 변화해서 리턴함
        self.data = []
        return s_batch, a_batch, r_batch, s_prime_batch, done_batch

    def train_net(self):
        s, a, r, s_prime, done = self.make_batch()
        td_target = r + gamma*self.v(s_prime)*done
        delta = td_target - self.v(s)

        pi = self.pi(s, softmax_dim=1)
        pi_a = pi.gather(1,a) # pi에 있는 1번 축에서 a데이터만 모은다 인것 같다.
        loss = -torch.log(pi_a)*delta.detach() + F.smooth_l1_loss(self.v(s), td_target.detach()) #detach는 미분시 상수로 취급

        self.optimizer.zero_grad()
        loss.mean().backward() # 10개의 loss의 평균을 최종 loss로 정의
        self.optimizer.step()

In [None]:
if __name__ == '__main__':
    main()

# 에피소드수 :20, 평균 스코어 : 27.15
# 에피소드수 :40, 평균 스코어 : 17.5
# 에피소드수 :60, 평균 스코어 : 29.65
# 에피소드수 :80, 평균 스코어 : 28.2
# 에피소드수 :100, 평균 스코어 : 25.35
# 에피소드수 :120, 평균 스코어 : 21.65
# 에피소드수 :140, 평균 스코어 : 29.95
# 에피소드수 :160, 평균 스코어 : 37.2
# 에피소드수 :180, 평균 스코어 : 34.15
# 에피소드수 :200, 평균 스코어 : 34.4
# 에피소드수 :220, 평균 스코어 : 41.85
# 에피소드수 :240, 평균 스코어 : 47.5
# 에피소드수 :260, 평균 스코어 : 54.6
# 에피소드수 :280, 평균 스코어 : 62.35
# 에피소드수 :300, 평균 스코어 : 89.95
# 에피소드수 :320, 평균 스코어 : 135.5
# 에피소드수 :340, 평균 스코어 : 121.45
# 에피소드수 :360, 평균 스코어 : 173.85
# 에피소드수 :380, 평균 스코어 : 215.15
# 에피소드수 :400, 평균 스코어 : 183.85
# 에피소드수 :420, 평균 스코어 : 170.75
# 에피소드수 :440, 평균 스코어 : 219.35
# 에피소드수 :460, 평균 스코어 : 213.65
# 에피소드수 :480, 평균 스코어 : 283.4
# 에피소드수 :500, 평균 스코어 : 244.9
# 에피소드수 :520, 평균 스코어 : 206.2
# 에피소드수 :540, 평균 스코어 : 211.3
# 에피소드수 :560, 평균 스코어 : 239.7
# 에피소드수 :580, 평균 스코어 : 366.2
# 에피소드수 :600, 평균 스코어 : 293.05
# 에피소드수 :620, 평균 스코어 : 272.2
# 에피소드수 :640, 평균 스코어 : 260.9
# 에피소드수 :660, 평균 스코어 : 274.15
# 에피소드수 :680, 평균 스코어 : 351.9
# 에피소드수 :700, 평