<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/5/50/Logo_fh_hof.svg/2000px-Logo_fh_hof.svg.png" width="250" style="background-color:#FFF">

---

<div style="text-align:center;">
    <font size="+4"><i><u>Q-Learning mit 'Super Mario Bros'</u></i></font><br><br>
    <font>Seminararbeit der Vorlesung <b>Angewandtes Maschinelles Lernen</b> an der <b>Hochschule für angewandte Wissenschaften Hof</b> des <b>Sommersemesters 2020</b>.</font>
</div>

---

<h1>Inhaltsverzeichnis<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Imports" data-toc-modified-id="Imports-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Imports</a></span></li><li><span><a href="#Parameter-&amp;-Hilfsunktionen" data-toc-modified-id="Parameter-&amp;-Hilfsunktionen-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Parameter &amp; Hilfsunktionen</a></span></li><li><span><a href="#SuperMarioBrosEnviorment" data-toc-modified-id="SuperMarioBrosEnviorment-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>SuperMarioBrosEnviorment</a></span></li><li><span><a href="#RewardRecord" data-toc-modified-id="RewardRecord-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>RewardRecord</a></span></li></ul></div>

---

## Imports

In [None]:
import gym
import gym_super_mario_bros
from nes_py.wrappers import JoypadSpace
from gym_super_mario_bros.actions import RIGHT_ONLY, SIMPLE_MOVEMENT, COMPLEX_MOVEMENT

import uuid
from datetime import datetime

---

## Parameter & Hilfsunktionen

In [None]:
# Default configuration, kept as module-level names. The helper functions and
# SuperMarioBrosEnviorment read these names at call time.
_GYM_ENV_ID = 'SuperMarioBros-v0'  # environment id handed to gym.make()
_GYM_ACTIONS = SIMPLE_MOVEMENT  # action list handed to JoypadSpace
_MONITOR_RECORD_EVERY = 1  # divisor used by should_record()

print("SuperMarioBrosEnviorment-Parameter auf Default-Werte gestetz ...")

In [None]:
def should_record(current_run):
    """Decide whether the monitor should record the given run.

    Args:
        current_run: Integer index of the run (episode) in question.

    Returns:
        True when ``current_run`` is a multiple of the module-level
        ``_MONITOR_RECORD_EVERY``, False otherwise. A falsy interval
        (``0`` or ``None``) means "never record" and yields False instead of
        raising from the modulo operation.
    """
    # Only reading the module-level value, so no ``global`` statement is needed.
    interval = _MONITOR_RECORD_EVERY
    if not interval:
        return False
    return current_run % interval == 0

def get_reduced_actionset():
    """Return the indices of a heavily reduced action space.

    The indices point into the movement list currently stored in the
    module-level ``_GYM_ACTIONS``.

    Returns:
        A new list of integer indices for the configured movement set.

    Raises:
        ValueError: If ``_GYM_ACTIONS`` equals none of ``SIMPLE_MOVEMENT``,
            ``COMPLEX_MOVEMENT`` or ``RIGHT_ONLY``.
    """
    if _GYM_ACTIONS == SIMPLE_MOVEMENT:
        return [1, 2, 5]  # ['right'], ['right', 'A'], ['A']
    if _GYM_ACTIONS == COMPLEX_MOVEMENT:
        return [1, 2, 5, 6]  # ['right'], ['right', 'A'], ['A'], ['left']
    if _GYM_ACTIONS == RIGHT_ONLY:
        return [1, 2]  # ['right'], ['right', 'A']
    # ``Error`` is not a defined name, so raising it would surface as a
    # NameError and hide the message below; ValueError is the builtin that
    # matches "unsupported configuration value".
    raise ValueError('Unkown Movementset \"{}\".'.format(_GYM_ACTIONS))

---

## SuperMarioBrosEnviorment

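`SuperMarioBrosEnviorment` kapselt das mit `gym.make` erzeugte und in `JoypadSpace` gewrappte Environment. In `step` wird der von Gym gelieferte Reward durch den Reward ersetzt, der über die Hilfsklasse `RewardRecord` aus dem `info`-Dictionary berechnet wird. Optional kann das Environment über `monitor()` zusätzlich in einen Monitor gewrappt werden, der Videos der Runs aufzeichnet.

*Todo: ausführlichere Beschreibung des Environments.*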

In [None]:
class SuperMarioBrosEnviorment(gym.Env):
    """Gym environment that wraps Super Mario Bros and replaces its reward.

    The wrapped environment is created from the module-level parameters
    ``_GYM_ENV_ID`` and ``_GYM_ACTIONS``. Every ``step`` delegates to the
    wrapped environment and substitutes the reward computed by
    ``RewardRecord`` for the reward returned by gym.
    """

    metadata = {'render.modes': ['human']}

    def __init__(self):
        """Create the wrapped environment from the module-level parameters."""
        super().__init__()

        print("Initialisieren SuperMarioBrosEnviorment mit Parameter ...\n... ID: {}\n... ACTIONS: {}\n... RECORD_EVERY: {} ...".format(_GYM_ENV_ID, _GYM_ACTIONS, _MONITOR_RECORD_EVERY))

        # Create the base environment, then wrap it in JoypadSpace with the
        # configured action list.
        env = JoypadSpace(gym.make(_GYM_ENV_ID), _GYM_ACTIONS)

        # Expose the wrapped environment's spaces through the standard
        # gym.Env attributes so generic agent code can inspect them.
        self.action_space = env.action_space
        self.observation_space = env.observation_space

        # One RewardRecord per step of the current episode (cleared in reset).
        self._reward_records = []
        # Kept for backward compatibility; mirrors "no record stored yet".
        self._reward_records_is_empty = True
        self._env = env
        # NOTE(review): this flag is initialised here and never updated by
        # any method of this class - confirm whether callers rely on it.
        self.done = False

        print("... abgeschlossen ...")

    def monitor(self):
        """Wrap the environment in a Monitor and return the video output folder.

        Returns:
            The generated output path (base directory, timestamp and a
            random UUID) that was passed to the Monitor.
        """
        # Imported locally because the file's import section does not bring
        # ``Monitor`` into scope; without this the call below is a NameError.
        from gym.wrappers import Monitor

        print("Wrappe Enviormenter in Monitor ...")

        # NOTE(review): ``_RECORDS_BASE_DIR`` is not assigned among the
        # defaults visible in this file; it has to exist as a module-level
        # name before this method is called - confirm where it is set.
        # Timestamp plus UUID make the path unique per call.
        video_output_path = '{}/run_{}__{}'.format(
            _RECORDS_BASE_DIR,
            datetime.now().strftime("%d-%m-%Y_%H-%M-%S"),
            uuid.uuid4())
        self._env = Monitor(
            self._env,
            video_output_path,
            video_callable=lambda episode_id: should_record(episode_id - 1),
            force=False)

        print("... Video-Output-Pfad: \"{}\" ...".format(video_output_path))

        return video_output_path

    # Time-critical
    def get_reward(self, info, gym_reward, done):
        """Compute the reward for the current step and return it.

        Args:
            info: The ``info`` dict returned by the wrapped environment's step.
            gym_reward: Reward returned by the wrapped environment (unused).
            done: Done flag returned by the wrapped environment (unused).

        Returns:
            The ``reward`` attribute of the newly created ``RewardRecord``.
        """
        records = self._reward_records
        # The first record of an episode has no predecessor. Checking the
        # list directly avoids using IndexError for control flow, which would
        # also mislabel an IndexError raised inside RewardRecord itself.
        previous = records[-1] if records else None
        rr = RewardRecord(info, previous)
        records.append(rr)
        self._reward_records_is_empty = False
        return rr.reward

    # Time-critical
    def step(self, action):
        """Step the wrapped environment; the reward is overridden here.

        Returns:
            The tuple ``(observation, reward, done, info)`` where ``reward``
            comes from ``get_reward`` instead of the wrapped environment.
        """
        observation, reward, done, info = self._env.step(action)
        return observation, self.get_reward(info, reward, done), done, info

    # Time-critical
    def reset(self):
        """Reset the wrapped environment and discard all stored reward records."""
        # The list could be archived here instead; it is simply replaced so
        # that every episode starts with an empty history.
        self._reward_records = []
        self._reward_records_is_empty = True
        return self._env.reset()

    # Time-critical
    def render(self, mode='human'):
        """Render by delegating to the wrapped environment."""
        return self._env.render(mode=mode)

    # Time-critical
    def close(self):
        """Close by delegating to the wrapped environment."""
        return self._env.close()

---

## RewardRecord

```
+--------------+---------------------------------------------+
|   Info-Key   |                 Beschreibung                |
+--------------+---------------------------------------------+
| coins        | Anzahl der von Mario gesammelten Münzen     |
| life         | Anzahl der Leben von Mario, {2,1,0,255}     |
| score        | Kumulativer in-game Punktestand             |
| status       | Mario's Zustand {'small','tall','fireball'} |
| flag_get     | True wenn Mario die Flagge erreicht         |
| world        | Aktuelle Welt {1,2,3,4,5,6,7,8}             |
| stage        | Aktuelle Stage {1,2,3,4}                    |
| time         | Aktuell übrige Zeit {400 bis 0}             |
| x_pos        | Mario's absolute X-Position                 |
| x_pos_screen | Mario's relative X-Position                 |
| y_pos        | Mario's absolute Y-Position                 |
+--------------+---------------------------------------------+
```

In [None]:
# Hilfsklasse
class RewardRecord:
    """Helper that captures one step's ``info`` values and derives a reward.

    A record created with a predecessor computes its reward relative to that
    predecessor and continues the predecessor's running total in
    ``sum_reward``; a record without predecessor starts both values at 0.
    """

    # Time-critical
    def __init__(self, info, last_rr=None):
        """Copy the needed values out of ``info`` and compute the reward.

        Args:
            info: Mapping providing the keys 'coins', 'life', 'score',
                'status', 'time', 'x_pos', 'y_pos' and 'flag_get'.
            last_rr: The previous RewardRecord, or None for the first record.
        """
        # Values taken from info
        self.coins = info['coins']
        self.life = int(info['life'])
        self.score = info['score']
        self.status = info['status']
        self.time = info['time']
        self.x_pos = info['x_pos']
        self.y_pos = info['y_pos']
        self.flag_get = info['flag_get']

        # Without a predecessor there is nothing to compare against.
        if last_rr is None:
            self.sum_reward = 0
            self.reward = 0
        else:
            self.calculate_reward(last_rr)

    # Time-critical
    def calculate_reward(self, last_rr):
        """Set ``reward`` and ``sum_reward`` from the difference to ``last_rr``.

        Args:
            last_rr: The RewardRecord of the preceding step.
        """
        # Coin bonus: 5 per coin of difference.
        total = (self.coins - last_rr.coins) * 5

        # Life penalty: any change of the life counter.
        if self.life != last_rr.life:
            total -= 150

        # Score bonus: only when the score went up.
        if self.score > last_rr.score:
            total += (self.score - last_rr.score) * .02

        # Time penalty: applied when the timer dropped by exactly one.
        if last_rr.time - self.time == 1:
            total -= 10

        # Movement: forward > backward / vertical > standing still.
        if self.x_pos > last_rr.x_pos:
            total += 10
        elif self.x_pos != last_rr.x_pos:
            total += 5
        elif self.y_pos != last_rr.y_pos:
            total += 5
        else:
            total -= 20

        # Clamp to [-150, 150] before the flag bonus is added.
        total = max(min(total, 150), -150)

        # Flag bonus is added on top of the clamped value.
        if self.flag_get:
            total += 500

        self.reward = total
        self.sum_reward = last_rr.sum_reward + total