In [13]:
from base64 import b64encode
from glob import glob
from IPython.display import HTML
from IPython import display as ipy_display
from gym import logger as gym_logger
from gym.wrappers.record_video import RecordVideo

import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense
from collections import deque
import numpy as np
import random
import gym
from matplotlib import pyplot as plt
import seaborn as sns
import pandas as pd
import os
import warnings
warnings.filterwarnings(action='ignore')
#### show video func
def show_video(mode='train', filename=None):
    mp4_list = glob(mode+'/*.mp4')
    # print(mp4_list)
    if mp4_list:
        if filename :
            file_lists = glob(mode+'/'+filename)
            if not file_lists:
                print('No {} found'.format(filename))
                return -1
            mp4 = file_lists[0]
                    
        else:
            mp4 = sorted(mp4_list)[-1]

        print(mp4)
        video = open(mp4, 'r+b').read()
        encoded = b64encode(video)
        ipy_display.display(HTML(data='''
            <video alt="gameplay" autoplay controls style="height: 400px;">
                <source src="data:video/mp4;base64,%s" type="video/mp4" />
            </video>
        ''' % (encoded.decode('ascii'))))
    else:
        print('No video found')
        return -1
#### early stopping by avg
class EarlyStopping_by_avg():
    def __init__(self, patience=10, verbose=0):
        super().__init__()

        self.best_avg = 0
        self.step = 0
        self.patience = patience
        self.verbose = verbose

    def check(self, avg , avg_scores):
        ## best avg가 나올경우
        if avg >= self.best_avg:
            self.best_avg = avg
            self.step = 0
            # print("avg_reset")
        ## 이전값보다 현재 avg가 높을경우
        elif len(avg_scores) > 1 and avg > avg_scores[-2]:  ### 이전 값과 비교해야하므로 -2  , -1은 지금 avg와 동일함
            self.step = 0
            # print("이전값보다 avg 높아서 reset")
        else:
            self.step += 1
            if self.step > self.patience:
                if self.verbose:
                    print('조기 종료')
                return True
        return False
