## Installieren aller Pakete

In [None]:
import warnings
warnings.filterwarnings('ignore')


import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

In [None]:
!pip3 install -r ../requirements.txt

In [None]:
import torch
from torch import nn
from torchvision import transforms as T
from PIL import Image
import numpy as np
from pathlib import Path
from collections import deque
import random, datetime, os, copy

# Gym is an OpenAI toolkit for RL
import gym
from gym.spaces import Box
from gym.wrappers import FrameStack

# NES Emulator for OpenAI Gym
from nes_py.wrappers import JoypadSpace

# Super Mario environment for OpenAI Gym
import gym_super_mario_bros

import numpy as np

from nes_py.wrappers import JoypadSpace
import gym_super_mario_bros
from time import sleep
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT

import random, datetime
import time
from pathlib import Path

import gym
import gym_super_mario_bros
from gym.wrappers import FrameStack, GrayScaleObservation, TransformObservation
from nes_py.wrappers import JoypadSpace

from utils.agent import Mario
from utils.wrappers import ResizeObservation, SkipFrame


import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'

import random, datetime
from pathlib import Path

import gym
import gym_super_mario_bros
from gym.wrappers import FrameStack, GrayScaleObservation, TransformObservation
from nes_py.wrappers import JoypadSpace

In [None]:
class SkipFrame(gym.Wrapper):
    def __init__(self, env, skip):
        """Return only every `skip`-th frame"""
        super().__init__(env)
        self._skip = skip

    def step(self, action):
        """Repeat action, and sum reward"""
        total_reward = 0.0
        done = False
        for i in range(self._skip):
            # Accumulate reward and repeat the same action
            obs, reward, done, info = self.env.step(action)
            total_reward += reward
            if done:
                break
        return obs, total_reward, done, info


class GrayScaleObservation(gym.ObservationWrapper):
    def __init__(self, env):
        super().__init__(env)
        obs_shape = self.observation_space.shape[:2]
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

    def permute_orientation(self, observation):
        # permute [H, W, C] array to [C, H, W] tensor
        observation = np.transpose(observation, (2, 0, 1))
        observation = torch.tensor(observation.copy(), dtype=torch.float)
        return observation

    def observation(self, observation):
        observation = self.permute_orientation(observation)
        transform = T.Grayscale()
        observation = transform(observation)
        return observation


class ResizeObservation(gym.ObservationWrapper):
    def __init__(self, env, shape):
        super().__init__(env)
        if isinstance(shape, int):
            self.shape = (shape, shape)
        else:
            self.shape = tuple(shape)

        obs_shape = self.shape + self.observation_space.shape[2:]
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

    def observation(self, observation):
        transforms = T.Compose(
            [T.Resize(self.shape), T.Normalize(0, 255)]
        )
        observation = transforms(observation).squeeze(0)
        return observation

In [None]:
class Mario:
    def __init__(self, state_dim, action_dim, save_dir):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.save_dir = save_dir

        self.use_cuda = torch.cuda.is_available()

        # Mario's DNN to predict the most optimal action - we implement this in the Learn section
        self.net = MarioNet(self.state_dim, self.action_dim).float()
        if self.use_cuda:
            self.net = self.net.to(device="cuda")

        self.exploration_rate = 1
        self.exploration_rate_decay = 0.99999975
        self.exploration_rate_min = 0.1
        self.curr_step = 0

        self.save_every = 5e5  # no. of experiences between saving Mario Net

    def act(self, state):
        """
    Given a state, choose an epsilon-greedy action and update value of step.

    Inputs:
    state(LazyFrame): A single observation of the current state, dimension is (state_dim)
    Outputs:
    action_idx (int): An integer representing which action Mario will perform
    """
        # EXPLORE
        if np.random.rand() < self.exploration_rate:
            action_idx = np.random.randint(self.action_dim)

        # EXPLOIT
        else:
            state = state.__array__()
            if self.use_cuda:
                state = torch.tensor(state).cuda()
            else:
                state = torch.tensor(state)
            state = state.unsqueeze(0)
            action_values = self.net(state, model="online")
            action_idx = torch.argmax(action_values, axis=1).item()

        # decrease exploration_rate
        self.exploration_rate *= self.exploration_rate_decay
        self.exploration_rate = max(self.exploration_rate_min, self.exploration_rate)

        # increment step
        self.curr_step += 1
        return action_idx

