In [3]:
import pandas as pd 
import numpy as np
from collections import defaultdict
import random

# 1번

In [4]:
class QLearningAgent:
    """Tabular Q-learning agent with an epsilon-greedy behaviour policy.

    Q-values are stored per state as a list indexed by action, so actions
    are expected to be the integers ``0 .. len(actions) - 1``.
    """

    def __init__(self, actions):
        """Create an agent.

        Args:
            actions: Sequence of available actions. In the original use
                this is ``[0, 1, 2, 3]`` meaning up, down, left, right.
        """
        self.actions = actions
        self.learning_rate = 0.01
        self.discount_factor = 0.9
        self.epsilon = 0.9
        # One zero-initialised Q-value per action for every unseen state.
        n_actions = len(actions)
        self.q_table = defaultdict(lambda: [0.0] * n_actions)

    def learn(self, state, action, reward, next_state):
        """Update Q(state, action) from one <s, a, r, s'> sample.

        Uses the Bellman optimality target
        ``reward + discount_factor * max_a' Q(next_state, a')``.
        """
        current_q = self.q_table[state][action]
        target_q = reward + self.discount_factor * max(self.q_table[next_state])
        self.q_table[state][action] += self.learning_rate * (target_q - current_q)

    def get_action(self, state):
        """Return an action for ``state`` following the epsilon-greedy policy.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the highest Q-value is returned.
        """
        # Explore with probability epsilon (the comparison was previously
        # inverted, which explored with probability 1 - epsilon instead).
        if np.random.rand() < self.epsilon:
            return np.random.choice(self.actions)
        # Exploit: greedy action with respect to the current Q-table.
        return self.arg_max(self.q_table[state])

    @staticmethod
    def arg_max(state_action):
        """Return the index of the largest value, breaking ties at random.

        Args:
            state_action: Non-empty sequence of Q-values for one state.
        """
        best_value = max(state_action)
        best_indices = [
            index for index, value in enumerate(state_action) if value == best_value
        ]
        return random.choice(best_indices)

# 2번 - 코랩으로 한 듯

In [None]:
from tensorflow import keras
from keras import models
from keras import layers
from tensorflow.python.keras.backend import relu, sigmoid
from tensorflow.python.keras.models import Sequential
from ReplayMemory import ReplayMemory
import numpy as np
from ReplayMemory import ReplayMemory

