In [7]:
import sys
sys.path.append('/home/rhino/anaconda3/envs/rl_env/lib/python3.9/site-packages')
# 구현에 사용할 패키지 임포트
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

from collections import namedtuple

# Execute DQN
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

'''
Fixed Target Q-Network:
Q-Network를 수정하는 데 사용하는 Q 값을 같은 신경망에서 계산하면 학습이 불안정해지기 때문에 별도의 신경망을 두는 방법이다.
앞선 DQN은 이 과정을 생략하고 미니배치 학습으로 대체했다.

DQN은 두가지 버전이 있다.
하나는 앞서 구현한 버전, 나머지는 Target Q-Network를 따로 두고 Main Q-Network를 학습시키는 방식이다.
이때, 몇 단계마다 한번씩 Target을 수정해서 Main과 일치시킨다.

* 두번째 버전의 DQN에서 Q value를 업데이트 할 때 maxQ를 구하는 과정에서 Target Q-Network(Q_t)를 사용한다.
* DDQN에서는 해당 업데이트 식을 안정화 했는데, 그 방법은 기존의 maxQ_t(s_t+1, a)를 Q_t(s_t+1, a_m)으로 변경한다.
이때 a_m = arg maxQ_m(s_t+1, a)으로 정해진다.

DDQN을 구현할 때 주의할 점은 신경망을 2개 사용한다는 점이다.
'''

In [8]:
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

# 상수 정의
ENV = 'CartPole-v0'  # 태스크 이름
GAMMA = 0.99  # 시간할인율
MAX_STEPS = 200  # 1에피소드 당 최대 단계 수
NUM_EPISODES = 500  # 최대 에피소드 수

In [4]:
# memory class for saving transition
class ReplayMemory:
    def __init__(self, CAPACITY):
        self.capacity = CAPACITY # Max size of memory
        self.memory = [] # parameter for stacking transition
        self.index = 0

    # 메모리에 Transition tuple에 cartpole의 정보를 저장한다.
    def push(self, state, action, state_next, reward):
        # save transition in memory
        if len(self.memory) < self.capacity:
            self.memory.append(None) # memory is not full
            
        self.memory[self.index] = Transition(state, action, state_next, reward)
        
        self.index = (self.index + 1) % self.capacity

    # Transition이 저장된 memory에서 batch_size만큼 랜덤으롤 샘플을 뽑는다.        
    def sample(self, batch_size):
        # Randomize by the number of batches
        return random.sample(self.memory, batch_size)
    
    def __len__(self):
        # return the number of transition
        return len(self.memory)

In [9]:
class Net(nn.Module):
    def __init__(self, n_in, n_mid, n_out):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(n_in, n_mid)
        self.fc2 = nn.Linear(n_mid, n_mid)
        self.fc3 = nn.Linear(n_mid, n_out)
        
    def foward(self, x):
        h1 = F.relu(self.fc1(x))
        h2 = F.relu(self.fc2(h1))
        output = self.fc3(h2)
        return output

In [24]:
BATCH_SIZE = 32
CAPACITY = 10000