In [None]:
class Mario(Mario):  # subclassing for continuity
    def __init__(self, state_dim, action_dim, save_dir):
        super().__init__(state_dim, action_dim, save_dir)
        self.memory = deque(maxlen=100000)
        self.batch_size = 32

    def cache(self, state, next_state, action, reward, done):
        """
        Store the experience to self.memory (replay buffer)

        Inputs:
        state (LazyFrame),
        next_state (LazyFrame),
        action (int),
        reward (float),
        done(bool))
        """
        state = state.__array__()
        next_state = next_state.__array__()

        if self.use_cuda:
            state = torch.tensor(state).cuda()
            next_state = torch.tensor(next_state).cuda()
            action = torch.tensor([action]).cuda()
            reward = torch.tensor([reward]).cuda()
            done = torch.tensor([done]).cuda()
        else:
            state = torch.tensor(state)
            next_state = torch.tensor(next_state)
            action = torch.tensor([action])
            reward = torch.tensor([reward])
            done = torch.tensor([done])

        self.memory.append((state, next_state, action, reward, done,))

    def recall(self):
        """
        Retrieve a batch of experiences from memory
        """
        batch = random.sample(self.memory, self.batch_size)
        state, next_state, action, reward, done = map(torch.stack, zip(*batch))
        return state, next_state, action.squeeze(), reward.squeeze(), done.squeeze()

