In [1]:
from collections import deque
import numpy as np
import time
import matplotlib.pyplot as plt

import torch
from torch import nn
from pathlib import Path
import random
import shutil
import pickle

from nes_py.wrappers import JoypadSpace

import gym_super_mario_bros
from gym_super_mario_bros.actions import COMPLEX_MOVEMENT, SIMPLE_MOVEMENT, RIGHT_ONLY

In [2]:
class Logger():
    """Collects per-step training statistics and reports rolling episode means.

    Typical use: call ``log_step`` after every environment step,
    ``log_episode`` when an episode ends, and ``record`` to print (and
    optionally persist) the means over the most recent episodes.
    """

    # Number of most recent episodes that the reported means are taken over.
    WINDOW = 100

    def __init__(self, save_path='Logs'):
        """
        Args:
            save_path: directory that receives ``metrics.txt`` and the plot
                images written by ``record``.
        """
        self.save_path = save_path

        # Per-episode history.
        self.rewards = []
        self.ep_lengths = []
        self.ep_losses = []
        self.ep_times = []
        self.epsilons = []

        # One entry per ``record`` call.
        self.mean_ep_rewards = []
        self.mean_ep_lengths = []
        self.mean_ep_losses = []
        self.mean_ep_times = []

        self.reset()

    def reset(self):
        """Clear the running totals and restart the episode timer."""
        self.curr_rewards = 0
        self.curr_ep_length = 0
        self.curr_loss = 0
        self.ep_time_start = time.time()

    def log_step(self, reward, loss):
        """Add one environment step to the running episode totals.

        Args:
            reward: reward obtained on this step.
            loss: training loss for this step, or ``None`` when no
                optimisation step was performed.
        """
        self.curr_rewards += reward
        self.curr_ep_length += 1

        if loss is not None:
            self.curr_loss += loss

    def log_episode(self):
        """Store the totals of the finished episode and start a new one."""
        self.rewards.append(self.curr_rewards)
        self.ep_lengths.append(self.curr_ep_length)
        self.ep_losses.append(self.curr_loss)
        self.ep_times.append(time.time() - self.ep_time_start)

        self.reset()

    def _recent_mean(self, values):
        """Mean of the last ``WINDOW`` entries of ``values``, rounded to 3 decimals."""
        return np.round(np.mean(values[-self.WINDOW:]), 3)

    def record(self, episode, step, epsilon, is_save=True):
        """Print the rolling means and optionally append them to disk.

        Args:
            episode: index of the episode that just finished.
            step: total number of agent steps so far.
            epsilon: current exploration rate.
            is_save: when true, append a row to ``metrics.txt`` and rewrite
                the plot images inside ``save_path``.
        """
        mean_ep_reward = self._recent_mean(self.rewards)
        mean_ep_length = self._recent_mean(self.ep_lengths)
        mean_ep_loss = self._recent_mean(self.ep_losses)
        mean_ep_times = self._recent_mean(self.ep_times)

        self.epsilons.append(epsilon)

        self.mean_ep_rewards.append(mean_ep_reward)
        self.mean_ep_lengths.append(mean_ep_length)
        self.mean_ep_losses.append(mean_ep_loss)
        # Keep the time history too; the 'mean_ep_times' plot below reads it.
        self.mean_ep_times.append(mean_ep_times)

        print(
            f'Episode {episode} - '
            f'Step {step} - '
            f'Epsilon {epsilon} - '
            f'MeanEpReward {mean_ep_reward} - '
            f'MeanEpLength {mean_ep_length} - '
            f'MeanEpLoss {mean_ep_loss} - '
            f'MeanTime {mean_ep_times}'
        )

        if not is_save:
            return

        # The target directory may not exist yet (e.g. the default 'Logs').
        Path(self.save_path).mkdir(parents=True, exist_ok=True)

        with open(f'{self.save_path}/metrics.txt', 'a') as f:
            f.write(
                f"{episode:8d}{step:10d}{epsilon:20.8f}"
                f"{mean_ep_reward:15.3f}{mean_ep_length:15.3f}{mean_ep_loss:15.3f}{mean_ep_times:15.3f}\n"
            )

        # Plotting does not need the metrics file, so it runs after it is closed.
        for metric in ["mean_ep_rewards", "mean_ep_lengths", 'mean_ep_times']:
            plt.plot(getattr(self, metric))
            plt.xlabel('Episodes')
            plt.ylabel(metric)
            plt.savefig(f"{self.save_path}/{metric}_plot.jpg")
            plt.clf()