class Brain: 
    def __init__(self, num_states, num_actions):
        self.num_actions = num_actions
        
        self.memory = ReplayMemory(CAPACITY)
        
        n_in, n_mid, n_out = num_states, 32, num_actions
        self.main_q_network = Net(n_in, n_mid, n_out)
        self.target_q_network = Net(n_in, n_mid, n_out)
        print(self.main_q_network)
        
        self.optimizer = optim.Adam(self.main_q_network.parameters(), lr=0.0001)
        
    def replay(self):
        if len(self.memory) < BATCH_SIZE:
            return
        
        self.batch, self.state_batch, self.action_batch,\
        self.reward_batch, self.non_final_next_states = self.make_minibatch()
        
        self.expected_state_action_values = self.get_expected_state_action_values()
        
        self.update_main_q_network()
        
    def decide_action(self, state, episode):
        '''현재 상태로부터 행동을 결정함'''
        # ε-greedy 알고리즘에서 서서히 최적행동의 비중을 늘린다
        epsilon = 0.5 * (1 / (episode + 1))

        if epsilon <= np.random.uniform(0, 1):
            self.main_q_network.eval()  # 신경망을 추론 모드로 전환
            with torch.no_grad():
                action = self.main_q_network(state).max(1)[1].view(1, 1)
            # 신경망 출력의 최댓값에 대한 인덱스 = max(1)[1]
            # .view(1,1)은 [torch.LongTensor of size 1] 을 size 1*1로 변환하는 역할을 한다

        else:
            # 행동을 무작위로 반환(0 혹은 1)
            action = torch.LongTensor(
                [[random.randrange(self.num_actions)]])  # 행동을 무작위로 반환(0 혹은 1)
            # action은 [torch.LongTensor of size 1*1] 형태가 된다
        '''현재 상태로부터 행동을 결정함'''
        # ε-greedy 알고리즘에서 서서히 최적행동의 비중을 늘린다
        epsilon = 0.5 * (1 / (episode + 1))

        if epsilon <= np.random.uniform(0, 1):
            self.main_q_network.eval()  # 신경망을 추론 모드로 전환
            with torch.no_grad():
                action = self.main_q_network(state).max(1)[1].view(1, 1)
            # 신경망 출력의 최댓값에 대한 인덱스 = max(1)[1]
            # .view(1,1)은 [torch.LongTensor of size 1] 을 size 1*1로 변환하는 역할을 한다

        else:
            # 행동을 무작위로 반환(0 혹은 1)
            action = torch.LongTensor(
                [[random.randrange(self.num_actions)]])  # 행동을 무작위로 반환(0 혹은 1)
            # action은 [torch.LongTensor of size 1*1] 형태가 된다

        return action
    
    
    def make_minibatch(self):
        transitions = self.memory.sample(BATCH_SIZE)
        
        batch = Transition(*zip(*transitions))
        
        state_batch= torch.cat(batch.state)
        action_batch= torch.cat(batch.action)
        reward_batch= torch.cat(batch.reward)
        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
        print("non_final_next_states = ", non_final_next_states)
        return batch, state_batch, action_batch, reward_batch, non_final_next_states
    
    def get_expected_state_action_values(self):
        self.main_q_network.eval()
        self.target_q_network.eval()
        
        self.state_action_values = self.main_q_network(self.state_batch).gather(1, self.action_batch)
        
        
        non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None, self.batch.next_state)))
        
        next_sate_values = torch.zeros(BATCH_SIZE)
        
        # DDQN
        a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensr)
        
        a_m[non_final_mask] = self.main_q_network(self.non_final_next_states).detach().max(1)[1]
        
        a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)
        
        next_state_values[non_final_mask] = self.target_q_network(self.non_final_next_states).\
        gather(1, a_m_non_final_next_states).detach().squeeze()
        
        expected_state_action_values = self.reward_batch + GAMMA * next_state_values
        
        return expected_state_action_values
    
    def update_main_q_network(self):
        self.main_q_network.train()
        
        loss =F.smooth_l1_loss(self.state_action_values, self.expected_state_action_values.unsqueeze(1))
        
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
    def update_target_q_network(self):
        self.target_q_network.load_state_dict(self.main_q_network.state_dict())
                

In [25]:
# CartPole 태스크의 에이전트 클래스. 봉 달린 수레 자체라고 보면 된다


class Agent:
    def __init__(self, num_states, num_actions):
        '''태스크의 상태 및 행동의 가짓수를 설정'''
        self.brain = Brain(num_states, num_actions)  # 에이전트의 행동을 결정할 두뇌 역할 객체를 생성

    def update_q_function(self):
        '''Q함수를 수정'''
        self.brain.replay()

    def get_action(self, state, episode):
        '''행동을 결정'''
        action = self.brain.decide_action(state, episode)
        return action

    def memorize(self, state, action, state_next, reward):
        '''memory 객체에 state, action, state_next, reward 내용을 저장'''
        self.brain.memory.push(state, action, state_next, reward)

    def update_target_q_function(self):
        '''Target Q-Network을 Main Q-Network와 맞춤'''
        self.brain.update_target_q_network()

In [26]:
# CartPole을 실행하는 환경 클래스