In [None]:
class MarioNet(nn.Module):
    """mini cnn structure
  input -> (conv2d + relu) x 3 -> flatten -> (dense + relu) x 2 -> output
  """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        c, h, w = input_dim

        if h != 84:
            raise ValueError(f"Expecting input height: 84, got: {h}")
        if w != 84:
            raise ValueError(f"Expecting input width: 84, got: {w}")

        self.online = nn.Sequential(
            nn.Conv2d(in_channels=c, out_channels=32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(3136, 512),
            nn.ReLU(),
            nn.Linear(512, output_dim),
        )

        self.target = copy.deepcopy(self.online)

        # Q_target parameters are frozen.
        for p in self.target.parameters():
            p.requires_grad = False

    def forward(self, input, model):
        if model == "online":
            return self.online(input)
        elif model == "target":
            return self.target(input)

In [None]:
class Mario(Mario):
    def __init__(self, state_dim, action_dim, save_dir):
        super().__init__(state_dim, action_dim, save_dir)
        self.gamma = 0.9

    def td_estimate(self, state, action):
        current_Q = self.net(state, model="online")[
            np.arange(0, self.batch_size), action
        ]  # Q_online(s,a)
        return current_Q

    @torch.no_grad()
    def td_target(self, reward, next_state, done):
        next_state_Q = self.net(next_state, model="online")
        best_action = torch.argmax(next_state_Q, axis=1)
        next_Q = self.net(next_state, model="target")[
            np.arange(0, self.batch_size), best_action
        ]
        return (reward + (1 - done.float()) * self.gamma * next_Q).float()

In [None]:
class Mario(Mario):
    def __init__(self, state_dim, action_dim, save_dir):
        super().__init__(state_dim, action_dim, save_dir)
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=0.00025)
        self.loss_fn = torch.nn.SmoothL1Loss()

    def update_Q_online(self, td_estimate, td_target):
        loss = self.loss_fn(td_estimate, td_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def sync_Q_target(self):
        self.net.target.load_state_dict(self.net.online.state_dict())

In [None]:
class Mario(Mario):
    def save(self):
        save_path = (
            self.save_dir / f"mario_net_{int(self.curr_step // self.save_every)}.chkpt"
        )
        torch.save(
            dict(model=self.net.state_dict(), exploration_rate=self.exploration_rate),
            save_path,
        )
        print(f"MarioNet saved to {save_path} at step {self.curr_step}")
    def load(self, load_dir):
        save_path = (
            self.save_dir / f"mario_net_{int(self.curr_step // self.save_every)}.chkpt"
        )
        torch.save(
            dict(model=self.net.state_dict(), exploration_rate=self.exploration_rate),
            save_path,
        )
        print(f"MarioNet saved to {save_path} at step {self.curr_step}")

In [None]:
class Mario(Mario):
    def __init__(self, state_dim, action_dim, save_dir):
        super().__init__(state_dim, action_dim, save_dir)
        self.burnin = 1e4  # min. experiences before training
        self.learn_every = 3  # no. of experiences between updates to Q_online
        self.sync_every = 1e4  # no. of experiences between Q_target & Q_online sync

    def learn(self):
        if self.curr_step % self.sync_every == 0:
            self.sync_Q_target()

        if self.curr_step % self.save_every == 0:
            self.save()

        if self.curr_step < self.burnin:
            return None, None

        if self.curr_step % self.learn_every != 0:
            return None, None

        # Sample from memory
        state, next_state, action, reward, done = self.recall()

        # Get TD Estimate
        td_est = self.td_estimate(state, action)

        # Get TD Target
        td_tgt = self.td_target(reward, next_state, done)

        # Backpropagate loss through Q_online
        loss = self.update_Q_online(td_est, td_tgt)

        return (td_est.mean().item(), loss)

In [None]:
import numpy as np
import time, datetime
import matplotlib.pyplot as plt


class MetricLogger:
    def __init__(self, save_dir):
        self.save_log = save_dir / "log"
        with open(self.save_log, "w") as f:
            f.write(
                f"{'Episode':>8}{'Step':>8}{'Epsilon':>10}{'MeanReward':>15}"
                f"{'MeanLength':>15}{'MeanLoss':>15}{'MeanQValue':>15}"
                f"{'TimeDelta':>15}{'Time':>20}\n"
            )
        self.ep_rewards_plot = save_dir / "reward_plot.jpg"
        self.ep_lengths_plot = save_dir / "length_plot.jpg"
        self.ep_avg_losses_plot = save_dir / "loss_plot.jpg"
        self.ep_avg_qs_plot = save_dir / "q_plot.jpg"

        # History metrics
        self.ep_rewards = []
        self.ep_lengths = []
        self.ep_avg_losses = []
        self.ep_avg_qs = []

        # Moving averages, added for every call to record()
        self.moving_avg_ep_rewards = []
        self.moving_avg_ep_lengths = []
        self.moving_avg_ep_avg_losses = []
        self.moving_avg_ep_avg_qs = []

        # Current episode metric
        self.init_episode()

        # Timing
        self.record_time = time.time()

    def log_step(self, reward, loss, q):
        self.curr_ep_reward += reward
        self.curr_ep_length += 1
        if loss:
            self.curr_ep_loss += loss
            self.curr_ep_q += q
            self.curr_ep_loss_length += 1

    def log_episode(self):
        "Mark end of episode"
        self.ep_rewards.append(self.curr_ep_reward)
        self.ep_lengths.append(self.curr_ep_length)
        if self.curr_ep_loss_length == 0:
            ep_avg_loss = 0
            ep_avg_q = 0
        else:
            ep_avg_loss = np.round(self.curr_ep_loss / self.curr_ep_loss_length, 5)
            ep_avg_q = np.round(self.curr_ep_q / self.curr_ep_loss_length, 5)
        self.ep_avg_losses.append(ep_avg_loss)
        self.ep_avg_qs.append(ep_avg_q)

        self.init_episode()

    def init_episode(self):
        self.curr_ep_reward = 0.0
        self.curr_ep_length = 0
        self.curr_ep_loss = 0.0
        self.curr_ep_q = 0.0
        self.curr_ep_loss_length = 0

    def record(self, episode, epsilon, step):
        mean_ep_reward = np.round(np.mean(self.ep_rewards[-100:]), 3)
        mean_ep_length = np.round(np.mean(self.ep_lengths[-100:]), 3)
        mean_ep_loss = np.round(np.mean(self.ep_avg_losses[-100:]), 3)
        mean_ep_q = np.round(np.mean(self.ep_avg_qs[-100:]), 3)
        self.moving_avg_ep_rewards.append(mean_ep_reward)
        self.moving_avg_ep_lengths.append(mean_ep_length)
        self.moving_avg_ep_avg_losses.append(mean_ep_loss)
        self.moving_avg_ep_avg_qs.append(mean_ep_q)

        last_record_time = self.record_time
        self.record_time = time.time()
        time_since_last_record = np.round(self.record_time - last_record_time, 3)

        print(
            f"Episode {episode} - "
            f"Step {step} - "
            f"Epsilon {epsilon} - "
            f"Mean Reward {mean_ep_reward} - "
            f"Mean Length {mean_ep_length} - "
            f"Mean Loss {mean_ep_loss} - "
            f"Mean Q Value {mean_ep_q} - "
            f"Time Delta {time_since_last_record} - "
            f"Time {datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}"
        )

        with open(self.save_log, "a") as f:
            f.write(
                f"{episode:8d}{step:8d}{epsilon:10.3f}"
                f"{mean_ep_reward:15.3f}{mean_ep_length:15.3f}{mean_ep_loss:15.3f}{mean_ep_q:15.3f}"
                f"{time_since_last_record:15.3f}"
                f"{datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'):>20}\n"
            )

        for metric in ["ep_rewards", "ep_lengths", "ep_avg_losses", "ep_avg_qs"]:
            plt.plot(getattr(self, f"moving_avg_{metric}"))
            plt.savefig(getattr(self, f"{metric}_plot"))
            plt.clf()

# Hot Topic: Reinforcement Learning


<img width=70% src="../images/pawel-czerwinski-oNcFi_dYz_0-unsplash.jpg">

In diesem Exkurs werden wir eine kurze Einführung in Reinforcement Learning geben und anhand eines praktischen Beispiels die Funktionsweise kurz aufzeigen. Wir verwenden hierbei **OpenAI Gym**, einer Entwicklungsplatform für Reinforcement Learning. 

## Wiederholung Reinforcement Learning

<img width=70% src="../images/reinforcement.jpg">

Reinforcement Learning ist eines der vier Paradigmen im Machine Learning neben Supervised, Unsupervised und Semi-Supervised Learning. Im Reinforcement Learning agiert ein Agent mit Aktionen mit der Umwelt. Die Umwelt liefert wiederum Belohnungen und den aktuellen Stand der Umwelt als Beobachtung. Als Lernprozess versucht man die sogenannte Policy/Strategie des Agenten anzupassen und zu verbessern.

Grundsätzlich unterscheidet man zwischen **Exploration** und **Exploitation** im Training. Einerseits will man bisherige Strategien verbessern, aber auch zu einem gewissen Teil neue Strategien austesten. Das wird meist mit einem Parameter spezifiziert. Zu Beginn ist die Exploration deutlich ausgeprägter als bei fortgeschrittenem Training.  

In dem Einsteiger Kurs "Künstliche Intelligenz und maschinelles Lernen für Einsteiger" sind wir in einer Einheit etwas genauer auf Reinforcement Learning eingegangen. [Link hier.](https://open.hpi.de/courses/kieinstieg2020/items/3OBaGkP33fILA2t8FwqiC1) 

## OpenAI Gym Environment


<img width=50% src="../images/openai-cover.png">

In unserer Einheit nutzen wir OpenAI Gym. OpenAI Gym ist ein Open Source Umgebung für die Entwicklung von Reinforcement Learning Modellen für das es eine Vielzahl sogenannter Environments gibt. Darunter sind Atari-Spiele, Nintendo-Spiele sowie Umgebungen mit Robotern, die es in der Realität zu kaufen gibt. 

Hier mal ein Ausschnitt mehrerer Umgebungen. 

<img width=70% src="../images/environments.jpg">

# SuperMario Bros. Umgebung

Im ersten Schritt wollen wir uns einmal die Umgebung ansehen und auch graphisch ausgeben lassen. Wir haben uns hier für das bekannte Nintendo-Spiel SuperMario Bros. entschieden als Environment. Die Environment wird als [OpenSource Projekt](https://github.com/Kautenja/gym-super-mario-bros) von anderen EntwicklerInnen zur Verfügung gestellt.

Im ersten Schritt definieren wir die Umgebung und anschließend führen für eine gewisse Anzahl von Schritten jeweils zufällige Aktionen aus. Hierbei findet noch kein Lernvorgang statt - wir wollen nur erst einmal die Umgebung zeigen. 

Um die Ausführung zu unterbrechen, kann man im Menü oben die aktuelle Zellenausführung stoppen.

In [None]:
env = gym_super_mario_bros.make('SuperMarioBros-1-1-v0')
env = JoypadSpace(env, SIMPLE_MOVEMENT)

done = True
for step in range(1000):
    if done:
        state = env.reset()
    state, reward, done, info = env.step(env.action_space.sample())
    env.render()
    time.sleep(.05)

env.close()

Für unser richtiges Reinforcement Learning Modell müssen wir allerdings noch weitere Schritte ausführen. 

Zunächst sehen wir uns einmal an, welche möglichen Actions wir zur Verfügung haben in dieser Environment. Wir werden das anschließend nur auf zwei Actions limitieren, um schneller zu trainieren. Wir verwenden dann nur noch **right** (walk right) und **right, A** (jump right). 

In [None]:
env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0")
gym_super_mario_bros.actions.COMPLEX_MOVEMENT

In [None]:
env = JoypadSpace(env, [["right"], ["right", "A"]])


Wollen wir erfahren, was wir von unserer Environment als Informationen bekommen, so geben wir uns einmal die Rückgabewerte der Umgebung aus. Hier sieht man jedoch, dass wir nur relativ wenige Informationen direkt bekommen. Um unser Modell richtig trainieren zu können, müssen wir den aktuellen Stand eines Spiels jeweils als Bild erhalten.

Unser Modell lernt anhand der Bilder des aktuellen Spielstandes. Dabei werden sogenannte Convolutional Neural Networks genutzt, die wir in Woche 4 etwas genauer betrachten werden. 

In [None]:
env.reset()
next_state, reward, done, info = env.step(action=0)
print("Next-State: " + str(next_state.shape))
print("Reward: " + str(reward))
print("Done: " + str(done))
print("Info: " + str(info))

Damit wir Feedback der Environment auch in Form von "Bildern" bekommen, müssen wir folgendes tun. 

In [None]:
env = SkipFrame(env, skip=4)
env = GrayScaleObservation(env)
env = ResizeObservation(env, shape=84)
env = TransformObservation(env, f=lambda x: x / 255.)
env = FrameStack(env, num_stack=4)

# Q-Learning

Im Folgenden werden wir den Q-Learning Ansatz verwenden. Beim Q-Learning kann der Agent die Belohnungen der Umgebung nutzen, um im Laufe der Zeit zu lernen, welche Aktion in einem bestimmten Zustand der Environment am besten ist. Weiter werden wir hier nicht ins Detail gehen. 

Die Implementierung und finales Modell stammt von [YuansongFeng](https://github.com/YuansongFeng/MadMario) und einem [Pytorch-Tutorial](https://pytorch.org/tutorials/intermediate/mario_rl_tutorial.html).

# Die ersten Schritte beim Lernen

Wir wollen uns zunächst einmal ansehen, wie unser Modell sich ohne Training schlägt. Anschließend werden wir mehrere bereits trainierte Modell laden und uns verschieden gute Modell ansehen. 

In der Präsentation werden wir das nicht zeigen, doch wir haben in den Notebooks auch den Code zum Training der einzelnen Modelle. Da das sehr zeitintensiv ist, werden wir nur bereits trainierte Modelle verwenden. Das Training geschieht in sogenannten Episoden. Das Training von 1.000 Episoden kann bereits mehrere Stunden dauern. Für ein "wirklich gutes" Modell benötigt man mehrere zehntausend Episoden Training. 

In [None]:
mario = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, save_dir="")

episodes = 100
for e in range(episodes):
    state = env.reset()
    while True:
        env.render()
        time.sleep(.05)

        action = mario.act(state)

        # Agent führt Aktion aus und bekommt dafür Feedback der Environment
        next_state, reward, done, info = env.step(action)

        # Mario merkt sich den aktuellen Stand
        mario.cache(state, next_state, action, reward, done)
        
        # Mario führt einen Lernschritt aus
        q, loss = mario.learn()

        state = next_state

        if done or info["flag_get"]:
            break

# Training 100 Episodes

In [None]:

save_dir = Path("checkpoints_100") / datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
save_dir.mkdir(parents=True)

mario = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, save_dir=save_dir)

logger = MetricLogger(save_dir)



episodes = 100
for e in range(episodes):
    if e % 10 == 0:
        print("Episode: " + str(e))

    state = env.reset()

    # Play the game!
    while True:

        # Run agent on the state
        action = mario.act(state)

        # Agent performs action
        next_state, reward, done, info = env.step(action)

        # Remember
        mario.cache(state, next_state, action, reward, done)

        # Learn
        q, loss = mario.learn()

        # Logging
        logger.log_step(reward, loss, q)

        # Update state
        state = next_state

        # Check if end of game
        if done or info["flag_get"]:
            break

    logger.log_episode()

    if e % 10 == 0:
        logger.record(episode=e, epsilon=mario.exploration_rate, step=mario.curr_step)
mario.save()

# Training 1000 Episodes

In [None]:
# Start Training
save_dir = Path("checkpoints_1000") / datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
save_dir.mkdir(parents=True)

mario = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, save_dir=save_dir)

logger = MetricLogger(save_dir)

episodes = 1000
for e in range(episodes):
    if e % 100 == 0:
        print("Episode: " + str(e))

    state = env.reset()

    # Play the game!
    while True:

        # Run agent on the state
        action = mario.act(state)

        # Agent performs action
        next_state, reward, done, info = env.step(action)

        # Remember
        mario.cache(state, next_state, action, reward, done)

        # Learn
        q, loss = mario.learn()

        # Update state
        state = next_state

        # Check if end of game
        if done or info["flag_get"]:
            break

    if e % 100 == 0:
        logger.record(episode=e, epsilon=mario.exploration_rate, step=mario.curr_step)
mario.save()

# Training 10.000 Episodes

In [None]:
save_dir = Path("checkpoints_10000") / datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
save_dir.mkdir(parents=True)

mario = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, save_dir=save_dir)