In [3]:
import gym
import cv2
import time, datetime
import numpy as np
from gym import ObservationWrapper
from gym.wrappers import FrameStack

class ImageToPyTorch(gym.ObservationWrapper):
    """Observation wrapper that moves axis 2 of every observation to the front.

    The advertised observation space is rebuilt with the last dimension of
    the wrapped space's shape placed first and bounds of [0.0, 1.0] (float32).
    """

    def __init__(self, env):
        super().__init__(env)
        source_shape = self.observation_space.shape
        # Last dimension first, followed by the first two dimensions.
        reordered_shape = (source_shape[-1], source_shape[0], source_shape[1])
        self.observation_space = gym.spaces.Box(
            low=0.0,
            high=1.0,
            shape=reordered_shape,
            dtype=np.float32,
        )

    def observation(self, observation):
        """Return ``observation`` with axis 2 moved to position 0."""
        return np.moveaxis(observation, source=2, destination=0)

class BufferWrapper(gym.ObservationWrapper):
    """Observation wrapper that keeps a rolling stack of the latest observations.

    The wrapped observation space is repeated ``n_steps`` times along axis 0.
    Every new observation is written to the last slot of the stack after the
    older entries have been shifted one slot towards the front.
    """

    def __init__(self, env, n_steps, dtype=np.float32):
        """
        Args:
            env: environment to wrap.
            n_steps: how many times the wrapped space is repeated along axis 0.
            dtype: dtype of the stacked observation and of its space.
        """
        super(BufferWrapper, self).__init__(env)
        self.dtype = dtype
        old_space = env.observation_space
        self.observation_space = gym.spaces.Box(old_space.low.repeat(n_steps, axis=0),
                                                old_space.high.repeat(n_steps, axis=0), dtype=dtype)
        # Allocate up front so observation() is usable even before reset().
        self.buffer = np.zeros_like(self.observation_space.low, dtype=self.dtype)

    def reset(self):
        """Zero the stack, reset the wrapped env and return the first stacked observation."""
        self.buffer = np.zeros_like(self.observation_space.low, dtype=self.dtype)
        return self.observation(self.env.reset())

    def observation(self, observation):
        """Push ``observation`` onto the stack and return a snapshot of it."""
        self.buffer[:-1] = self.buffer[1:]
        self.buffer[-1] = observation
        # Return a copy: the internal buffer is overwritten in place on the
        # next call, which would silently change observations callers still hold.
        return self.buffer.copy()

class FrameSkip(gym.Wrapper):
    """Repeat each action for several frames, summing the rewards.

    Optionally renders every frame and throttles the loop so that
    consecutive frames are at least ``limit`` seconds apart.
    """

    def __init__(self, env=None, frames=1, limit=False, render_game=False):
        """
        Args:
            env: environment to wrap.
            frames: number of times each action is repeated.
            limit: minimum number of seconds between frames, or ``False``
                to run without throttling.
            render_game: render after every emulated frame when true.
        """
        super().__init__(env)
        self.frames = frames
        self.limit = limit
        self.render_game = render_game
        # Timestamp taken after the previous frame; 0 means "no frame yet".
        self.tic = 0

    def step(self, action):
        """Apply ``action`` up to ``frames`` times, stopping early when done.

        Returns:
            The last observation, the summed reward, the last done flag and
            the last info dict.
        """
        accumulated = 0
        for _frame in range(self.frames):
            obs, reward, done, info = self.env.step(action)
            accumulated += reward
            self.toc = time.perf_counter()

            if self.render_game:
                self.render()

            # Throttle only once a previous timestamp exists and a limit is set.
            if self.tic != 0 and self.limit != False:
                elapsed = self.toc - self.tic
                time.sleep(max(self.limit - elapsed, 0))
            self.tic = time.perf_counter()

            if done:
                break

        return obs, accumulated, done, info

