In [1]:
import time

import gym  # 강화학습 환경을 제공해 주는 gym 라이브러리
import random
import numpy as np
from collections import deque
from keras.layers import Dense
from keras.optimizers import Adam
from keras.models import Sequential

EPISODES = 50  # 게임 진행 회수

In [2]:
class DQNAgent:
    def __init__(self, state_size, action_size):

        self.state_size = state_size
        self.action_size = action_size

        self.gamma = 0.99  # 감가율
        self.learning_rate = 0.001  # 학습률
        self.epsilon = 1.0
        self.epsilon_decay = 0.999
        self.epsilon_min = 0.01
        self.batch_size = 64
        self.train_threshold = 1000

        self.memory = deque(maxlen=2000)  # DQN 알고리즘에서 사용할 리플레이 메모리

        self.model = self.build_model()  # 정책 신경망 생성
        self.target_model = self.build_model()  # 타깃 신경망 생성

        self.update_target_model()  # 모델 업데이트

    def build_model(self):
        """정책 모델 생성"""
        model = Sequential()
        model.add(Dense(256, input_shape=(self.state_size,), activation='relu'))
        model.add(Dense(256, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(lr=self.learning_rate))
        return model

    def update_target_model(self):
        """타깃 모델 업데이트"""
        self.target_model.set_weights(self.model.get_weights())

    def act(self, state):
        """e-greedy 정책으로 행동하는 함수 구현"""
        if np.random.rand() <= self.epsilon:  # 랜덤으로 생성한 숫자가 엡실론보다 작으면
            return random.randrange(self.action_size)  # 랜덤으로 행동
        else:
            q_value = self.model.predict(state)  # 그렇지 않으면 Q 네트워크를 통해 행동
            return np.argmax(q_value[0])

    def remember(self, state, action, reward, next_state, done):
        """에이전트가 탐험과정에서 얻은 상태, 보상 등을 메모리에 저장"""
        sample = (state, action, reward, next_state, done)
        self.memory.append(sample)

    def train_model(self):
        """정책 신경망 훈련"""
        if self.epsilon > self.epsilon_min:  # 엡실론이 정해진 최솟값에 도달하지 않았을 경우에
            self.epsilon *= self.epsilon_decay  # 감가율을 사용해 엡실론 값을 줄임
        
        # 리플레이 메모리에서 배치 사이즈만큼의 샘플을 무작위로 추출
        mini_batch = random.sample(self.memory, self.batch_size)  
        
        states = np.zeros((self.batch_size, self.state_size))
        next_states = np.zeros((self.batch_size, self.state_size))
        actions, rewards, dones = [], [], []

        # 샘플에서 상태, 행동, 보상 등을 추출해 배열에 저장
        for i in range(self.batch_size):
            states[i] = mini_batch[i][0]
            actions.append(mini_batch[i][1])
            rewards.append(mini_batch[i][2])
            next_states[i] = mini_batch[i][3]
            dones.append(mini_batch[i][4])

        target = self.model.predict(states)  # 현재 상태의 Q 함수
        y = self.target_model.predict(next_states)  # 다음 상태의 타깃 모델의 Q 함수

        for i in range(self.batch_size):
            if dones[i]:
                target[i][actions[i]] = rewards[i]
            else:
                target[i][actions[i]] = rewards[i] + self.gamma * (np.amax(y[i]))

        self.model.fit(states, target, batch_size=self.batch_size, epochs=1, verbose=0)

In [3]:
if __name__ == "__main__":

    env = gym.make('CartPole-v1')  # gym 라이브러리를 통해 CartPole 환경 생성
    state_size = env.observation_space.shape[0]  # CartPole 게임의 상태의 크기
    action_size = env.action_space.n  # CartPole 게임의 가능한 행동 개수

    agent = DQNAgent(state_size, action_size)  # DQN 에이전트 생성

    scores, episodes = [], []

    for e in range(EPISODES):
        done = False
        score = 0
        state = env.reset()  # 환경 초기화
        state = np.reshape(state, [1, state_size])
        tic = time.time()
        while not done:
            env.render()

            action = agent.act(state)  # 상태 state에서 e-greedy 정책으로 행동 선택
            
            # 선택한 행동을 진행해 다음 상태, 보상 등을 얻음
            next_state, reward, done, info = env.step(action)  
            next_state = np.reshape(next_state, [1, state_size])
            
            # 한 에피소드가 게임 중에 끝날 시 -10을 보상 값으로 함
            reward = reward if not done else -10  

            # 샘플을 리플레이 메모리에 저장
            agent.remember(state, action, reward, next_state, done)  

            # 메모리의 샘플이 정의한 threshold보다 많으면 모델 훈련
            if len(agent.memory) >= agent.train_threshold:  
                agent.train_model()

            score += reward
            state = next_state  # 다음 상태의 값을 현제 상태의 값으로 변경

            if done:  # 한 에피소드가 끝날 때마다 출력
                toc = time.time()
                agent.update_target_model()  # Q 함수 업데이트

                print("Episode: {}/{}, Score: {}, time: {:.2f}s".format(
                    e, EPISODES, score, toc-tic))
    env.close()

Episode: 0/50, Score: 1.0, time: 0.59s
Episode: 1/50, Score: 18.0, time: 0.51s
Episode: 2/50, Score: 2.0, time: 0.21s
Episode: 3/50, Score: 6.0, time: 0.28s
Episode: 4/50, Score: 12.0, time: 0.39s
Episode: 5/50, Score: 0.0, time: 0.18s
Episode: 6/50, Score: 12.0, time: 0.38s
Episode: 7/50, Score: 41.0, time: 0.88s
Episode: 8/50, Score: 45.0, time: 0.96s
Episode: 9/50, Score: -1.0, time: 0.17s
Episode: 10/50, Score: 0.0, time: 0.18s
Episode: 11/50, Score: 14.0, time: 0.44s
Episode: 12/50, Score: 23.0, time: 0.59s
Episode: 13/50, Score: 21.0, time: 0.54s
Episode: 14/50, Score: 21.0, time: 0.52s
Episode: 15/50, Score: 2.0, time: 0.21s
Episode: 16/50, Score: 16.0, time: 0.51s
Episode: 17/50, Score: 4.0, time: 0.26s
Episode: 18/50, Score: 6.0, time: 0.28s
Episode: 19/50, Score: -1.0, time: 0.16s
Episode: 20/50, Score: 12.0, time: 0.38s
Episode: 21/50, Score: 8.0, time: 0.31s
Episode: 22/50, Score: 16.0, time: 0.44s
Episode: 23/50, Score: 8.0, time: 0.31s
Episode: 24/50, Score: 15.0, time: 0