logger = MetricLogger(save_dir)

episodes = 10000
for e in range(episodes):
    if e % 10000 == 0:
        print("Episode: " + str(e))

    state = env.reset()

    # Play the game!
    while True:

        # Run agent on the state
        action = mario.act(state)

        # Agent performs action
        next_state, reward, done, info = env.step(action)

        # Remember
        mario.cache(state, next_state, action, reward, done)

        # Learn
        q, loss = mario.learn()

        # Logging
        logger.log_step(reward, loss, q)

        # Update state
        state = next_state

        # Check if end of game
        if done or info["flag_get"]:
            break

    logger.log_episode()

    if e % 10000 == 0:
        logger.record(episode=e, epsilon=mario.exploration_rate, step=mario.curr_step)
model.save()

# Trainierte Modelle

Nachdem wir uns unseren Agenten einmal komplett ohne Training angesehen haben, so wollen wir uns einmal das Verhalten nach 100 Episoden ansehen. Vorher muss zuerst allerdings die Environment zurückgesetzt werden, dass wir die Modelle laden können. 

In [None]:
import gym
import gym_super_mario_bros
from gym.wrappers import FrameStack, GrayScaleObservation, TransformObservation
from nes_py.wrappers import JoypadSpace

from utils.agent import Mario
from utils.wrappers import ResizeObservation, SkipFrame

