In [None]:
'''
kaggle - https://www.kaggle.com/code/leejin11/dqn-code
'''

In [None]:
# DQN (SARS => s, a, r, s')
# Q(s, a) <- Q(s, a) + α(r + γ·max_a' Q(s', a') - Q(s, a))
#                       (            TD error            )
# off-policy
# uses a replay memory (experience replay)

In [None]:
import numpy as np
import random
from collections import deque

import torch, torch.nn as nn
import gym

import matplotlib.pyplot as plt
import os, sys

In [None]:
## 시각화

In [None]:
def make_plot(scores, episodes, save_path='DQN_plot.jpg'):
    '''Plot the score of every finished episode and save the figure to disk.

        Args:
            scores (list): total reward of each finished episode
            episodes (list): episode numbers, one per entry in `scores`
            save_path (str): output image path; defaults to the previously
                hard-coded 'DQN_plot.jpg'
    '''
    # Explicit figure/axes objects instead of the implicit pyplot state, so
    # exactly this figure is the one that gets saved and closed.
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.plot(episodes, scores, label='Score per Episode')
    ax.set_xlabel('Episode')
    ax.set_ylabel('Score')
    ax.set_title('DQN: CartPole-v1 Performance')
    # The line label was set but never displayed without a legend.
    ax.legend()
    fig.tight_layout()
    fig.savefig(save_path)
    # The training loop calls this once per episode; close the figure so
    # open figures do not accumulate.
    plt.close(fig)

In [None]:
## class

In [None]:
class DQN(torch.nn.Module):
    '''Two-layer fully connected Q-network.
        Args:
            state_size (int): number of input features (state dimension)
            action_size (int): number of outputs, one Q-value per action
            c_mid (int): width of the hidden layer
    '''
    def __init__(self, state_size, action_size, c_mid):
        super().__init__()
        # state_size -> c_mid -> (ReLU) -> action_size.
        # The attribute name and the layer order determine the state_dict
        # keys, so both are kept as they are for saved checkpoints.
        layers = [
            nn.Linear(state_size, c_mid),
            nn.ReLU(),
            nn.Linear(c_mid, action_size),
        ]
        self.dqn_net = nn.Sequential(*layers)

    def forward(self, x):
        '''Return the Q-values [action_size] for the input state(s) [state_size].'''
        return self.dqn_net(x)