class Rescale(gym.ObservationWrapper):
    """Convert raw 240x256 RGB frames to small, normalised greyscale images.

    Each frame is converted to greyscale, cropped to rows 40..221, resized
    to ``shape`` x ``shape`` and scaled so that its brightest pixel is 1.0.
    """

    def __init__(self, env=None, shape=84):
        """
        Args:
            env: environment to wrap.
            shape: side length (in pixels) of the square output image.
        """
        super(Rescale, self).__init__(env)
        self.frame_size = shape
        # process() returns float32 values in [0, 1], so advertise exactly that.
        self.observation_space = gym.spaces.Box(low=0.0, high=1.0, shape=(shape, shape, 1),
                                                dtype=np.float32)

    def observation(self, obs):
        """Return the processed version of ``obs``."""
        return Rescale.process(obs, self.frame_size)

    @staticmethod
    def process(frame, shape=84):
        """Greyscale, crop, resize and normalise a single frame.

        Args:
            frame: array holding exactly 240 * 256 * 3 values.
            shape: side length of the square output image.

        Returns:
            float32 array of shape ``(shape, shape, 1)``.

        Raises:
            AssertionError: if ``frame`` does not have the expected size.
        """
        if frame.size != 240 * 256 * 3:
            # Raised explicitly so the check also survives ``python -O``.
            raise AssertionError("Unknown resolution.")
        img = np.reshape(frame, [240, 256, 3]).astype(np.float32)

        # greyscale
        r, g, b = img[:, :, 0], img[:, :, 1], img[:, :, 2]
        img = 0.299 * r + 0.587 * g + 0.114 * b

        # crop extra information on the screen
        resized_screen = cv2.resize(img[40:222, :], (shape, shape), interpolation=cv2.INTER_AREA)

        # norm; an all-black frame has max 0 and would otherwise turn into NaN
        peak = resized_screen.max()
        if peak > 0:
            resized_screen *= 1.0 / peak
        resized_screen = np.reshape(resized_screen, [shape, shape, 1])

        return resized_screen
    
def apply_wrappers(env, frames=4, limit=1 / 150, render_game=True, n_stack=4):
    """Wrap ``env`` with the preprocessing pipeline used by the agent.

    The wrappers are applied in this order: ``FrameSkip`` -> ``Rescale`` ->
    ``ImageToPyTorch`` -> ``BufferWrapper``.

    Args:
        env: environment to wrap.
        frames: number of emulator frames each action is repeated for.
        limit: minimum seconds between frames, or ``False`` to disable the
            throttle (useful for faster headless training).
        render_game: render every emulated frame when true.
        n_stack: number of processed frames stacked into one observation.

    Returns:
        The wrapped environment.
    """
    env = FrameSkip(env, frames=frames, limit=limit, render_game=render_game)
    env = Rescale(env)
    env = ImageToPyTorch(env)
    env = BufferWrapper(env, n_stack)

    return env

In [4]:
class DuelDDQN(nn.Module):
    """Dueling Q-network: a shared convolutional trunk feeding two heads.

    One head produces a single state value, the other one advantage per
    action; they are combined as ``Q = V + A - mean(A)``.
    """

    def __init__(self, input_shape, n_actions):
        """
        Args:
            input_shape: shape of one observation without the batch
                dimension; its first entry is the number of input channels.
            n_actions: number of discrete actions (width of the output).
        """
        super().__init__()
        in_channels = input_shape[0]
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
        )

        n_features = self._get_conv_out(input_shape)
        hidden_units = 512

        # Hidden layers of the two streams.
        self.fc_value = nn.Sequential(nn.Linear(n_features, hidden_units), nn.ReLU())
        self.fc_advantage = nn.Sequential(nn.Linear(n_features, hidden_units), nn.ReLU())

        # Output layers of the two streams.
        self.value = nn.Sequential(nn.Linear(hidden_units, 1))
        self.advantage = nn.Sequential(nn.Linear(hidden_units, n_actions))

    def _get_conv_out(self, shape):
        """Number of features the conv trunk emits for one input of ``shape``."""
        probe = torch.zeros(1, *shape)
        return self.conv(probe).numel()

    def forward(self, x):
        """Return the Q-values, one row per batch element, one column per action."""
        batch_size = x.shape[0]
        features = self.conv(x).view(batch_size, -1)

        state_value = self.value(self.fc_value(features))
        advantages = self.advantage(self.fc_advantage(features))

        # Centre the advantages on their per-sample mean before adding the value.
        mean_advantage = advantages.mean(dim=1, keepdim=True)
        return state_value + advantages - mean_advantage