In [None]:
class DQNAgent(object):
    """DQN agent holding an online model, a target model and a replay memory.

    The networks take 4-dimensional inputs and produce 2 Q-values (one per
    action), as fixed in ``_create_model``.
    """

    def __init__(self, batch_size=100, gamma=0.9, min_replay_size=2000):
        """Build both networks and the replay memory.

        Args:
            batch_size: Number of experiences sampled per training step.
            gamma: Discount factor; larger values weigh future reward more.
            min_replay_size: Number of stored experiences required before
                ``train`` starts fitting the model.
        """
        # Online model (trained) and target model (used for bootstrapping).
        self.model = self._create_model()
        self.target_model = self._create_model()
        # Both models start from identical weights.
        self.target_model.set_weights(self.model.get_weights())
        self.replayMemory = ReplayMemory()
        self.gamma = gamma
        self.batch_size = batch_size
        self.min_replay_size = min_replay_size
        # NOTE(review): these callbacks are created but not passed to
        # ``fit`` anywhere in this class — confirm whether that is intended.
        self.callbacks = [
            keras.callbacks.TensorBoard(
                log_dir="my_log_dir",
                histogram_freq=1,
                embeddings_freq=1,
            )
        ]

    def _create_model(self) -> "models.Sequential":
        """Return a compiled 4 -> 10 -> 10 -> 2 fully connected network."""
        model = models.Sequential()
        # Activations are given by name so the layers do not depend on the
        # private ``tensorflow.python.keras`` backend function.
        model.add(layers.Dense(10, activation="relu", input_shape=(4,)))
        model.add(layers.Dense(10, activation="relu"))
        model.add(layers.Dense(2))
        model.compile(optimizer="rmsprop", loss="mse")
        return model

    def forward(self, data):
        """Return the online model's predictions for ``data``."""
        return self.model.predict(data)

    def train(self):
        """Run one fitting step on a batch sampled from the replay memory.

        Does nothing until the replay memory holds at least
        ``min_replay_size`` experiences. Each stored experience is a tuple
        ``(cur_state, action, reward, done, info, next_state)``.
        """
        if len(self.replayMemory) < self.min_replay_size:
            return

        samples = self.replayMemory.sample(self.batch_size)

        current_states = np.stack([sample[0] for sample in samples])
        next_states = np.stack([sample[5] for sample in samples])
        # Predictions of the online model serve as the regression target;
        # only the entry of the action actually taken is overwritten below.
        current_q = self.model.predict(current_states)
        next_q = self.target_model.predict(next_states)

        for i, (_, action, reward, done, _, _) in enumerate(samples):
            if done:
                # Terminal transition: no bootstrapped future value.
                target = reward
            else:
                target = reward + self.gamma * np.max(next_q[i])
            current_q[i][action] = target

        self.model.fit(
            x=current_states,
            y=current_q,
            batch_size=self.batch_size,
            verbose=False,
        )

    def _update_target_model(self):
        """Copy the online model's weights into the target model."""
        self.target_model.set_weights(self.model.get_weights())

    @staticmethod
    def _checkpoint_paths(path, model_name, version, num_trained,
                          target_model_name=None):
        """Return ``(model_file, target_model_file)`` for a checkpoint.

        ``target_model_name`` defaults to ``"target_" + model_name``.
        """
        if target_model_name is None:
            target_model_name = f"target_{model_name}"
        save_name = f"{path}/{model_name}_{version}_{num_trained}_trained.h5"
        target_save_name = (
            f"{path}/{target_model_name}_{version}_{num_trained}_trained.h5"
        )
        return save_name, target_save_name

    def save(
        self,
        path: str,
        model_name: str,
        version: str,
        num_trained: int,
        target_model_name: str = None,
    ):
        """Save both models as HDF5 files under ``path``.

        File name example: ``/path/cartpole_v5_300_trained.h5``.

        Args:
            path: Directory the files are written to.
            model_name: Base name of the online model file.
            version: Version tag embedded in the file name.
            num_trained: Episode count embedded in the file name.
            target_model_name: Base name of the target model file;
                defaults to ``"target_" + model_name`` when omitted.
        """
        save_name, target_save_name = self._checkpoint_paths(
            path, model_name, version, num_trained, target_model_name
        )
        self.model.save(save_name)
        self.target_model.save(target_save_name)

    def load(
        self,
        path: str,
        model_name: str,
        version: str,
        num_trained: int,
        target_model_name: str = None,
    ):
        """Load both models from files written by ``save``.

        Arguments have the same meaning as in ``save``.
        """
        save_name, target_save_name = self._checkpoint_paths(
            path, model_name, version, num_trained, target_model_name
        )
        self.model = keras.models.load_model(save_name)
        self.target_model = keras.models.load_model(target_save_name)

# DQNTrainer

In [None]:
from sys import version
from DQNAgent import DQNAgent
import numpy as np
import gym
from tqdm import tqdm
import os
import matplotlib.pyplot as plt