## dqn
class DQN():
    def __init__(self, state_size, action_size):
        # 상태 및 행동 크기 정의
        self.state_size = state_size
        self.action_size = action_size

        # 감쇠율 및 엡실론 정의
        # Greedy in the limit of infinite exploration (GLIE) 정책 고려
        self.gamma = 0.99
        self.epsilon = 1.0
        self.epsilon_decay = 0.999
        self.epsilon_min = 0.01

        # 리플레이 버퍼 크기 및 학습 시작 크기 정의
        self.buffer_size = 2000
        self.buffer_size_train_start = 200

        # 리플레이 버퍼 정의
        self.buffer = deque(maxlen=self.buffer_size)

        # 인공신경망 학습 하이퍼파라미터 설정
        self.loss_fn = MeanSquaredError()
        self.learning_rate = 0.001
        self.optimizer = Adam(learning_rate = self.learning_rate)
        self.batch_size = 32
        
        # Q-네트워크 및 타겟(Q)-네트워크 정의
        self.q_network = self.get_network()
        self.target_q_network = self.get_network()

        # 타겟(Q)-네트워크 파라미터 복제 함수 정의
        self.update_target_network()
        
        ### check point 생성######
        self.dir_name = os.getcwd()
        self.folder_checkpoint = os.path.join(self.dir_name,'checkpoint')
        self.checkpoint = tf.train.Checkpoint(model=self.q_network , optimizer=self.optimizer)
        self.manager = tf.train.CheckpointManager(self.checkpoint, self.folder_checkpoint, max_to_keep=40)
        

    def update_target_network(self):
        weights = self.q_network.get_weights()
        self.target_q_network.set_weights(weights)

    def get_network(self):
        network = Sequential()
        network.add(Dense(24, activation='relu', input_shape=(self.state_size,)))    # (None, 4) = (4,)
        network.add(Dense(24, activation='relu'))
        network.add(Dense(12, activation='relu'))
        network.add(Dense(self.action_size))

        return network

    def remember(self, state, action, reward, next_state, done):
        # 입력받은 상태, 행동, 보상, 다음상태, done flag를 리플레이 버퍼에 축적하는 함수 구현
        item = (state, action, reward, next_state, done)
        self.buffer.append(item)
    
    def policy(self, state):
        # 입력받은 상태에 대하여 행동을 결정하는 함수 구현
        # 엡실론-그리디 알고리즘 구현

        if np.random.uniform(0,1) < self.epsilon:
            # random
            action = np.random.choice([0,1])
            # action = np.random.choice([x for x in range(self.action_size)])
        else:
            # greedy
            # self.q_network.predict(state)
            out = self.q_network(state)
            # out = [Q[0], Q[1]]
            action = np.argmax(out)

        return action


    def train(self):
        # GLIE 구현
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

        # 리플레이 버퍼에서 배치사이즈만큼 랜덤하게 추출하여 mini_batch 구현
        # random.sample 함수 활용
        mini_batch = random.sample(self.buffer, self.batch_size)

        # mini_batch에서 각 아래 정보로 분리하기
        states, actions, rewards, next_states, dones = zip(*mini_batch)

        # 분리된 정보를 tensor 형태로 변환
        states = tf.convert_to_tensor(states)
        actions = tf.convert_to_tensor(actions)
        rewards = tf.convert_to_tensor(rewards)
        next_states = tf.convert_to_tensor(next_states)
        # dones를 True False로 바꿀 껀데 tf.float32 실수 형태로 바꿔 주는코드 (1.0 , 0.0)
        dones = tf.convert_to_tensor(dones, dtype=tf.float32)

        # 학습 Target 정의
        # r + gamma * max_q_target(next_s)
        # mini_batch 단위로 elements-wise하게 계산되어야 합니다.
        # max_q_target을 위해서 행렬 내 최대값을 반환해주는 np.amax 함수 활용 
        # np.amax([[0.1, 0.9], [0.7, 0.3]], axis=-1))
        # 에피소드의 마지막 상태의 경우, only 보상이 target 값이 된다.

        ## 미래에 대한 target을 정하는 부분 --> 미래에 받을 가치 와 현재가치를 갖게 하면 현재의 가치를 최적의 상태로 만들 수 있음
        q_next = self.target_q_network(next_states)
        # print("q_next", q_next)

        ## 축별로 가장 높은값을 가져옴
        max_q_next =  np.amax(q_next, axis=-1)
        # print("max_q_next", max_q_next)

        ## 끝나지 않았을 경우 (done = False)
        ## 끝났을 경우 (done = True) 의 경우 self.gamma * max_q_next 값을 포함하지 않아야함.
        targets = rewards + ( self.gamma * max_q_next ) * ( 1-dones )

        # print('targets',targets)

        # q_network 학습
        # tf.GradientTape() 함수를 활용하여 자동미분 후 학습에 활용
        # (참조: https://teddylee777.github.io/tensorflow/gradient-tape)
        with tf.GradientTape() as tape:
        
            q = self.q_network(states)
            # print(actions)
            one_hot_a = tf.one_hot( actions, self.action_size )

            ## one hot encindg 값과 q값을 곱해준다.
            q_sa = tf.reduce_sum(q * one_hot_a, axis=1)
            # print(q_sa)
            
            # 오차 계산
            loss = self.loss_fn(targets, q_sa)

        # 손실함수를 통해 계산한 오차를 네트워크 가중치로 미분!
        ## 여기서 q_network 만 학습을 진행함
        grads = tape.gradient(loss, self.q_network.trainable_weights)
        # 미분값을 기준으로 각 네트워크 가중치를 업데이트!
        self.optimizer.apply_gradients(zip(grads, self.q_network.trainable_weights))
        
        ### check point 저장하기
        save_path = self.manager.save()
        # print("Saved checkpoint {}".format(save_path))

In [14]:
# CartPole 환경 정의
ENV_NAME = 'CartPole-v1'
# env = gym.make(ENV_NAME, render_mode="rgb_array")
env = gym.make(ENV_NAME)