In [5]:
class Agent:
    """Epsilon-greedy DQN agent with a replay buffer and a target network.

    ``main_nn`` is trained on random minibatches from the replay buffer;
    ``target_nn`` supplies the bootstrap target (max over its own Q-values)
    and is refreshed from ``main_nn`` every ``network_copy`` agent steps.
    """

    def __init__(self, state_space, action_space):
        """
        Args:
            state_space: shape of one observation, passed to the networks.
            action_space: number of discrete actions.
        """
        self.state_space = state_space
        self.action_space = action_space
        self.experience = deque(maxlen=30000)

        self.lr = 0.00025

        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        print("Using {}".format(self.device))

        self.main_nn = DuelDDQN(state_space, action_space).to(self.device)
        self.target_nn = DuelDDQN(state_space, action_space).to(self.device)
        # Both networks are initialised independently; sync them so the first
        # targets are not produced by an unrelated random network.
        self.copy_model()

        self.optimizer = torch.optim.Adam(self.main_nn.parameters(), lr=self.lr)
        self.network_copy = 5000
        self.step = 0

        self.batch_size = 32
        self.eps = 1
        self.eps_min = 0.01
        self.eps_decay = 0.9997
        self.gamma = 0.9

        # Learning parameters
        self.l1 = nn.SmoothL1Loss().to(self.device)

    def save_experience(self, state, action, reward, state2, done):
        """Append one transition (all tensors, stored as float) to the replay buffer."""
        self.experience.append((state.float(), action.float(), reward.float(), state2.float(), done.float()))

    def choose_action(self, state):
        """Pick an action epsilon-greedily and advance the step counter.

        Returns:
            A CPU tensor of shape ``(1, 1)`` holding the action index.
        """
        self.step += 1
        if random.random() < self.eps:
            return torch.tensor([[random.randrange(self.action_space)]])

        # Action selection is pure inference; no autograd graph is needed.
        with torch.no_grad():
            q_values = self.main_nn(state.to(self.device))
        return torch.argmax(q_values).unsqueeze(0).unsqueeze(0).cpu()

    def copy_model(self):
        """Copy the weights of ``main_nn`` into ``target_nn``."""
        self.target_nn.load_state_dict(self.main_nn.state_dict())

    def train(self):
        """Run one optimisation step on a random minibatch.

        Returns:
            The loss as a float, or ``None`` while the replay buffer holds
            fewer than ``batch_size`` transitions.
        """
        if self.step % self.network_copy == 0 and self.step > 0:
            self.copy_model()

        if len(self.experience) < self.batch_size:
            return

        minibatch = random.sample(self.experience, self.batch_size)
        # Every stored tensor carries a leading dimension of size 1; drop it
        # and stack each field into one batch tensor directly on the device.
        state, action, reward, next_state, done = (
            torch.stack([item.squeeze(dim=0) for item in field]).to(self.device)
            for field in zip(*minibatch)
        )

        self.optimizer.zero_grad()
        # The target is treated as a constant: no gradients through target_nn.
        with torch.no_grad():
            next_q = self.target_nn(next_state).max(1).values.unsqueeze(1)
            target = reward + torch.mul(self.gamma * next_q, 1 - done)
        current = self.main_nn(state).gather(1, action.long())

        # Updating network
        loss = self.l1(current, target)
        loss.backward()
        self.optimizer.step()

        self.eps = max(self.eps, self.eps_min)

        return loss.item()

    def save_model(self, episode, path):
        """Write the weights of ``main_nn`` to ``{path}/episode_{episode}.pth``."""
        torch.save(self.main_nn.state_dict(), f'{path}/episode_{episode}.pth')

    def load_model(self, path):
        """Load weights saved by ``save_model`` into both networks.

        NOTE: ``torch.load`` unpickles the file; only load trusted checkpoints.
        """
        # map_location lets a checkpoint written on GPU be loaded on a CPU-only machine.
        state_dict = torch.load(path, map_location=self.device)
        self.main_nn.load_state_dict(state_dict)
        self.copy_model()

