## 1. Cartpole 강화학습 준비

In [24]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

### 1.1 Cartpole 시연함수

In [25]:
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display


def display_frames_as_gif(frames):
    plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0),
               dpi=72)   # 화면크기, 해상도 설정
    patch = plt.imshow(frames[0])   # 첫번째 프레임을 화면에 표시
    plt.axis('off')   # 축 표시 안함

    def animate(i):
        patch.set_data(frames[i])   # i번째 프레임 업데이트

    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames),
                                   interval=50)   # 애니메이션 생성

    anim.save('Cartpole_DDQN.gif')  # 시연영상을 저장
    display(display_animation(anim, default_mode='loop'))

### 1.2 Constant 정의

In [26]:
#name tuple을 사용하여 Transition 정의
from collections import namedtuple

Transition = namedtuple(
    'Transition', ('state', 'action', 'next_state', 'reward'))

In [27]:
# 상수 정의
ENV = 'CartPole-v0'   # task
GAMMA = 0.99   # discount factor
MAX_STEPS = 200    # 1에피소드 당 최대 단계 수
NUM_EPISODES = 500    # 최대 에피소드 수

## 2. ReplayMemory

In [28]:
class ReplayMemory:

    def __init__(self, CAPACITY):
        self.capacity = CAPACITY   # 메모리의 최대 저장 건수
        self.memory = []   # 실제 transition을 저장할 변수
        self.index = 0   # 저장 위치를 가리킬 인덱스 변수

    def push(self, state, action, state_next, reward):
        
        # 메모리 공간 확인
        if len(self.memory) < self.capacity:
            self.memory.append(None)  

        self.memory[self.index] = Transition(state, action, state_next, reward)  # 새로운 transition을 메모리에 추가
        
        self.index = (self.index + 1) % self.capacity     # 인덱스 설정

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)   # 메모리에서 랜덤하게 배치 추출

    def __len__(self):
        return len(self.memory)   # 메모리 크기 반환

In [29]:
#!pip install --upgrade pip

In [30]:
#!pip install torch

## 3. Cartpole DDQN 설계

In [31]:
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):  # 신경망 구조 정의

    def __init__(self, n_in, n_mid, n_out):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(n_in, n_mid)  # 입력층에서 은닉층으로 선형 변환 수행
        self.fc2 = nn.Linear(n_mid, n_mid)   # 은닉층에서 은닉층으로 선형 변환 수행
        self.fc3 = nn.Linear(n_mid, n_out)    # 은닉층에서 출력층으로 선형 변환 수행

    def forward(self, x):   # 순전파 함수
        # 활성화 함수로 비선형성 도입
        h1 = F.relu(self.fc1(x))
        h2 = F.relu(self.fc2(h1))
        
        output = self.fc3(h2)  # 출력층 연산
        return output

In [32]:
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32    # 배치 크기
CAPACITY = 10000    # 메모리 용량


