In [1]:
import torch
from torch import nn
from torchvision import transforms as T

import time, datetime
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
from pathlib import Path
from collections import deque
import random, datetime, os, copy

import gym
from gym.spaces import Box
from gym.wrappers import FrameStack

# 작업 디렉토리 설정
import os
os.chdir('/kaggle/input/gym-super-mario-bros')

# NES 에뮬레이터 설치
!pip --disable-pip-version-check install -q nes_py
from nes_py.wrappers import JoypadSpace

import gym_super_mario_bros

In [2]:
# Restrict the action space to two button combinations:
#   0. walk right
#   1. jump right
allowed_moves = [["right"], ["right", "A"]]
env = JoypadSpace(gym_super_mario_bros.make("SuperMarioBros-1-1-v0"), allowed_moves)

# Smoke test: reset, take a single step with action 0, and print what step() returns.
env.reset()
first_step = env.step(action=0)
next_state, reward, done, info = first_step
print(f"{next_state.shape},\n {reward},\n {done},\n {info}")

(240, 256, 3),
 0.0,
 False,
 {'coins': 0, 'flag_get': False, 'life': 2, 'score': 0, 'stage': 1, 'status': 'small', 'time': 400, 'world': 1, 'x_pos': 40, 'y_pos': 79}


In [3]:
# Do not pick a new action on every frame: repeat one action over several frames for efficiency.
class SkipFrame(gym.Wrapper):
    """Wrapper that applies each chosen action to `skip` consecutive frames."""

    def __init__(self, env, skip):
        """Wrap `env` and remember how many frames each action is repeated for."""
        super().__init__(env)
        self._skip = skip

    def step(self, action):
        """Repeat `action` up to `skip` times and accumulate the rewards.

        Stops early as soon as the wrapped env reports `done`. Returns the
        last observation, the summed reward, the last `done` flag and the
        last `info` value.
        """
        reward_sum = 0.0
        done = False
        for _ in range(self._skip):
            obs, step_reward, done, info = self.env.step(action)
            reward_sum += step_reward
            if done:
                # Episode finished: do not step the env any further.
                break
        return obs, reward_sum, done, info

In [4]:
class GrayScaleObservation(gym.ObservationWrapper):
    """Observation wrapper that converts each frame to grayscale."""

    def __init__(self, env):
        """Wrap `env` and declare an observation space without the third axis."""
        super().__init__(env)
        # Keep only the first two dimensions of the wrapped space's shape.
        spatial_shape = self.observation_space.shape[:2]
        self.observation_space = Box(low=0, high=255, shape=spatial_shape, dtype=np.uint8)
        self.transform = T.Grayscale()

    def permute_orientation(self, observation):
        """Move axis 2 to the front and return the result as a float tensor.

        Presumably this turns an (H, W, C) frame into (C, H, W) -- confirm
        against the wrapped environment.
        """
        reordered = np.transpose(observation, (2, 0, 1)).copy()
        return torch.tensor(reordered, dtype=torch.float)

    def observation(self, observation):
        """Reorder the axes, then apply the grayscale transform."""
        return self.transform(self.permute_orientation(observation))

In [5]:
class ResizeObservation(gym.ObservationWrapper):
    """Observation wrapper that resizes each frame and rescales its values."""

    def __init__(self, env, shape):
        """Wrap `env` and prepare the resize pipeline.

        Args:
            env: environment to wrap.
            shape: target size; an int `n` means `(n, n)`, otherwise any
                iterable that is converted to a tuple.
        """
        super().__init__(env)
        self.shape = (shape, shape) if isinstance(shape, int) else tuple(shape)

        # New spatial size followed by any trailing dimensions of the wrapped space.
        obs_shape = self.shape + self.observation_space.shape[2:]
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

        # Build the pipeline once instead of on every observation.
        # Normalize(0, 255) computes (x - 0) / 255.
        self.transform = T.Compose([T.Resize(self.shape), T.Normalize(0, 255)])

    def observation(self, observation):
        """Resize and rescale `observation`, then drop dimension 0 if it has size 1."""
        return self.transform(observation).squeeze(0)

