In [None]:
!pip install -U gym-super-mario-bros

Collecting gym-super-mario-bros
  Downloading gym_super_mario_bros-7.4.0-py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.1/199.1 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting nes-py>=8.1.4 (from gym-super-mario-bros)
  Downloading nes_py-8.2.1.tar.gz (77 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.7/77.7 kB[0m [31m10.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pyglet<=1.5.21,>=1.4.0 (from nes-py>=8.1.4->gym-super-mario-bros)
  Downloading pyglet-1.5.21-py3-none-any.whl (1.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m30.9 MB/s[0m eta [36m0:00:00[0m
Building wheels for collected packages: nes-py
  Building wheel for nes-py (setup.py) ... [?25l[?25hdone
  Created wheel for nes-py: filename=nes_py-8.2.1-cp310-cp310-linux_x86_64.whl size=535677 sha256=e7a700c701b88ff7ae8a4492c444e4158a66d6786d09

In [None]:
!pip install --upgrade gym==0.25.2



In [None]:
from nes_py.wrappers import JoypadSpace
import gym_super_mario_bros
import numpy as np
import matplotlib.pyplot as plt
from gym_super_mario_bros.actions import RIGHT_ONLY
import torch
import torch.nn as nn
import torch.nn.functional as F
from gym.wrappers.record_video import RecordVideo
import os
from datetime import datetime

In [None]:
# Module-level hyperparameters.
# NOTE(review): neither name is referenced by the code visible in this notebook
# (DQNAgent sets its own discount rate and optimizer learning rate) — confirm
# whether they are still needed.
DISCOUNT_FACTOR = 0.9
LR = 0.1

In [None]:
def epsilon_greedy(model, state, epsilon=0.1):
  """Pick an action epsilon-greedily from the model's action values.

  With probability ``1 - epsilon`` the index of the largest value returned by
  ``model(state)`` is chosen; otherwise an action is drawn at random.

  Args:
    model: Callable mapping ``state`` to a tensor of per-action values. If it
      exposes an ``env`` attribute the random action comes from
      ``env.action_space.sample()``; otherwise it is drawn uniformly from
      ``range(model.n_action)`` (or from the number of values the model
      outputs when ``n_action`` is missing).
    state: Input forwarded to ``model`` unchanged.
    epsilon: Probability of taking a random action.

  Returns:
    The chosen action. The greedy branch and the ``n_action`` fallback return a
    plain Python ``int``; the ``env`` branch returns whatever the action space
    samples.
  """
  if np.random.random() < 1 - epsilon:
    # Inference only: no autograd graph is needed to pick the best action.
    with torch.no_grad():
      values = model(state)
    # Return a plain int rather than a 0-d tensor so the result can be used
    # directly as an action index / dictionary key.
    return int(torch.argmax(values).item())

  # Exploration branch. A bare network has no ``env`` attribute, so only use it
  # when it is really there instead of raising AttributeError.
  env = getattr(model, "env", None)
  if env is not None:
    return env.action_space.sample()

  n_action = getattr(model, "n_action", None)
  if n_action is None:
    with torch.no_grad():
      n_action = model(state).numel()
  return int(np.random.randint(n_action))

In [None]:
def gather_samples(env, n_episodes=20000):
  """Collect the frames visited while playing random actions.

  Every stored sample is the frame observed *before* a step is taken, so the
  final frame of each episode is not recorded.

  Args:
    env: Environment exposing ``reset()``, ``step(action)`` returning
      ``(state, reward, done, info)``, and ``action_space.sample()``.
      Frames are expected to have the channel axis last (height, width,
      channel).
    n_episodes: Number of episodes to play. Every frame of every episode is
      kept in memory, so large values need a correspondingly large amount of
      RAM.

  Returns:
    A list of arrays, each of shape ``(1, channel, height, width)``.
  """
  def to_batched_chw(frame):
    # Move the channel axis to the front with a transpose (a plain reshape
    # would keep the memory order and scramble the pixels), add a leading
    # batch axis, and copy so the stored sample cannot alias a buffer the
    # environment may overwrite on the next step.
    return np.transpose(np.asarray(frame), (2, 0, 1))[np.newaxis].copy()

  samples = []
  for _ in range(n_episodes):
    state = to_batched_chw(env.reset())
    done = False
    while not done:
      action = env.action_space.sample()
      samples.append(state)
      frame, _reward, done, _info = env.step(action)
      # Apply the same layout to every frame, not only to the reset frame.
      state = to_batched_chw(frame)
  return samples

In [None]:
# Use the first CUDA GPU when torch reports one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cpu


In [None]:
class NESModel(nn.Module):
  """Convolutional network producing one value per action for a frame tensor.

  The network is seven ``conv -> batch-norm -> ReLU`` stages; each of the first
  six is followed by a 2x2 max-pool. All activations are then flattened into a
  single vector and passed through two linear layers with dropout in between.

  Because the flatten collapses every axis (including the batch axis) and
  ``fc1`` expects exactly 24576 inputs, the input must produce exactly that many
  activations — e.g. a single 240x256 frame (2048 channels x 3 x 4).

  Args:
    input_dim: Number of channels of the input tensor.
    action_size: Number of output values.
  """

  # Output channels of the seven convolution stages, in order.
  _STAGE_CHANNELS = (32, 64, 128, 256, 512, 1024, 2048)

  def __init__(self, input_dim, action_size):
    super().__init__()
    self.input_dim = input_dim
    self.n_action = action_size

    # Register conv{i}/bn{i}/pool{i} one stage at a time; the last stage has
    # no pooling layer.
    n_stages = len(self._STAGE_CHANNELS)
    in_channels = self.input_dim
    for stage, out_channels in enumerate(self._STAGE_CHANNELS, start=1):
      setattr(self, f"conv{stage}", nn.Conv2d(in_channels, out_channels, 3, padding=1))
      setattr(self, f"bn{stage}",
              nn.BatchNorm2d(out_channels, affine=False, track_running_stats=False))
      if stage < n_stages:
        setattr(self, f"pool{stage}", nn.MaxPool2d(2))
      in_channels = out_channels

    self.fc1 = nn.Linear(24576, 128)
    self.dropout = nn.Dropout(0.5)
    self.fc2 = nn.Linear(128, self.n_action)

  def forward(self, X):
    """Return a 1-D tensor holding ``n_action`` values for input ``X``."""
    n_stages = len(self._STAGE_CHANNELS)
    features = X
    for stage in range(1, n_stages + 1):
      features = getattr(self, f"conv{stage}")(features)
      features = F.relu(getattr(self, f"bn{stage}")(features))
      if stage < n_stages:
        features = getattr(self, f"pool{stage}")(features)

    # Collapses every axis into one vector, batch axis included.
    hidden = self.fc1(features.flatten())
    # Dropout is applied before the ReLU.
    hidden = F.relu(self.dropout(hidden))
    return self.fc2(hidden)

In [None]:
class DQNAgent:
    """Single-step deep Q-learning agent around a Q-network.

    The agent picks actions epsilon-greedily and performs one gradient step per
    observed transition (no replay buffer, no target network).

    Args:
      state_size: Forwarded to ``NESModel`` as its input dimension.
      action_size: Number of discrete actions.
      model: Optional pre-built Q-network to use instead of constructing an
        ``NESModel``. It must map a state to a tensor with ``action_size``
        values.
    """

    def __init__(self, state_size, action_size, model=None):
      self.model = NESModel(state_size, action_size) if model is None else model
      self.state_size = state_size
      self.action_size = action_size
      self.discount_rate = 0.95
      self.epsilon = 1.0
      self.epsilon_min = 0.01
      self.epsilon_decay = 0.995
      # Q-values are unbounded regression targets, so use a regression loss
      # (Huber) rather than a binary cross-entropy, which expects targets in
      # [0, 1].
      self.criterion = nn.SmoothL1Loss()
      self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=0.01)
      self.losses = []

    def act_(self, state):
      """Return an action index (Python ``int``) for ``state``, epsilon-greedily."""
      if np.random.random() < self.epsilon:
        return int(np.random.choice(self.action_size))

      # Evaluate deterministically (no dropout) and without building an
      # autograd graph, then restore whichever mode the model was in.
      was_training = self.model.training
      self.model.eval()
      try:
        with torch.no_grad():
          act_values = self.model(state)
      finally:
        self.model.train(was_training)
      return torch.argmax(act_values).item()

    def train(self, state, action, reward, next_state, done):
      """Perform one Q-learning update for a single transition.

      The target is ``reward`` for terminal transitions and
      ``reward + discount_rate * max_a Q(next_state, a)`` otherwise. Only the
      value of the action actually taken is regressed towards it. The loss is
      appended to ``self.losses`` and epsilon is decayed once per call until it
      reaches ``epsilon_min``. The model is left in training mode.
      """
      # Compute the bootstrap target as a constant: eval mode, no gradients.
      self.model.eval()
      with torch.no_grad():
        target = float(reward)
        if not done:
          target += self.discount_rate * torch.max(self.model(next_state)).item()

      # train
      self.model.train()
      self.optimizer.zero_grad()
      # reshape(-1) lets this work whether the model returns (n_action,) or
      # (1, n_action).
      q_taken = self.model(state).reshape(-1)[action]
      target_tensor = torch.tensor(target, dtype=q_taken.dtype, device=q_taken.device)
      loss = self.criterion(q_taken, target_tensor)

      # Backward and Optimize
      loss.backward()
      self.optimizer.step()

      self.losses.append(loss.item())

      if self.epsilon > self.epsilon_min:
        self.epsilon *= self.epsilon_decay

    def load(self, name):
      """Load weights saved by ``save`` into the existing model, in place.

      A checkpoint that deserializes to a whole ``nn.Module`` is also accepted;
      its ``state_dict`` is copied. Loading in place keeps the optimizer bound
      to the same parameter tensors.
      """
      # Map the checkpoint onto whatever device the model currently lives on.
      first_param = next(self.model.parameters(), None)
      target_device = first_param.device if first_param is not None else torch.device("cpu")
      checkpoint = torch.load(name, map_location=target_device)
      if isinstance(checkpoint, nn.Module):
        checkpoint = checkpoint.state_dict()
      self.model.load_state_dict(checkpoint)

    def save(self, name):
      """Save the model's ``state_dict`` to ``name``."""
      torch.save(self.model.state_dict(), name)

In [None]:
def test_agent(model, env, n_episodes=1):
  """Play ``n_episodes`` episodes with ``epsilon_greedy`` and average the reward.

  Args:
    model: Action-value network passed to ``epsilon_greedy``.
    env: Environment exposing ``reset()`` and ``step(action)`` returning
      ``(state, reward, done, info)``; frames are arrays whose last axis is
      moved to position 1 after a batch axis is added.
    n_episodes: Number of episodes to play.

  Returns:
    Mean of the per-episode total rewards.
  """
  def to_tensor(frame):
    return torch.from_numpy(frame.astype(np.float32)).unsqueeze(0).permute(0, 3, 1, 2)

  reward_per_episode = np.zeros(n_episodes)

  # Evaluate with dropout disabled and restore the previous mode afterwards.
  was_training = getattr(model, "training", False)
  if was_training:
    model.eval()
  try:
    # Pure inference: no autograd graphs are needed while playing.
    with torch.no_grad():
      for it in range(n_episodes):
        state = to_tensor(env.reset())
        done = False
        episode_reward = 0
        while not done:
          # The policy may hand back a 0-d tensor; the environment needs a
          # plain integer action.
          action = int(epsilon_greedy(model, state))
          frame, reward, done, _ = env.step(action)
          state = to_tensor(frame)
          episode_reward += reward
        reward_per_episode[it] = episode_reward
  finally:
    if was_training:
      model.train()
  return np.mean(reward_per_episode)

In [None]:
def watch_agent(model, env, epsilon):
  """Play one episode with ``epsilon_greedy`` and print the total reward.

  Args:
    model: Action-value network passed to ``epsilon_greedy``.
    env: Environment exposing ``reset()`` and ``step(action)`` returning
      ``(state, reward, done, info)``.
    epsilon: Exploration probability forwarded to ``epsilon_greedy``.
  """
  def to_tensor(frame):
    return torch.from_numpy(frame.astype(np.float32)).unsqueeze(0).permute(0, 3, 1, 2)

  # Evaluate with dropout disabled and restore the previous mode afterwards.
  was_training = getattr(model, "training", False)
  if was_training:
    model.eval()
  episode_reward = 0
  try:
    # Pure inference: no autograd graphs are needed while playing.
    with torch.no_grad():
      state = to_tensor(env.reset())
      done = False
      while not done:
        # The policy may hand back a 0-d tensor; the environment needs a plain
        # integer action.
        action = int(epsilon_greedy(model, state, epsilon))
        frame, reward, done, _ = env.step(action)
        state = to_tensor(frame)
        episode_reward += reward
  finally:
    if was_training:
      model.train()
  print(f"Episode Reward: {episode_reward}")

In [None]:
def play_one_episode(agent, env, is_train):
  """Run a single episode, training the agent on each step when requested.

  Args:
    agent: Object providing ``act_(state)`` and
      ``train(state, action, reward, next_state, done)``.
    env: Environment with ``reset()`` and ``step(action)`` returning
      ``(state, reward, done, info)``.
    is_train: Training happens only when this equals the string ``'train'``.

  Returns:
    The ``'score'`` entry of the ``info`` dict from the final step.
  """
  def as_batch(frame):
    # float32 tensor with a leading batch axis; the frame's last axis is moved
    # to position 1.
    return torch.from_numpy(frame.astype(np.float32)).unsqueeze(0).permute(0, 3, 1, 2)

  current = as_batch(env.reset())
  while True:
    chosen = agent.act_(current)
    raw_next, reward, done, info = env.step(chosen)
    upcoming = as_batch(raw_next)
    if is_train == 'train':
      agent.train(current, chosen, reward, upcoming, done)
    current = upcoming
    if done:
      break

  return info['score']

In [None]:
# config
# Directories that receive the saved model and the per-episode results.
MODELS_FOLDER = '/content/super_mario_bros_models'
REWARDS_FOLDER = '/content/super_mario_bros_rewards'
# "train" selects the 200-episode run; any other value selects the 2-episode run.
MODE = "train"
NUM_EPISODES = 200 if MODE == "train" else 2

In [None]:
if __name__ == "__main__":
  # Entry point: build the environment and agent, play NUM_EPISODES episodes in
  # the configured MODE, then persist the model (train mode only) and the
  # per-episode scores.

  env = gym_super_mario_bros.make('SuperMarioBros-v3')
  env = JoypadSpace(env, RIGHT_ONLY)

  # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
  os.makedirs(MODELS_FOLDER, exist_ok=True)
  os.makedirs(REWARDS_FOLDER, exist_ok=True)

  state_size = 3  # passed to the network as its number of input channels
  action_size = len(RIGHT_ONLY)
  agent = DQNAgent(state_size, action_size)
  # Score reported at the end of each episode.
  portfolio_value = []

  try:
    if MODE == "test":
      # make sure epsilon is not 1!
      # a small epsilon keeps the evaluated policy almost greedy
      agent.epsilon = 0.01
      agent.load(f'{MODELS_FOLDER}/smb_weight.pt')

      # watch trained agent; every episode is recorded to ./video
      env = RecordVideo(env, './video',  episode_trigger = lambda episode_number: True)
      watch_agent(agent.model, env, epsilon=agent.epsilon)

    # play the game num_episodes times
    for episode in range(NUM_EPISODES):
      t0 = datetime.now()
      value = play_one_episode(agent, env, MODE)
      dt = datetime.now() - t0
      print(f"episode: {episode + 1}/{NUM_EPISODES}, episode end value: {value:.2f}, duration: {dt}")
      portfolio_value.append(value)
  finally:
    # Always release the emulator (and let a video recorder finish its file),
    # even when a run is interrupted.
    env.close()

  # save the weights when we are done
  if MODE == 'train':
    # save the DQN
    agent.save(f'{MODELS_FOLDER}/smb_weight.pt')

    # plot losses (one entry per training step)
    plt.plot(agent.losses)
    plt.xlabel("training step")
    plt.ylabel("loss")
    plt.show()


  # save the score of each episode
  np.save(f'{REWARDS_FOLDER}/{MODE}.npy', portfolio_value)

  deprecation(
  deprecation(
  logger.deprecation(