# Initialize Super Mario environment
env = gym_super_mario_bros.make('SuperMarioBros-1-1-v0')

# Limit the action-space to
#   0. walk right
#   1. jump right
env = JoypadSpace(
    env,
    [['right'],
    ['right', 'A']]
)

# Apply Wrappers to environment
env = SkipFrame(env, skip=4)
env = GrayScaleObservation(env, keep_dim=False)
env = ResizeObservation(env, shape=84)
env = TransformObservation(env, f=lambda x: x / 255.)
env = FrameStack(env, num_stack=4)

env.reset()

# Modell mit 100 Episoden Training

In [None]:
checkpoint = Path('models/mario_100ep.chkpt')
mario = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, save_dir="", checkpoint=checkpoint)
mario.exploration_rate = mario.exploration_rate_min


episodes = 100

for e in range(episodes):

    state = env.reset()

    while True:

        env.render()
        time.sleep(.05)

        action = mario.act(state)

        next_state, reward, done, info = env.step(action)

        mario.cache(state, next_state, action, reward, done)


        state = next_state

        if done or info['flag_get']:
            break


# Modell mit 1000 Episoden Training

Nachdem unser Modell mit 100 Episoden noch keine großartigen Fortschritte zu verzeichnen hat, wollen wir uns mal das Modell nach circa 1000 Episoden ansehen. 