# 비디오 레코딩
env = RecordVideo(env, './train', episode_trigger =lambda episode_number: True )
env.metadata = {'render.modes': ['human', 'ansi']}

# CartPole 환경의 상태와 행동 크기 정의
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# 위에서 정의한 DQN 클래스를 활용하여 agent 정의
agent = DQN(state_size, action_size)

scores, avg_scores, episodes, losses = [], [], [], []

# 반복 학습 에피소드 수 정의
num_episode = 100
early_stopping_by_avg = EarlyStopping_by_avg(patience=10, verbose=1)
## early stopping을 위한 초기값 설정 
avg_step = 0

for epoch in range(num_episode):
    # done flag와 score 값 초기화
    done = False
    score = 0


    # 환경 reset을 통해 초기 상태 정의
    state = env.reset()
    
    # print(f"avg: {avg_step}")
    if early_stopping_by_avg.check(avg_step , avg_scores ):
        print("earstpping 실행")
        break

    while not done:

        # 현재 상태에 대하여 행동 정의
        action = agent.policy(state[np.newaxis,:])

        # env.step 함수를 이용하여 행동에 대한 다음 상태, 보상, done flag 등 획득
        next_state, reward, done, info = env.step(action)
        
        # 해당 에피소드의 최종 score를 위해 reward 값 누적
        score += reward

        # 기본 환경은 pole이 쓰러지지 않으면 +1의 보상을 준다.
        # 자신만의 보상가설을 만들어 학습 가능
        ### 종료조건
        # 폴 각도는 ±12° 이상입니다.
        # 카트 위치가 ±2.4 이상(카트 중앙이 디스플레이 가장자리에 도달함)
        # 에피소드 길이가 200보다 큽니다.
        # TODO #
        
        def get_reward(pos, angle , done):
            ### 위치 / 속도 조건으로 보상크게
            cond_pos = (pos < 2.0) and (pos > -2.0)
            cond_angle = (angle < 5.0) and (angle > -5.0)
            ### 실패시 보상 -1
            if done:
                return -100.0
            ### 상점
            elif cond_pos or cond_angle:
                return 0.1
            elif cond_pos:
                return 0.3
            elif cond_pos and cond_angle:
                return 0.5
            ### 벌점
            elif (pos > 2.5) or (pos < -2.5):
                return -20
            elif (angle > 10.0) or (angle < -10.0):
                return -10
            
            
        ### position
        pos = next_state[0]
        ### velocity
        angle = next_state[2]
        reward = get_reward(pos, angle, done)
        # print(f'reward : {reward : .3f}')
        # reward=0.1 if not done else -1


        # 획득된 상태, 행동, 보상, 다음상태, done flag를 리플레이 버퍼에 축적
        agent.remember(state, action, reward, next_state, done)

        # 다음 상태를 현재 상태로 정의
        state = next_state

        # buffer 크기가 일정 기준 이상 쌓이면 학습 진행
        # TODO #
        if len(agent.buffer) >= agent.buffer_size_train_start :
            agent.train()

        if done:
            
            ### early stop
            avg_step = np.mean(scores[-10:])


            # 에피소드가 종료되면 target_q_network 파라미터 복제
            # TODO #
            agent.update_target_network()

            # 에피소드 종료마다 결과 그래프 저장
            scores.append(score)
            avg_scores.append(avg_step)
            episodes.append(epoch)

            
            # 에피소드 종료마다 결과 출력
            print(f'episode: {epoch:3d} | avg_score: { avg_step :3.2f} | buffer_size: {len(agent.buffer):4d} | epsilon: {agent.epsilon:.4f}')
            

env.close()

plt.title('Test graph')
plt.xlabel('episodes')

plt.plot(episodes, avg_scores,
         color='skyblue',
         marker='o', markerfacecolor='blue',
         markersize=6)
plt.ylabel('avg_scores', color='blue')
plt.tick_params(axis='y', labelcolor='blue')

plt.savefig('cartpole_graph.png')
plt.show()

TypeError: tuple indices must be integers or slices, not tuple