In [7]:
# Directory that receives the metrics file, the plots and the model checkpoints.
save_dir = Path('logs')
# exist_ok covers the "already there" case without hiding real failures
# (permissions, a regular file with the same name, ...).
save_dir.mkdir(parents=True, exist_ok=True)

In [None]:
# Training: build the wrapped environment and the agent, then run episodes.
env = gym_super_mario_bros.make('SuperMarioBros-1-1-v0')
env = apply_wrappers(env)
env = JoypadSpace(env, RIGHT_ONLY)

agent = Agent(state_space=env.observation_space.shape, action_space=env.action_space.n)

num_episodes = 20000 + 1

logger = Logger(save_dir)

# try/finally so the emulator is closed even if training is interrupted.
try:
    for ep_num in range(num_episodes):

        state = env.reset()
        state = torch.Tensor(np.array([state]))  # add a leading batch dimension
        total_reward = 0
        episode_reward = []
        completed_level = False
        done = False

        while not done:
            action = agent.choose_action(state)
            state_next, reward, done, info = env.step(int(action[0]))

            completed_level = info['flag_get']
            total_reward += reward
            episode_reward.append(reward)

            state_next = torch.Tensor(np.array([state_next]))
            reward_tensor = torch.tensor([reward]).unsqueeze(0)
            terminal = torch.tensor(np.array([int(done)])).unsqueeze(0)

            # Store and learn from the transition before any early exit, so
            # the step that ends the episode is not silently discarded.
            agent.save_experience(state, action, reward_tensor, state_next, terminal)
            loss = agent.train()

            logger.log_step(reward, loss)

            state = state_next

            # Stuck: the last 50 steps yielded a negative total reward.
            if np.sum(episode_reward[-50:]) < 0:
                break

        # Decay exploration once per episode, never dropping below the floor.
        agent.eps = max(agent.eps * agent.eps_decay, agent.eps_min)
        logger.log_episode()

        logger.record(episode=ep_num,
                      epsilon=agent.eps,
                      step=agent.step,
                      is_save=True)

        if ep_num % 500 == 0 and ep_num > 0:
            agent.save_model(ep_num, save_dir)
finally:
    env.close()



In [None]:
# Replay: run a trained agent greedily until it reaches the flag.

env = gym_super_mario_bros.make('SuperMarioBros-1-1-v0')
env = apply_wrappers(env)
env = JoypadSpace(env, RIGHT_ONLY)

# try/finally so the emulator window is closed even if loading or playback fails.
try:
    agent = Agent(state_space=env.observation_space.shape, action_space=env.action_space.n)

    checkpoint_path = Path(save_dir) / 'episode_10428.pth'
    if not checkpoint_path.exists():
        # The training cell only writes a checkpoint every 500 episodes, so
        # fall back to the most recent numbered checkpoint that is available.
        numbered = [
            candidate for candidate in Path(save_dir).glob('episode_*.pth')
            if candidate.stem.split('_')[-1].isdigit()
        ]
        if not numbered:
            raise FileNotFoundError(f'No episode_*.pth checkpoint found in {save_dir}')
        checkpoint_path = max(numbered, key=lambda candidate: int(candidate.stem.split('_')[-1]))
    print(f'Loading checkpoint {checkpoint_path}')

    agent.load_model(str(checkpoint_path))
    agent.eps = 0  # always act greedily

    completed_level = False
    while not completed_level:
        state = env.reset()
        state = torch.Tensor(np.array([state]))
        done = False

        while not done:
            env.render()
            # Playback is inference only; skip building autograd graphs.
            with torch.no_grad():
                action = agent.choose_action(state)
            state_next, reward, done, info = env.step(int(action[0]))

            state_next = torch.Tensor(np.array([state_next]))
            completed_level = info['flag_get']

            state = state_next
finally:
    env.close()