class DQNAgent:
    '''Training-time DQN agent: epsilon-greedy action selection, a replay
    memory and a target network that is periodically synchronised.
        Args:
            state_size (int): state size
            action_size (int): action size
            c_mid (int): hidden layer size
            device (torch.device | str | None): device used for both networks
                and for the training batches. None selects CUDA when it is
                available and CPU otherwise.
    '''
    # The default is `device=None`, not `device=device`: a default expression
    # is evaluated when the class body is executed, and the notebook only
    # creates the global `device` in a later cell, so the old default raised
    # NameError on a fresh kernel.
    def __init__(self, state_size, action_size, c_mid=32, device=None):
        self.state_size = state_size
        self.action_size = action_size

        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        # Stored on the instance so the methods no longer read a global.
        self.device = torch.device(device)

        # hyper-parameters
        self.discount_factor = 0.99
        self.learning_rate = 0.001
        self.epsilon = 1.0
        self.epsilon_decay = 0.999
        self.epsilon_min = 0.01

        # replay-memory related parameters
        self.batch_size = 64
        self.train_start = 1000   # minimum number of stored transitions before training
        self.train_step = 0       # optimisation steps since the last target sync
        self.train_freq = 10      # target network is synced every `train_freq` steps
        self.memory = deque(maxlen=3000)

        # the online model is trained; the target model provides the TD targets
        self.model = DQN(state_size, action_size, c_mid).to(self.device)
        self.target_model = DQN(state_size, action_size, c_mid).to(self.device)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)

        self.update_target_model()
        self.loss = nn.MSELoss()

    def update_target_model(self):
        '''Copy the weights of the online model into the target model.'''
        self.target_model.load_state_dict(self.model.state_dict())

    def get_action(self, state):
        '''Choose an action for `state` with an epsilon-greedy policy.

            Returns:
                int: a random action with probability epsilon, otherwise the
                    action with the highest predicted Q-value
        '''
        # exploration
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)

        # exploitation
        state = torch.as_tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0)
        with torch.no_grad():
            return self.model(state).argmax().item()

    def append_sample(self, state, action, reward, next_state, done):
        '''Store one transition (s, a, r, s', done) in the replay memory.'''
        self.memory.append((state, action, reward, next_state, done))

    def update(self):
        '''Decay epsilon and, once enough transitions are stored, run one
        optimisation step on a random mini-batch from the replay memory.'''
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        # do not train until enough experience has been collected
        if self.train_start > len(self.memory):
            return

        # draw batch_size transitions from the replay memory
        mini_batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = map(np.array, zip(*mini_batch))

        states = torch.as_tensor(states, dtype=torch.float32, device=self.device)
        actions = torch.as_tensor(actions, dtype=torch.long, device=self.device)
        rewards = torch.as_tensor(rewards, dtype=torch.float32, device=self.device)
        next_states = torch.as_tensor(next_states, dtype=torch.float32, device=self.device)
        dones = torch.as_tensor(dones, dtype=torch.float32, device=self.device)

        # Q(s, a) of the actions that were actually taken
        q = self.model(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # TD target r + gamma * max_a' Q_target(s', a'); (1 - done) removes
        # the bootstrap term for terminal transitions. No gradient flows
        # through the target.
        with torch.no_grad():
            next_q = self.target_model(next_states).max(1)[0]
            target = rewards + self.discount_factor * (1 - dones) * next_q

        loss = self.loss(q, target)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # `>=` so the sync happens every `train_freq` optimisation steps; the
        # previous `>` comparison synced every `train_freq + 1` steps.
        self.train_step += 1
        if self.train_step >= self.train_freq:
            self.update_target_model()
            self.train_step = 0

In [None]:
## train

In [None]:
# Select the compute device once and hand it to the agent explicitly.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

env = gym.make('CartPole-v1')

state_size = env.observation_space.shape[0] # 4 <= [position, velocity, angle, angular_velocity]
action_size = env.action_space.n # 2 <= [left, right]

agent = DQNAgent(state_size, action_size, device=device)
scores, episodes = [], []

# Early stopping: training ends once more than 5 consecutive episodes reach
# a score of at least score_line['line'].
score_line = {'line':400, 'count':0}
EPISODE = 500

# Path of the most recently saved checkpoint; the test section loads it.
# It is updated on every save, so it exists even without an early stop.
model_path = None
solved = False

for episode in range(1, EPISODE + 1):

    # gym >= 0.26 returns (observation, info) from reset(); older releases
    # return only the observation.
    reset_out = env.reset()
    state = reset_out[0] if isinstance(reset_out, tuple) else reset_out
    score = 0

    while True:
        action = agent.get_action(state)

        # gym >= 0.26 returns (obs, reward, terminated, truncated, info);
        # older releases return (obs, reward, done, info).
        step_out = env.step(action)
        if len(step_out) == 5:
            next_state, reward, terminated, truncated, _ = step_out
            done = terminated or truncated
        else:
            next_state, reward, done, _ = step_out
            terminated = done

        score += reward

        # Store the transition in the replay memory. The stored flag is
        # `terminated`: a time-limit truncation is not a terminal state, so
        # the TD target should still bootstrap from next_state. With the old
        # 4-tuple API it equals `done`, as before.
        agent.append_sample(state, action, reward, next_state, terminated)
        state = next_state

        agent.update()
        if done:
            scores.append(score)
            episodes.append(episode)

            make_plot(scores, episodes)

            if episode % 10 == 0:
                print(f'episode : {episode:3d}\t score : {score:3.1f}\t epsilon : {agent.epsilon:.3f}')

            if episode % 100 == 0:
                model_path = f'./DQN_{episode}.pth'
                torch.save(agent.model.state_dict(), model_path)

            if score >= score_line['line']:
                score_line['count'] += 1
                # stop once the threshold was reached often enough in a row
                if score_line['count'] > 5:
                    print(f'episode : {episode:3d}\t score : {score:3.1f}\t epsilon : {agent.epsilon:.3f}')
                    model_path = f'./DQN_{episode}.pth'
                    torch.save(agent.model.state_dict(), model_path)
                    # A flag instead of sys.exit(): SystemExit inside a
                    # notebook is reported as an error and would stop a
                    # "Run All" before the test section is reached.
                    solved = True
            else:
                score_line['count'] = 0
            break

    if solved:
        break

In [None]:
## test ##

In [None]:
class DQNAgent:
    '''Inference-only agent: loads trained DQN weights and always acts greedily.

    NOTE: this re-uses the name `DQNAgent` of the training agent defined
    earlier in the notebook; once this cell has run, the name refers to this
    class.

        Args:
            state_size (int): state size
            action_size (int): action size
            c_mid (int): hidden layer size; has to match the saved checkpoint
            weights_path (str | None): checkpoint file to load. None falls
                back to the notebook-level `model_path` variable, which is
                what this class always read before.
            device (torch.device | str | None): None selects CUDA when it is
                available and CPU otherwise.
    '''
    def __init__(self, state_size, action_size, c_mid=32, weights_path=None, device=None):

        self.state_size = state_size
        self.action_size = action_size

        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        # Stored on the instance: get_action used to reference an undefined
        # global `DEVICE` and raised NameError.
        self.device = torch.device(device)

        if weights_path is None:
            # notebook-level variable assigned by the training section
            weights_path = model_path

        self.model = DQN(state_size, action_size, c_mid).to(self.device)
        # map_location allows a checkpoint saved on a GPU to be loaded on a
        # CPU-only machine.
        self.model.load_state_dict(torch.load(weights_path, map_location=self.device))
        self.model.eval()

    def get_action(self, state):
        '''Return the index of the action with the highest predicted Q-value.'''
        state = torch.as_tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0)
        with torch.no_grad():
            return self.model(state).argmax().item()