In [None]:
checkpoint = Path('models/mario_1000ep.chkpt')
mario = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, save_dir="", checkpoint=checkpoint)

episodes = 100

for e in range(episodes):

    state = env.reset()

    while True:

        env.render()
        time.sleep(.05)

        action = mario.act(state)

        next_state, reward, done, info = env.step(action)

        mario.cache(state, next_state, action, reward, done)

        state = next_state

        if done or info['flag_get']:
            break


# Fertiges Modell

Wenn wir nun uns ein "fertiges" Modell einmal ansehen, sehen wir deutlich besseres Verhalten und höheren Reward (Erreichen des Ziels, Münzen, kürzere Zeiten etc.). Das hier gezeigte Modell wurde für mehrere zehntausend Episoden trainiert - ist aber noch keinesfalls perfekt.  

Natürlich trainieren wir hier auf sehr einfachen Levels des Spiels. Zudem haben wir zu Beginn die Aktionen nur auf zwei beschränkt. Gibt man dem Agenten mehrere Aktionen zur Auswahl und mehr Episoden im Training erhalten wir sicherlich noch ein besseres Modell. 

In [None]:
checkpoint = Path('models/mario_trained.chkpt')
mario = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, save_dir="", checkpoint=checkpoint)

episodes = 100

for e in range(episodes):

    state = env.reset()

    while True:

        env.render()
        time.sleep(.05)

        action = mario.act(state)

        next_state, reward, done, info = env.step(action)

        mario.cache(state, next_state, action, reward, done)

        state = next_state

        if done or info['flag_get']:
            break

Das war es zum Exkurs Reinforcement Learning. Falls Sie Reinforcement Learning oder einige der Anwendungsfälle interessieren, so können Sie mit https://gym.openai.com/ sehr einfach damit starten. 

Viel Spaß beim selbst ausprobieren. 