In [None]:
class DQNTrainer(object):
    """Runs the episode loop that trains a ``DQNAgent`` on a gym environment.

    Actions are chosen epsilon-greedily among two discrete actions, and
    observations are reshaped to ``(-1, 4)`` before being fed to the agent.
    """

    def __init__(
        self,
        env: gym.Env,
        max_episode=500,
        step_size=2000,
        epsilon=0.99,
        epsilon_decay=0.995,
        temp_save_freq=100,
        model_path=os.path.join(os.getcwd(), "model"),
        model_name="model",
        version="test",
        min_epsilon=0.01,
        temp_save=True,
        save_on_colab=False,
    ):
        """Store the training configuration and create the agent.

        Args:
            env: Environment providing ``reset()`` and ``step(action)``;
                ``step`` is unpacked as ``(next_state, reward, done, info)``.
            max_episode: Number of episodes to run.
            step_size: Stored on the instance; not read by ``train``.
            epsilon: Initial exploration probability.
            epsilon_decay: Factor applied to epsilon after every episode.
            temp_save_freq: Save a checkpoint every this many episodes.
            model_path: Directory local checkpoints are written to.
            model_name: Base file name of the saved model.
            version: Version tag embedded in checkpoint file names.
            min_epsilon: Lower bound for epsilon.
            temp_save: Whether periodic checkpoints are written.
            save_on_colab: Save to Google Drive via Colab instead of
                ``model_path``.
        """
        self.agent = DQNAgent()
        self.max_episode = max_episode
        self.env = env
        self.step_size = step_size
        self.epsilon = epsilon
        self.temp_save_freq = temp_save_freq
        self.model_path = model_path
        self.model_name = model_name
        self.version = version
        # NOTE(review): prefixes the whole path string (not a file name) and
        # is not read anywhere in this class — confirm before relying on it.
        self.target_model_path = "target_" + model_path
        self.min_epsilon = min_epsilon
        self.epsilon_decay = epsilon_decay
        self.temp_save = temp_save
        self.save_on_colab = save_on_colab
        # Per-episode total reward and step count.
        self.save_epi_reward = []
        self.save_epi_step_num = []

    def _select_action(self, state):
        """Return an epsilon-greedy action for ``state``."""
        # Uniform sample in [0, 1): explore with probability epsilon.
        # (A standard-normal draw was used before, which kept the random
        # branch above 50% no matter how far epsilon decayed.)
        if np.random.rand() < self.epsilon:
            # Pick 0 or 1 at random.
            return np.random.randint(2)
        # Greedy: the action with the highest predicted Q-value.
        q_values = self.agent.forward(state.reshape(-1, 4))
        return np.argmax(q_values)

    def _save_checkpoint(self, num_trained):
        """Save the agent to Colab/Drive or to ``model_path`` as configured."""
        if self.save_on_colab:
            self.colab_save(
                model_name=self.model_name,
                version=self.version,
                num_trained=num_trained,
            )
        else:
            self.agent.save(
                path=self.model_path,
                model_name=self.model_name,
                version=self.version,
                num_trained=num_trained,
            )

    def train(self):
        """Train for ``max_episode`` episodes, then save and plot rewards.

        After each episode the target network is synchronised, epsilon is
        decayed (never below ``min_epsilon``) and, if enabled, a temporary
        checkpoint is written every ``temp_save_freq`` episodes.
        """
        # The context manager closes the progress bar even on errors.
        with tqdm(initial=0, total=self.max_episode, unit="episodes") as pbar:
            for episode in range(self.max_episode):
                cur_state = self.env.reset()
                step, episode_reward, done = 0, 0, False

                while not done:
                    action = self._select_action(cur_state)
                    next_state, reward, done, info = self.env.step(action)

                    # Store the transition, then train on the replay memory.
                    self.agent.replayMemory.add(
                        (cur_state, action, reward, done, info, next_state)
                    )
                    self.agent.train()

                    cur_state = next_state
                    episode_reward += reward
                    step += 1

                # Synchronise the target model with the online model.
                self.agent._update_target_model()

                self.epsilon = max(
                    self.epsilon * self.epsilon_decay, self.min_epsilon
                )

                if self.temp_save and episode % self.temp_save_freq == 0:
                    self._save_checkpoint(episode)

                pbar.update(1)

                self.save_epi_reward.append(episode_reward)
                self.save_epi_step_num.append(step)

        # Save the final model once all episodes are done.
        self._save_checkpoint(self.max_episode)

        # Plot the total reward per episode.
        plt.plot(self.save_epi_reward)

    @staticmethod
    def _mount_colab_model_dir():
        """Mount Google Drive in Colab and return the model directory path."""
        from google.colab import drive

        mount_path = "/content/drive"
        drive.mount(mount_path)
        return os.path.join(mount_path, "MyDrive", "model")

    def colab_save(self, model_name: str, version: str, num_trained: int):
        """Mount Google Drive and save the agent under ``MyDrive/model``."""
        self.agent.save(
            path=self._mount_colab_model_dir(),
            model_name=model_name,
            version=version,
            num_trained=num_trained,
        )

    def colab_load(self, model_name: str, version: str, num_trained: int):
        """Mount Google Drive and load the agent from ``MyDrive/model``."""
        self.agent.load(
            path=self._mount_colab_model_dir(),
            model_name=model_name,
            version=version,
            num_trained=num_trained,
        )