In [None]:
from gym.wrappers import RecordVideo

env = gym.make("CartPole-v1", render_mode="rgb_array")
# episode_trigger always returns True, so every test episode is recorded
env = RecordVideo(env, video_folder="./test_video", episode_trigger=lambda e: True)

state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# The loaded agent used to be bound to `model` while the loop called
# `agent.get_action`, i.e. the epsilon-greedy training agent left over from
# the training section, so the saved checkpoint was never evaluated.
test_agent = DQNAgent(state_size, action_size)

scores = []
EPISODE = 5
try:
    for episode in range(1, EPISODE+1):

        # gym >= 0.26 returns (observation, info) from reset(); older
        # releases return only the observation.
        reset_out = env.reset()
        state = reset_out[0] if isinstance(reset_out, tuple) else reset_out
        score = 0

        while True:
            action = test_agent.get_action(state)

            # gym >= 0.26 returns (obs, reward, terminated, truncated, info);
            # older releases return (obs, reward, done, info).
            step_out = env.step(action)
            if len(step_out) == 5:
                next_state, reward, terminated, truncated, _ = step_out
                done = terminated or truncated
            else:
                next_state, reward, done, _ = step_out

            score += reward
            state = next_state

            if done:
                scores.append(score)
                print(f'episode {episode} score : {score}')
                break
finally:
    # Close the wrapped environment so the recorder can finish writing the
    # last video file, even when an episode raises.
    env.close()

print(f'max score : {max(scores)}')