In [6]:
# Preprocessing pipeline; the innermost wrapper is applied first:
#   SkipFrame            -> act once per 4 frames
#   GrayScaleObservation -> grayscale frames
#   ResizeObservation    -> 84x84 frames
#   FrameStack           -> stack the num_stack most recent frames into one state
env = FrameStack(
    ResizeObservation(
        GrayScaleObservation(
            SkipFrame(env, skip=4)
        ),
        shape=84,
    ),
    num_stack=4,
)

# DDQN

In [7]:
# Build the DQN model
class MarioNet(nn.Module):
    """Convolutional Q-network holding an `online` net and a frozen `target` copy.

    Both networks share the same architecture: three conv + ReLU stages,
    a flatten, and two linear layers producing `output_dim` values.
    """

    def __init__(self, input_dim, output_dim):
        """Create the online network and a deep-copied, non-trainable target.

        Args:
            input_dim: `(c, w, h)` tuple describing one input sample.
            output_dim: number of outputs (one Q-value per action).
        """
        super().__init__()
        c, w, h = input_dim

        feature_layers = [
            nn.Conv2d(in_channels=c, out_channels=32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
        ]

        # Derive the flattened feature size from input_dim instead of
        # hard-coding 3136 (the value for 84x84 inputs). The probe is a
        # zero tensor run without autograd, so it does not touch the RNG
        # or the parameters.
        with torch.no_grad():
            probe = torch.zeros(1, c, w, h)
            n_features = nn.Sequential(*feature_layers)(probe).shape[1]

        # Layer order (and therefore the Sequential indices / state_dict
        # keys) is the same as with the hard-coded size.
        self.online = nn.Sequential(
            *feature_layers,
            nn.Linear(n_features, 512),
            nn.ReLU(),
            nn.Linear(512, output_dim),
        )

        self.target = copy.deepcopy(self.online)

        # The target network is never trained directly.
        for param in self.target.parameters():
            param.requires_grad = False

    def forward(self, input, model):
        """Run `input` through the selected network.

        Args:
            input: batch tensor fed to the network.
            model: `"online"` or `"target"`.

        Raises:
            ValueError: if `model` is neither `"online"` nor `"target"`
                (previously this silently returned `None`).
        """
        if model == "online":
            return self.online(input)
        if model == "target":
            return self.target(input)
        raise ValueError(f"model must be 'online' or 'target', got {model!r}")

# Agent

In [8]:
class Mario:
    """Agent base: epsilon-greedy action selection plus a replay memory."""

    def __init__(self, state_dim, action_dim, use_cuda):
        """Set up the network, the replay memory and the exploration schedule.

        Args:
            state_dim: input dimensions passed to `MarioNet`.
            action_dim: number of discrete actions.
            use_cuda: when true, the network and cached tensors live on CUDA.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.use_cuda = use_cuda
        self.memory = deque(maxlen=100000)
        self.batch_size = 32

        self.net = MarioNet(self.state_dim, self.action_dim).float()
        if self.use_cuda:
            self.net = self.net.to(device="cuda")

        self.exploration_rate = 1
        self.exploration_rate_decay = 0.99999975
        self.exploration_rate_min = 0.1
        self.curr_step = 0

        self.save_every = 5e5

    def act(self, state):
        """Choose an action for `state` with an epsilon-greedy policy.

        With probability `exploration_rate` a uniformly random action is
        returned; otherwise the action with the highest online Q-value.
        Every call decays `exploration_rate` (never below
        `exploration_rate_min`) and increments `curr_step`.

        Args:
            state: object exposing `__array__()`.

        Returns:
            int: index of the chosen action.
        """
        # Explore: random action.
        if np.random.rand() < self.exploration_rate:
            action_idx = np.random.randint(self.action_dim)

        # Exploit: ask the online network.
        else:
            state = torch.tensor(state.__array__())
            if self.use_cuda:
                state = state.cuda()

            state = state.unsqueeze(0)  # add a batch dimension
            # Pure inference: skip autograd so no graph is built per step.
            with torch.no_grad():
                action_values = self.net(state, model="online")
            action_idx = torch.argmax(action_values, dim=1).item()

        self.exploration_rate *= self.exploration_rate_decay
        self.exploration_rate = max(self.exploration_rate_min, self.exploration_rate)

        self.curr_step += 1
        return action_idx

    # Store and recall methods
    def cache(self, state, next_state, action, reward, done):
        """Convert one transition to tensors and append it to the replay memory.

        NOTE(review): with `use_cuda` set, every stored tensor is kept on
        the GPU; check device memory before relying on the full `maxlen`.
        """
        state = torch.tensor(state.__array__())
        next_state = torch.tensor(next_state.__array__())
        action = torch.tensor([action])
        reward = torch.tensor([reward])
        done = torch.tensor([done])

        if self.use_cuda:
            state = state.cuda()
            next_state = next_state.cuda()
            action = action.cuda()
            reward = reward.cuda()
            done = done.cuda()

        self.memory.append((state, next_state, action, reward, done,))

    def recall(self):
        """Sample `batch_size` transitions and stack them field by field.

        Returns:
            tuple: `(state, next_state, action, reward, done)` batch tensors;
            `action`, `reward` and `done` are squeezed.

        Raises:
            ValueError: from `random.sample` when the memory holds fewer
                than `batch_size` transitions.
        """
        batch = random.sample(self.memory, self.batch_size)
        state, next_state, action, reward, done = map(torch.stack, zip(*batch))
        return state, next_state, action.squeeze(), reward.squeeze(), done.squeeze()

# TD Estimate & TD Target

In [9]:
class Mario(Mario):
    """Learning part of the agent: Double-DQN targets, optimisation, checkpoints.

    NOTE: this class subclasses the previously defined `Mario` under the
    same name, so re-running this cell stacks another subclass on top of
    the current one.
    """

    def __init__(self, state_dim, action_dim, use_cuda, save_dir="/kaggle/working/checkpoints"):
        """Extend the base agent with learning hyper-parameters.

        Args:
            state_dim: forwarded to the base class.
            action_dim: forwarded to the base class.
            use_cuda: forwarded to the base class.
            save_dir: directory where `save()` writes checkpoints; it is
                created on the first save.
        """
        super().__init__(state_dim, action_dim, use_cuda)
        self.gamma = 0.9
        self.burnin = 1e4        # steps to collect before any learning
        self.learn_every = 3     # steps between online-network updates
        self.sync_every = 1e4    # steps between target-network syncs
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=0.00025)
        self.loss_fn = torch.nn.SmoothL1Loss()
        self.save_dir = Path(save_dir)

    # Q-value estimate for the given states and the actions actually taken
    def td_estimate(self, state, action):
        """Return the online Q-value of `action` for each row of `state`."""
        current_Q = self.net(state, model="online")[
            np.arange(0, self.batch_size), action
        ]
        return current_Q

    # Target value computation
    @torch.no_grad()
    def td_target(self, reward, next_state, done):
        """Compute the Double-DQN target.

        The online network picks the best next action, the target network
        evaluates it, and the bootstrap term is zeroed where `done` is set.
        """
        next_state_Q = self.net(next_state, model="online")
        best_action = torch.argmax(next_state_Q, axis=1)
        next_Q = self.net(next_state, model="target")[
            np.arange(0, self.batch_size), best_action
        ]
        return (reward + (1 - done.float()) * self.gamma * next_Q).float()

    def update_Q_online(self, td_estimate, td_target):
        """Take one optimiser step on the loss between estimate and target.

        Returns:
            float: the loss value.
        """
        loss = self.loss_fn(td_estimate, td_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def sync_Q_target(self):
        """Copy the online network's weights into the target network."""
        self.net.target.load_state_dict(self.net.online.state_dict())

    def save(self):
        """Write the network weights and the exploration rate to `save_dir`.

        `learn()` calls this every `save_every` steps; without this method
        that call raised `AttributeError`.
        """
        self.save_dir.mkdir(parents=True, exist_ok=True)
        checkpoint_idx = int(self.curr_step // self.save_every)
        save_path = self.save_dir / f"mario_net_{checkpoint_idx}.chkpt"
        torch.save(
            dict(model=self.net.state_dict(), exploration_rate=self.exploration_rate),
            save_path,
        )
        print(f"MarioNet saved to {save_path} at step {self.curr_step}")

    def learn(self):
        """Run the periodic sync/save and, when due, one learning update.

        Returns:
            tuple: `(mean TD estimate, loss)` after an update, or
            `(None, None)` during burn-in and on steps without an update.
        """
        if self.curr_step % self.sync_every == 0: self.sync_Q_target()
        if self.curr_step % self.save_every == 0: self.save()
        if self.curr_step < self.burnin: return None, None
        if self.curr_step % self.learn_every != 0: return None, None

        state, next_state, action, reward, done = self.recall()
        # Compute the TD estimate
        td_est = self.td_estimate(state, action)
        # Compute the TD target
        td_tgt = self.td_target(reward, next_state, done)
        # Update the online network
        loss = self.update_Q_online(td_est, td_tgt)

        return (td_est.mean().item(), loss)

In [10]:
class MetricLogger:
    """Collects per-episode training metrics and prints moving averages."""

    def __init__(self):
        """Create empty histories and start the first episode's accumulators."""
        # Per-episode history
        self.ep_rewards = []
        self.ep_lengths = []
        self.ep_avg_losses = []
        self.ep_avg_qs = []

        # Moving averages, one entry per record() call
        self.moving_avg_ep_rewards = []
        self.moving_avg_ep_lengths = []
        self.moving_avg_ep_avg_losses = []
        self.moving_avg_ep_avg_qs = []

        self.init_episode()

        self.record_time = time.time()

    def log_step(self, reward, loss, q):
        """Accumulate one environment step.

        Args:
            reward: reward of this step.
            loss: loss of the learning update, or `None` when no update ran.
            q: mean Q estimate of the update, or `None` when no update ran.
        """
        self.curr_ep_reward += reward
        self.curr_ep_length += 1
        # Compare against None: a genuine loss of 0.0 is falsy but must still count.
        if loss is not None:
            self.curr_ep_loss += loss
            self.curr_ep_q += q
            self.curr_ep_loss_length += 1

    def log_episode(self):
        """Close the current episode: store its totals and averages, then reset."""
        self.ep_rewards.append(self.curr_ep_reward)
        self.ep_lengths.append(self.curr_ep_length)
        if self.curr_ep_loss_length == 0:
            # No learning update happened in this episode.
            ep_avg_loss = 0
            ep_avg_q = 0
        else:
            ep_avg_loss = np.round(self.curr_ep_loss / self.curr_ep_loss_length, 5)
            ep_avg_q = np.round(self.curr_ep_q / self.curr_ep_loss_length, 5)
        self.ep_avg_losses.append(ep_avg_loss)
        self.ep_avg_qs.append(ep_avg_q)

        self.init_episode()

    def init_episode(self):
        """Reset the accumulators of the running episode."""
        self.curr_ep_reward = 0.0
        self.curr_ep_length = 0
        self.curr_ep_loss = 0.0
        self.curr_ep_q = 0.0
        self.curr_ep_loss_length = 0

    def record(self, episode, epsilon, step):
        """Append and print the means over the last 100 episodes.

        Args:
            episode: episode index shown in the log line.
            epsilon: current exploration rate shown in the log line.
            step: current global step shown in the log line.
        """
        mean_ep_reward = np.round(np.mean(self.ep_rewards[-100:]), 3)
        mean_ep_length = np.round(np.mean(self.ep_lengths[-100:]), 3)
        mean_ep_loss = np.round(np.mean(self.ep_avg_losses[-100:]), 3)
        mean_ep_q = np.round(np.mean(self.ep_avg_qs[-100:]), 3)
        self.moving_avg_ep_rewards.append(mean_ep_reward)
        self.moving_avg_ep_lengths.append(mean_ep_length)
        self.moving_avg_ep_avg_losses.append(mean_ep_loss)
        self.moving_avg_ep_avg_qs.append(mean_ep_q)

        # Wall-clock time elapsed since the previous record() (or construction).
        last_record_time = self.record_time
        self.record_time = time.time()
        time_since_last_record = np.round(self.record_time - last_record_time, 3)

        print(
            "Episode:{:4d}  :: Step:{:5d}  :: Epsilon:{:8.3f}  :: Mean_Reward:{:8.3f}  :: " \
            "Mean_Length:{:8.3f}  :: Mean_Loss:{:4.3f}  :: Mean_Q_Value:{:8.3f}  :: " \
            "Time_Delta:{:8.3f} "
            .format(episode, step, epsilon, mean_ep_reward, mean_ep_length, 
                    mean_ep_loss, mean_ep_q, time_since_last_record)
        )

# 학습

In [11]:
use_cuda = torch.cuda.is_available()
print(f"Using CUDA: {use_cuda}")
print()

mario = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, use_cuda=use_cuda)

logger = MetricLogger()

episodes = 101
for e in range(episodes):
    state = env.reset()

    # Play one episode: act, step the env, store the transition, learn, log.
    episode_over = False
    while not episode_over:
        action = mario.act(state)
        next_state, reward, done, info = env.step(action)
        mario.cache(state, next_state, action, reward, done)
        q, loss = mario.learn()
        logger.log_step(reward, loss, q)
        state = next_state

        # The episode ends when the env is done or the flag has been reached.
        episode_over = done or info["flag_get"]

    logger.log_episode()

    # Print a progress line only every 10th episode.
    if e % 10 != 0:
        continue
    logger.record(episode=e, epsilon=mario.exploration_rate, step=mario.curr_step)

Using CUDA: True

Episode:   0  :: Step:  204  :: Epsilon:   1.000  :: Mean_Reward: 762.000  :: Mean_Length: 204.000  :: Mean_Loss:0.000  :: Mean_Q_Value:   0.000  :: Time_Delta:   2.404 
Episode:  10  :: Step: 2262  :: Epsilon:   0.999  :: Mean_Reward: 648.636  :: Mean_Length: 205.636  :: Mean_Loss:0.000  :: Mean_Q_Value:   0.000  :: Time_Delta:  25.440 
Episode:  20  :: Step: 4081  :: Epsilon:   0.999  :: Mean_Reward: 713.619  :: Mean_Length: 194.333  :: Mean_Loss:0.000  :: Mean_Q_Value:   0.000  :: Time_Delta:  21.926 
Episode:  30  :: Step: 6178  :: Epsilon:   0.998  :: Mean_Reward: 700.645  :: Mean_Length: 199.290  :: Mean_Loss:0.000  :: Mean_Q_Value:   0.000  :: Time_Delta:  25.525 
Episode:  40  :: Step: 7950  :: Epsilon:   0.998  :: Mean_Reward: 681.878  :: Mean_Length: 193.902  :: Mean_Loss:0.000  :: Mean_Q_Value:   0.000  :: Time_Delta:  20.835 
Episode:  50  :: Step:10216  :: Epsilon:   0.997  :: Mean_Reward: 691.667  :: Mean_Length: 200.314  :: Mean_Loss:0.056  :: Mean_Q_Va

# 시각화

In [12]:
!pip --disable-pip-version-check install -q pyvirtualdisplay

from pyvirtualdisplay import Display

from IPython import display as ipythondisplay
from IPython.display import HTML

from gym.wrappers import Monitor
from glob import glob

import base64
import io

# Start a non-visible 600x300 virtual display.
display = Display(size=(600, 300), visible=0)
display.start()

<pyvirtualdisplay.display.Display at 0x79e7dfe99190>

In [13]:
def show_video():
    """Embed the first recorded mp4 from /kaggle/working/video in the notebook.

    The file is read, base64-encoded and displayed inline as an HTML
    `<video>` element. Prints a message when no mp4 file is found.
    """
    mp4list = glob('/kaggle/working/video/*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        # Read-only binary mode; the context manager closes the handle.
        with io.open(mp4, 'rb') as video_file:
            video = video_file.read()
        encoded = base64.b64encode(video)
        ipythondisplay.display(HTML(data='''<video alt="test" autoplay 
                    loop controls style="height: 400px;">
                    <source src="data:video/mp4;base64,{0}" type="video/mp4" />
                 </video>'''.format(encoded.decode('ascii'))))
    else: 
        print("Could not find video")
    

def wrap_env(env):
    """Return `env` wrapped in a Monitor that writes to /kaggle/working/video."""
    return Monitor(env, '/kaggle/working/video', force=True)

env = wrap_env(env)

In [14]:
# Play one episode in the monitored environment, then show the recording.
# NOTE: mario.act() is still epsilon-greedy here and keeps decaying the
# exploration rate and incrementing curr_step.
state = env.reset()

rollout_over = False
while not rollout_over:
    action = mario.act(state)
    next_state, reward, done, info = env.step(action)
    state = next_state

    # Stop when the env is done or the flag has been reached.
    rollout_over = done or info["flag_get"]

env.close()
show_video()