class Environment:

    def __init__(self):
        self.env = gym.make(ENV)  # 태스크를 설정
        num_states = self.env.observation_space.shape[0]  # 태스크의 상태 변수 수(4)를 받아옴
        num_actions = self.env.action_space.n  # 태스크의 행동 가짓수(2)를 받아옴
        self.agent = Agent(num_states, num_actions)  # 에이전트 역할을 할 객체를 생성

    def run(self):
        '''실행'''
        episode_10_list = np.zeros(10)  # 최근 10에피소드 동안 버틴 단계 수를 저장함(평균 단계 수를 출력할 때 사용)
        complete_episodes = 0  # 현재까지 195단계를 버틴 에피소드 수
        episode_final = False  # 마지막 에피소드 여부
        frames = []  # 애니메이션을 만들기 위해 마지막 에피소드의 프레임을 저장할 배열

        for episode in range(NUM_EPISODES):  # 최대 에피소드 수만큼 반복
            observation = self.env.reset()  # 환경 초기화

            state = observation  # 관측을 변환없이 그대로 상태 s로 사용
            state = torch.from_numpy(state).type(
                torch.FloatTensor)  # NumPy 변수를 파이토치 텐서로 변환
            state = torch.unsqueeze(state, 0)  # size 4를 size 1*4로 변환

            for step in range(MAX_STEPS):  # 1 에피소드에 해당하는 반복문
                
                # 애니메이션 만드는 부분을 주석처리
                #if episode_final is True:  # 마지막 에피소드에서는 각 시각의 이미지를 frames에 저장한다
                    # frames.append(self.env.render(mode='rgb_array'))
                    
                action = self.agent.get_action(state, episode)  # 다음 행동을 결정

                # 행동 a_t를 실행하여 다음 상태 s_{t+1}과 done 플래그 값을 결정
                # action에 .item()을 호출하여 행동 내용을 구함
                observation_next, _, done, _ = self.env.step(
                    action.item())  # reward와 info는 사용하지 않으므로 _로 처리

                # 보상을 부여하고 episode의 종료 판정 및 state_next를 설정한다
                if done:  # 단계 수가 200을 넘었거나 봉이 일정 각도 이상 기울면 done이 True가 됨
                    state_next = None  # 다음 상태가 없으므로 None으로

                    # 최근 10 에피소드에서 버틴 단계 수를 리스트에 저장
                    episode_10_list = np.hstack(
                        (episode_10_list[1:], step + 1))

                    if step < 195:
                        reward = torch.FloatTensor(
                            [-1.0])  # 도중에 봉이 쓰러졌다면 페널티로 보상 -1을 부여
                        complete_episodes = 0  # 연속 성공 에피소드 기록을 초기화
                    else:
                        reward = torch.FloatTensor([1.0])  # 봉이 서 있는 채로 에피소드를 마쳤다면 보상 1 부여
                        complete_episodes = complete_episodes + 1  # 연속 성공 에피소드 기록을 갱신
                else:
                    reward = torch.FloatTensor([0.0])  # 그 외의 경우는 보상 0을 부여
                    state_next = observation_next  # 관측 결과를 그대로 상태로 사용
                    state_next = torch.from_numpy(state_next).type(
                        torch.FloatTensor)  # numpy 변수를 파이토치 텐서로 변환
                    state_next = torch.unsqueeze(state_next, 0)  # size 4를 size 1*4로 변환

                # 메모리에 경험을 저장
                self.agent.memorize(state, action, state_next, reward)

                # Experience Replay로 Q함수를 수정
                self.agent.update_q_function()

                # 관측 결과를 업데이트
                state = state_next

                # 에피소드 종료 처리
                if done:
                    print('%d Episode: Finished after %d steps：최근 10 에피소드의 평균 단계 수 = %.1lf' % (
                        episode, step + 1, episode_10_list.mean()))
                    
                    # DDQN으로 추가된 부분 2에피소드마다 한번씩 Target Q-Network을 Main Q-Network와 맞춰줌
                    if(episode % 2 == 0):
                        self.agent.update_target_q_function()
                    break
                    
                    
            if episode_final is True:
                # 애니메이션 생성 부분을 주석처리함
                # 애니메이션 생성 및 저장
                #display_frames_as_gif(frames)
                break

            # 10 에피소드 연속으로 195단계를 버티면 태스크 성공
            if complete_episodes >= 10:
                print('10 에피소드 연속 성공')
                episode_final = True  # 다음 에피소드에서 애니메이션을 생성

In [27]:
# 실행 엔트리 포인트
cartpole_env = Environment()
cartpole_env.run()

Net(
  (fc1): Linear(in_features=4, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=32, bias=True)
  (fc3): Linear(in_features=32, out_features=2, bias=True)
)


NotImplementedError: 