class Brain:
    def __init__(self, num_states, num_actions):
        self.num_actions = num_actions  

        # 리플레이 메모리 설정
        self.memory = ReplayMemory(CAPACITY)

        # 신경망 초기화
        n_in, n_mid, n_out = num_states, 32, num_actions
        self.main_q_network = Net(n_in, n_mid, n_out)   # 메인 신경망
        self.target_q_network = Net(n_in, n_mid, n_out)   # 타겟 신경망
        print(self.main_q_network)    # 신경망 구조 출력

        # 옵티마이저 초기화
        self.optimizer = optim.Adam(
            self.main_q_network.parameters(), lr=0.0001)

    def replay(self):

        if len(self.memory) < BATCH_SIZE:
            return
        self.batch, self.state_batch, self.action_batch, self.reward_batch, self.non_final_next_states = self.make_minibatch()  # 미니배치 생성
        self.expected_state_action_values = self.get_expected_state_action_values()    # 예측되는 Q-value 계산
        self.update_main_q_network()

    def decide_action(self, state, episode):
        epsilon = 0.5 * (1 / (episode + 1))

        # 입실론-그리디 전략으로 action 선택
        if epsilon <= np.random.uniform(0, 1):
            self.main_q_network.eval()    # evaluation mode 로 전환
            with torch.no_grad():
                action = self.main_q_network(state).max(1)[1].view(1, 1)

        else:
            action = torch.LongTensor(
                [[random.randrange(self.num_actions)]])    # 무작위 action 선택
        return action

    def make_minibatch(self):
        transitions = self.memory.sample(BATCH_SIZE)   # 리플레이 버퍼에서 샘플링

        batch = Transition(*zip(*transitions))

        state_batch = torch.cat(batch.state)  # state의 텐서
        action_batch = torch.cat(batch.action)  # action의 텐서
        reward_batch = torch.cat(batch.reward)  # reward의 텐서
        non_final_next_states = torch.cat([s for s in batch.next_state
                                           if s is not None])   # none을 제외한 다음 state의 텐서

        return batch, state_batch, action_batch, reward_batch, non_final_next_states

    def get_expected_state_action_values(self):

        # evaluation mode 로 전환
        self.main_q_network.eval()
        self.target_q_network.eval()

        # 현재 state의 Q-value 계산
        self.state_action_values = self.main_q_network(
            self.state_batch).gather(1, self.action_batch)

        non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None,
                                                    self.batch.next_state)))
        next_state_values = torch.zeros(BATCH_SIZE)  # 다음 state value의 텐서 초기화

        # 최종 상태가 아닌 다음 state의 최대 Q-value 계산
        a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)
        a_m[non_final_mask] = self.main_q_network(
            self.non_final_next_states).detach().max(1)[1]
        a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)
        next_state_values[non_final_mask] = self.target_q_network(
            self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()

        expected_state_action_values = self.reward_batch + GAMMA * next_state_values   # 예측되는 Q-value 계산

        return expected_state_action_values

    def update_main_q_network(self):

        self.main_q_network.train()  # training mode 로 전환

        # 손실 계산
        loss = F.smooth_l1_loss(self.state_action_values,
                                self.expected_state_action_values.unsqueeze(1))

        self.optimizer.zero_grad()  # 경사 초기화
        loss.backward()  # 역전파 계산
        self.optimizer.step()  # 가중치 업데이트

    def update_target_q_network(self):  # 타겟 신경망 업데이트
        self.target_q_network.load_state_dict(self.main_q_network.state_dict())

## 4. Cartpole Agent 설계

In [33]:
class Agent:
    def __init__(self, num_states, num_actions):
        self.brain = Brain(num_states, num_actions)  
        
    def update_q_function(self):
        self.brain.replay()   # replay 메소드 호출

    def get_action(self, state, episode):
        action = self.brain.decide_action(state, episode)  # 현재 state에서의 action 선택
        return action

    def memorize(self, state, action, state_next, reward):
        self.brain.memory.push(state, action, state_next, reward)   # brain의 메모리에 transition을 저장

    def update_target_q_function(self):
        self.brain.update_target_q_network()   # 타겟 신경망 업데이트

## 5. Cartpole 환경 설계

In [34]:
class Environment:

    def __init__(self):
        # self.env = gym.make(ENV)  
        self.env = gym.make(ENV, render_mode="rgb_array") 
        num_states = self.env.observation_space.shape[0] 
        num_actions = self.env.action_space.n  
        self.agent = Agent(num_states, num_actions)  

    def run(self):
        episode_10_list = np.zeros(10)    # 최근 10 에피소드의 step 수 기록 리스트
        complete_episodes = 0    # 성공 에피소드 수
        episode_final = False     # 마지막 에피소드인지 확인
        frames = []    # 애니메이션 프레임 저장
        
        for episode in range(NUM_EPISODES): 
            observation = self.env.reset()  # 환경 리셋

            state = observation    # observation 저장
            state = state[0]
            state = torch.from_numpy(state).type(
                torch.FloatTensor)    # 텐서로 변환
            state = torch.unsqueeze(state, 0)   # state 텐서 차원 추가

            for step in range(MAX_STEPS):  
                
                if episode_final is True:  
                    # frames.append(self.env.render(mode='rgb_array'))
                    frames.append(self.env.render())   # 마지막 에피소드면 렌더링
                    
                action = self.agent.get_action(state, episode)    # agent가 action 선택

                """
                observation_next, reward, done, _ = self.env.step(
                    action.item()) 
                """
                # action 수행 후 결과 저장
                step_result = self.env.step(action.item())
                observation_next = step_result[0]
                reward = step_result[1]
                done = step_result[2]
                
                if done:    # 에피소드 종료
                    state_next = None 

                    episode_10_list = np.hstack(
                        (episode_10_list[1:], step + 1))  # 가장 최근 10개 에피소드의 step 기록

                    if step < 100:   # step이 100미만
                        reward = torch.FloatTensor(
                            [-1.0])   # reward -1
                        complete_episodes = 0    # 연속 성공 에피소드 초기화
                    else:
                        reward = torch.FloatTensor([1.0])     # reward +1
                        complete_episodes = complete_episodes + 1 
                        print(complete_episodes)# 연속 성공 에피소드 증가
# 연속 성공 에피소드 증가
                else:
                    reward = torch.FloatTensor([0.0])   # 중간 reward 0 부여
                    state_next = observation_next  
                    state_next = torch.from_numpy(state_next).type(
                        torch.FloatTensor)  
                    state_next = torch.unsqueeze(state_next, 0)  
                    

                self.agent.memorize(state, action, state_next, reward)   # 메모리에 transition 저장
                self.agent.update_q_function()    # 신경망 업데이트

                state = state_next

                if done:
                    print('%d Episode: Finished after %d steps：최근 10 에피소드의 평균 단계 수 = %.1lf' % (
                        episode, step + 1, episode_10_list.mean()))
                    
                    if(episode % 2 == 0):
                        self.agent.update_target_q_function()   # 2 에피소드마다 타겟 신경망 업데이트
                    break
                    
                    
            if episode_final is True:
                display_frames_as_gif(frames)  # 애니메이션 저장, 표시
                break

            if complete_episodes >= 15:
                print('10 에피소드 연속 성공')
                episode_final = True  

## 6. Cartpole DDQN 학습

In [35]:
cartpole_env = Environment()
cartpole_env.run()

  logger.warn(
  if not isinstance(terminated, (bool, np.bool8)):
  a_m[non_final_mask] = self.main_q_network(
  a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)
  next_state_values[non_final_mask] = self.target_q_network(


Net(
  (fc1): Linear(in_features=4, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=32, bias=True)
  (fc3): Linear(in_features=32, out_features=2, bias=True)
)
0 Episode: Finished after 28 steps：최근 10 에피소드의 평균 단계 수 = 2.8
1 Episode: Finished after 27 steps：최근 10 에피소드의 평균 단계 수 = 5.5
2 Episode: Finished after 18 steps：최근 10 에피소드의 평균 단계 수 = 7.3
3 Episode: Finished after 16 steps：최근 10 에피소드의 평균 단계 수 = 8.9
4 Episode: Finished after 20 steps：최근 10 에피소드의 평균 단계 수 = 10.9
5 Episode: Finished after 22 steps：최근 10 에피소드의 평균 단계 수 = 13.1
6 Episode: Finished after 30 steps：최근 10 에피소드의 평균 단계 수 = 16.1
7 Episode: Finished after 45 steps：최근 10 에피소드의 평균 단계 수 = 20.6
8 Episode: Finished after 10 steps：최근 10 에피소드의 평균 단계 수 = 21.6
9 Episode: Finished after 33 steps：최근 10 에피소드의 평균 단계 수 = 24.9
10 Episode: Finished after 40 steps：최근 10 에피소드의 평균 단계 수 = 26.1
11 Episode: Finished after 22 steps：최근 10 에피소드의 평균 단계 수 = 25.6
12 Episode: Finished after 19 steps：최근 10 에피소드의 평균 단계 수 = 25.7
13 Episode

MovieWriter ffmpeg unavailable; using Pillow instead.


TypeError: setup() got an unexpected keyword argument 'clear_temp'