In [None]:
!pip install gym-super-mario-bros
!pip install nes-py
!pip install gym[other]
!pip install torchrl

Collecting gym-super-mario-bros
  Downloading gym_super_mario_bros-7.4.0-py3-none-any.whl.metadata (10 kB)
Collecting nes-py>=8.1.4 (from gym-super-mario-bros)
  Downloading nes_py-8.2.1.tar.gz (77 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.7/77.7 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pyglet<=1.5.21,>=1.4.0 (from nes-py>=8.1.4->gym-super-mario-bros)
  Downloading pyglet-1.5.21-py3-none-any.whl.metadata (7.6 kB)
Downloading gym_super_mario_bros-7.4.0-py3-none-any.whl (199 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.1/199.1 kB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pyglet-1.5.21-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m26.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: nes-py
  Building wheel for nes-py (setup.py) ... [?25l[?25hdone
  Crea

In [None]:
import torch
import numpy as np
#from agent_nn import AgentNN

from tensordict import TensorDict
from torchrl.data import TensorDictReplayBuffer, LazyMemmapStorage

class Agent:
    """Deep Q-learning agent with an online network, a target network and a replay buffer.

    Actions are chosen epsilon-greedily from the online network. Transitions are
    stored in a replay buffer and sampled in mini-batches by :meth:`learn`, which
    regresses the online network's Q-values towards bootstrapped targets produced
    by the target network.

    Args:
        input_dims: Observation shape forwarded to ``AgentNN``.
        num_actions: Number of discrete actions.
        lr: Learning rate of the Adam optimizer.
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon: Initial probability of picking a random action.
        eps_decay: Multiplicative decay applied to ``epsilon`` after each learn step.
        eps_min: Lower bound for ``epsilon``.
        replay_buffer_capacity: Maximum number of transitions kept in the buffer.
        batch_size: Number of transitions sampled per learn step.
        sync_network_rate: Number of learn steps between target-network syncs.
    """

    def __init__(self,
                 input_dims,
                 num_actions,
                 lr=0.00025,
                 gamma=0.9,
                 epsilon=1.0,
                 eps_decay=0.99999975,
                 eps_min=0.1,
                 replay_buffer_capacity=100_000,
                 batch_size=32,
                 sync_network_rate=10000):

        self.num_actions = num_actions
        self.learn_step_counter = 0

        # Hyperparameters
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon
        self.eps_decay = eps_decay
        self.eps_min = eps_min
        self.batch_size = batch_size
        self.sync_network_rate = sync_network_rate

        # Networks
        self.online_network = AgentNN(input_dims, num_actions)
        self.target_network = AgentNN(input_dims, num_actions, freeze=True)
        # Both networks are initialised independently, so without this copy the
        # targets would come from unrelated random weights until the first
        # periodic sync (`sync_network_rate` learn steps later).
        self.target_network.load_state_dict(self.online_network.state_dict())

        # Optimizer and loss
        self.optimizer = torch.optim.Adam(self.online_network.parameters(), lr=self.lr)
        self.loss = torch.nn.MSELoss()
        # self.loss = torch.nn.SmoothL1Loss() # Feel free to try this loss function instead!

        # Replay buffer
        storage = LazyMemmapStorage(replay_buffer_capacity)
        self.replay_buffer = TensorDictReplayBuffer(storage=storage)

    def choose_action(self, observation):
        """Return an action index using an epsilon-greedy policy.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the highest online-network Q-value.
        """
        if np.random.random() < self.epsilon:
            return np.random.randint(self.num_actions)
        # `np.array(observation)` first: building a tensor from one array is
        # faster than from a sequence of arrays (the observation may be a
        # lazily stacked sequence of frames).
        # `unsqueeze(0)` adds the batch dimension expected by the network.
        observation = torch.tensor(np.array(observation), dtype=torch.float32) \
                        .unsqueeze(0) \
                        .to(self.online_network.device)
        # Action selection never needs gradients; skipping autograd avoids
        # building a graph on every environment step.
        with torch.no_grad():
            # Index of the action associated with the highest Q-value
            return self.online_network(observation).argmax().item()

    def decay_epsilon(self):
        """Multiply ``epsilon`` by ``eps_decay``, never going below ``eps_min``."""
        self.epsilon = max(self.epsilon * self.eps_decay, self.eps_min)

    def store_in_memory(self, state, action, reward, next_state, done):
        """Add one ``(state, action, reward, next_state, done)`` transition to the buffer."""
        self.replay_buffer.add(TensorDict({
                                            "state": torch.tensor(np.array(state), dtype=torch.float32),
                                            "action": torch.tensor(action),
                                            # Explicit dtype so the stored reward type does not depend on
                                            # whether the first reward happened to be an int or a float64.
                                            "reward": torch.tensor(reward, dtype=torch.float32),
                                            "next_state": torch.tensor(np.array(next_state), dtype=torch.float32),
                                            "done": torch.tensor(done)
                                          }, batch_size=[]))

    def sync_networks(self):
        """Copy the online weights into the target network every ``sync_network_rate`` learn steps."""
        if self.learn_step_counter % self.sync_network_rate == 0 and self.learn_step_counter > 0:
            self.target_network.load_state_dict(self.online_network.state_dict())

    def save_model(self, path):
        """Write the online network's state dict to ``path``."""
        torch.save(self.online_network.state_dict(), path)

    def load_model(self, path):
        """Load a state dict from ``path`` into both the online and the target network.

        The checkpoint is read once and mapped onto the network's device, so a
        checkpoint saved on a GPU machine can be restored on a CPU-only one.
        """
        # NOTE: torch.load unpickles the file; only load checkpoints you trust.
        state_dict = torch.load(path, map_location=self.online_network.device)
        self.online_network.load_state_dict(state_dict)
        self.target_network.load_state_dict(state_dict)

    def learn(self):
        """Run one optimisation step on a mini-batch sampled from the replay buffer.

        Does nothing until the buffer holds at least ``batch_size`` transitions.
        After the step, the learn counter is incremented and epsilon is decayed.
        """
        if len(self.replay_buffer) < self.batch_size:
            return

        self.sync_networks()

        self.optimizer.zero_grad()

        samples = self.replay_buffer.sample(self.batch_size).to(self.online_network.device)

        keys = ("state", "action", "reward", "next_state", "done")

        states, actions, rewards, next_states, dones = [samples[key] for key in keys]

        all_q_values = self.online_network(states)  # Shape is (batch_size, n_actions)
        # Pick the Q-value of the action that was actually taken in each transition.
        # `gather` relies on the sampled batch itself rather than on `self.batch_size`.
        predicted_q_values = all_q_values.gather(1, actions.long().reshape(-1, 1)).squeeze(1)

        # Targets are constants for the regression: no autograd graph is needed.
        with torch.no_grad():
            # `max` returns (values, indices); only the maximum values are needed
            next_q_values = self.target_network(next_states).max(dim=1)[0]
            # Align the reward dtype with the predictions; a float64 reward would
            # otherwise promote the target and make the loss see mismatched dtypes.
            rewards = rewards.reshape(-1).to(predicted_q_values.dtype)
            not_done = 1 - dones.reshape(-1).float()
            # Future rewards do not matter when the transition ended the episode:
            # if done is true, (1 - done) is 0 and the bootstrapped term vanishes.
            target_q_values = rewards + self.gamma * next_q_values * not_done

        loss = self.loss(predicted_q_values, target_q_values)
        loss.backward()
        self.optimizer.step()

        self.learn_step_counter += 1
        self.decay_epsilon()




In [None]:
import torch
from torch import nn
import numpy as np

class AgentNN(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers.

    Args:
        input_shape: Shape of a single observation; its first entry is used as
            the number of input channels of the first convolution.
        n_actions: Size of the output layer (one value per action).
        freeze: When true, gradients are disabled for every parameter.
    """

    def __init__(self, input_shape, n_actions, freeze=False):
        super().__init__()

        # Convolutional feature extractor
        in_channels = input_shape[0]
        conv_stack = [
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
        ]
        self.conv_layers = nn.Sequential(*conv_stack)

        # Number of features the conv stack produces for one observation
        flat_features = self._get_conv_out(input_shape)

        # Full network: conv stack, flatten, then the fully connected head
        full_stack = [
            self.conv_layers,
            nn.Flatten(),
            nn.Linear(flat_features, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions),
        ]
        self.network = nn.Sequential(*full_stack)

        if freeze:
            self._freeze()

        # Prefer the GPU when one is available, then move all parameters there
        if torch.cuda.is_available():
            self.device = 'cuda'
        else:
            self.device = 'cpu'
        self.to(self.device)

    def forward(self, x):
        """Run ``x`` through the full network and return its output."""
        return self.network(x)

    def _get_conv_out(self, shape):
        """Return the element count of the conv output for one all-zero input of ``shape``."""
        dummy_batch = torch.zeros(1, *shape)
        conv_output = self.conv_layers(dummy_batch)
        return conv_output.numel()

    def _freeze(self):
        """Disable gradient tracking for every parameter of the network."""
        self.network.requires_grad_(False)


In [None]:
import numpy as np
from gym import Wrapper
from gym.wrappers import GrayScaleObservation, ResizeObservation, FrameStack


class SkipFrame(Wrapper):
    """Environment wrapper that repeats each action for up to ``skip`` frames.

    The rewards of the repeated frames are summed, and the observation, done
    flag and info of the last executed frame are returned.

    Args:
        env: Environment to wrap; its ``step`` must return the 4-tuple
            ``(observation, reward, done, info)``.
        skip: Number of frames each action is applied to (at least 1).

    Raises:
        ValueError: If ``skip`` is smaller than 1.
    """

    def __init__(self, env, skip):
        super().__init__(env)
        # With fewer than one repetition `step` would never call the wrapped
        # environment and would have nothing to return, so reject it up front.
        if skip < 1:
            raise ValueError(f"skip must be >= 1, got {skip!r}")
        self.skip = skip

    def step(self, action):
        """Apply ``action`` for up to ``skip`` frames and return the accumulated result."""
        total_reward = 0.0
        done = False
        for _ in range(self.skip):
            next_state, reward, done, info = self.env.step(action)
            total_reward += reward
            # Stop repeating as soon as the episode ends
            if done:
                break
        return next_state, total_reward, done, info


def apply_wrappers(env):
    """Wrap ``env`` with frame skipping, resizing, grayscale conversion and frame stacking.

    The wrappers are applied in this order: ``SkipFrame`` (skip=4),
    ``ResizeObservation`` (shape=84), ``GrayScaleObservation`` and
    ``FrameStack`` (num_stack=4, lz4_compress=True). The outermost wrapper is returned.
    """
    # Each action is applied to 4 consecutive frames
    skipped = SkipFrame(env, skip=4)
    # Frames are resized to 84x84
    resized = ResizeObservation(skipped, shape=84)
    grayscale = GrayScaleObservation(resized)
    # May need to change lz4_compress to False if issues arise
    return FrameStack(grayscale, num_stack=4, lz4_compress=True)

In [None]:
import gym_super_mario_bros
from gym_super_mario_bros.actions import RIGHT_ONLY

from nes_py.wrappers import JoypadSpace
import gym # Import the gym module

ENV_NAME = 'SuperMarioBros-1-1-v0'

# Define video_every before using it in the lambda function
video_every = 1  # Record a video every 1 episodes, for example

# define a trigger function, return True to start recording a new video:
episode_trigger=lambda episode_id: (episode_id%video_every==0)

env = gym_super_mario_bros.make(ENV_NAME)
env = JoypadSpace(env, RIGHT_ONLY)
#env = gym.wrappers.RecordVideo(env, "./video", episode_trigger=episode_trigger, video_length=100000)

done = False
counter = 0

# Rendering needs a display and OpenGL libraries. On a headless host the first
# `env.render()` call raises ImportError (e.g. 'Library "GLU" not found'), so
# rendering is switched off after the first failure instead of aborting the run.
render_enabled = True

# Play one episode, and always release the emulator afterwards, even when the
# cell is interrupted or fails part-way through.
try:
    env.reset()
    while not done:
        # Only go right
        # action = RIGHT_ONLY.index(['right'])

        # Choose random action
        action = env.action_space.sample()

        # Named throwaway variables: assigning to `_` would shadow IPython's
        # "last output" variable for the rest of the session.
        _obs, _reward, done, _info = env.step(action)

        if render_enabled:
            try:
                env.render()
            except ImportError as render_error:
                print(f"Rendering disabled: {render_error}")
                render_enabled = False
finally:
    env.close()

  logger.warn(
  deprecation(
  deprecation(
If you want to render in human mode, initialize the environment in this way: gym.make('EnvName', render_mode='human') and don't call the render method.
See here for more information: https://www.gymlibrary.ml/content/api/[0m
  deprecation(
  logger.warn(


ImportError: Library "GLU" not found.

In [None]:
from IPython.display import HTML
from base64 import b64encode

# Location of the recorded episode video; adjust as needed.
video_path = "/content/video/rl-video-episode-0.mp4"

# Read the raw video bytes, then base64-encode them so the video can be
# embedded directly in the HTML below as a data URI.
with open(video_path, "rb") as video_file:
    video_bytes = video_file.read()
video_data = b64encode(video_bytes).decode("ascii")

# The HTML object is the cell's last expression, so the notebook renders it.
HTML(f"""
<video width=400 controls>
    <source src="data:video/mp4;base64,{video_data}" type="video/mp4">
</video>
""")


In [None]:
import gym_super_mario_bros
from gym_super_mario_bros.actions import RIGHT_ONLY
from nes_py.wrappers import JoypadSpace

#from wrappers import apply_wrappers
#from agent import Agent

ENV_NAME = 'SuperMarioBros-1-1-v0'
SHOULD_TRAIN = True  # When False, episodes are played without storing transitions or learning
DISPLAY = True  # NOTE: not consulted anywhere in this cell
NUM_OF_EPISODES = 5


env = gym_super_mario_bros.make(ENV_NAME)
env = JoypadSpace(env, RIGHT_ONLY)

env = apply_wrappers(env)

agent = Agent(input_dims=env.observation_space.shape, num_actions=env.action_space.n)

# Run the episodes inside try/finally so the emulator is always released,
# including when the cell is interrupted manually.
try:
    for i in range(NUM_OF_EPISODES):
        done = False
        state = env.reset()
        while not done:
            a = agent.choose_action(state)
            new_state, reward, done, info = env.step(a)

            if SHOULD_TRAIN:
                agent.store_in_memory(state, a, reward, new_state, done)
                agent.learn()

            state = new_state
finally:
    env.close()

  logger.warn(
  deprecation(
  deprecation(
  logger.deprecation(
  if not isinstance(done, (bool, np.bool8)):


KeyboardInterrupt: 