In [1]:
import gym_super_mario_bros
import gym
import torch
import numpy as np
import imageio
import cv2
from collections import deque
from nes_py.wrappers import JoypadSpace
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
from model import ActorCritic
import matplotlib.pyplot as plt

# Run inference on the GPU when PyTorch can see one, otherwise on the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Number of consecutive preprocessed frames stacked into one model input.
FRAME_STACK_SIZE = 4
# Spatial downsampling factor (same value as preprocess_frame's default;
# not referenced by name in the visible cells).
DOWNSAMPLE_RATIO = 4
# Default cap on agent steps per recorded episode.
MAX_STEPS = 1000
# Frame rate used when writing the video and the GIF.
FPS = 30

# === WRAPPER ===

class DeadlockEnv(gym.Wrapper):
    """Reward-shaping wrapper that also ends episodes when Mario stops advancing.

    On every step the wrapper reads ``x_pos``, ``y_pos``, ``life``, ``stage``,
    ``world`` and ``coins`` (plus the optional ``flag_get``) from the ``info``
    dict returned by the wrapped env, adds several shaping terms to the reward,
    and forces ``done = True`` when the flag is reached or when ``x_pos`` has
    not increased for ``threshold`` consecutive steps. It also stores the
    horizontal distance travelled since the first step in ``info['distance']``.
    """

    def __init__(self, env, threshold=100):
        """Wrap ``env`` and initialise the per-episode tracking state.

        Args:
            env: Environment to wrap. Its ``step`` must return the 4-tuple
                ``(state, reward, done, info)`` that ``step`` below unpacks.
            threshold: Number of consecutive steps without an increase in
                ``x_pos`` after which the episode is terminated.
        """
        super().__init__(env)
        # Position seen on the previous step (0 until the first step).
        self.last_x_pos = 0
        self.last_y_pos = 0
        # Consecutive steps on which x_pos did not increase.
        self.count = 0
        self.threshold = threshold
        # Last seen life/stage/world values. The initial 3/1/1 are assumed
        # starting values; any mismatch with `info` on a step simply re-syncs
        # them (see step()).
        self.lifes = 3
        self.stage = 1
        self.world = 1
        # Coin counter seen on the previous step.
        self.last_coins = 0
        # Smallest y_pos seen since the last "landing" reset in step().
        # NOTE(review): only written, never read by any active reward term.
        self.max_y = float('inf')

        # x_pos of the first step after a reset; None until that step happens.
        self.start_xpos = None

    def reset(self, **kwargs):
        """Restore all tracking state to its initial values and reset the wrapped env.

        Args:
            **kwargs: Forwarded unchanged to ``self.env.reset``.

        Returns:
            Whatever ``self.env.reset(**kwargs)`` returns.
        """
        self.last_x_pos = 0
        self.last_y_pos = 0
        self.count = 0
        self.start_xpos = None
        self.lifes = 3
        self.stage = 1
        self.world = 1
        self.last_coins = 0
        self.max_y = float('inf')

        return self.env.reset(**kwargs)

    def step(self, action):
        """Step the wrapped env and apply reward shaping and stall detection.

        Args:
            action: Action forwarded unchanged to ``self.env.step``.

        Returns:
            ``(state, reward, done, info)`` where ``reward`` is the wrapped
            env's reward plus the shaping terms below (or exactly ``-5`` on a
            stall timeout), ``done`` may have been forced to ``True``, and
            ``info`` has an added ``'distance'`` entry.
        """
        state, reward, done, info = self.env.step(action)

        x_pos = info['x_pos']
        y_pos = info['y_pos']

        # Remember where the episode started so distance can be reported.
        if self.start_xpos is None:
            self.start_xpos = x_pos

        # Stall counter: grows while x_pos does not increase, resets otherwise.
        if x_pos <= self.last_x_pos:
            self.count += 1
        else:
            self.count = 0

        # A change of life, stage or world re-syncs the tracked values, clears
        # the stall counter, and makes delta_x below evaluate to 0 for this step.
        if info['life'] != self.lifes or info["stage"] != self.stage or info["world"] != self.world:
            self.last_x_pos = x_pos
            self.count = 0
            self.lifes = info['life']
            self.stage = info["stage"]
            self.world = info["world"]

        # === Coin reward ===
        # NOTE(review): min() only caps the upper side, so a decrease in
        # info['coins'] produces a negative term.
        coins_delta = info['coins'] - self.last_coins
        reward += min(coins_delta * 0.05, 2.0)
        self.last_coins = info['coins']

        # === Flag reached ===
        if info.get("flag_get", False):
            reward += 50
            done = True

        # === Movement along x ===
        # NOTE(review): last_x_pos starts at 0, so unless the life/stage/world
        # branch above fired, the first step's delta_x equals the absolute x_pos.
        delta_x = x_pos - self.last_x_pos
        if delta_x > 0:
            reward += min(delta_x * 0.1, 5.0)
        # Disabled: small penalty (reward -= 0.01) when there is no forward movement.

        self.last_x_pos = x_pos

        # === Survival bonus ===
        # Paid each time the stall counter reaches a non-zero multiple of 30.
        if self.count % 30 == 0 and self.count != 0:
            reward += 0.5

        # === Jump behaviour ===
        if y_pos is not None:
            # NOTE(review): last_y_pos is always assigned in __init__ and reset(),
            # so this hasattr branch never executes.
            if not hasattr(self, 'last_y_pos'):
                self.last_y_pos = y_pos
                self.max_y = y_pos

            # NOTE(review): the code treats a smaller y as "higher". The
            # gym-super-mario-bros README appears to describe y_pos as measured
            # from the bottom of the stage, which would make a positive delta_y
            # a fall rather than a jump. TODO confirm.
            delta_y = self.last_y_pos - y_pos  # smaller y is treated as higher

            # Track the extreme (smallest) y of the current jump.
            if y_pos < self.max_y:
                self.max_y = y_pos

            # Reward for a positive delta_y (intended: jumping upwards).
            if delta_y > 0:
                reward += min(delta_y * 0.05, 1.0)  # gentle, so it does not dominate

            # Disabled: flat +2.0 bonus when max_y < 50 and delta_x > 0
            # (meant to approximate clearing a pipe while moving).
            # Bonus when Mario jumps both high and far in the same step.
            if delta_y > 10 and delta_x > 5:
                reward += min((delta_y + delta_x) * 0.02, 2.5)

            if delta_y > 5 and delta_x < 2:
                reward -= 0.05  # penalty for jumping on the spot

            # Reset the tracked height once y stops decreasing.
            if y_pos >= self.last_y_pos:  # y did not decrease this step
                self.max_y = y_pos

            self.last_y_pos = y_pos

        # === Timeout on standstill ===
        # Overrides every shaping term accumulated above for this step.
        if self.count >= self.threshold:
            reward = -5
            done = True

        # Distance tracking: x travelled since the first step after reset.
        distance = x_pos - self.start_xpos
        info['distance'] = distance

        return state, reward, done, info


# Frame-skip wrapper: repeats each action over several inner env steps.
class SkipFrame(gym.Wrapper):
    """Repeat each action for up to ``skip`` inner steps and average the reward."""

    def __init__(self, env, skip):
        """Wrap ``env``.

        Args:
            env: Environment to wrap; its ``step`` must return
                ``(obs, reward, done, info)``.
            skip: Maximum number of inner steps executed per outer step.
                Must be at least 1.

        Raises:
            ValueError: If ``skip`` is smaller than 1. Without this check the
                loop in ``step`` would never run and the method would fail on
                unbound locals.
        """
        if skip < 1:
            raise ValueError(f"skip must be >= 1, got {skip}")
        super().__init__(env)
        self._skip = skip

    def step(self, action):
        """Apply ``action`` repeatedly, stopping early when the episode ends.

        Args:
            action: Action forwarded unchanged to every inner ``step`` call.

        Returns:
            ``(obs, reward, done, info)`` where ``obs``, ``done`` and ``info``
            come from the last inner step executed and ``reward`` is the mean
            of the inner rewards over the steps that actually ran.
        """
        total_reward = 0
        steps_run = 0
        for _ in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            total_reward += reward
            steps_run += 1
            if done:
                break

        # steps_run >= 1 is guaranteed by the check in __init__.
        return obs, total_reward / steps_run, done, info

# === PREPROCESSING ===

def Downsample(ratio, state):
    """Shrink an image by an integer factor along both axes.

    Args:
        ratio: Integer divisor applied to the height and the width.
        state: Image array; ``shape[0]`` is used as height and ``shape[1]``
            as width.

    Returns:
        The result of ``cv2.resize`` at the reduced size.
    """
    rows, cols = state.shape[0], state.shape[1]
    # cv2.resize takes the target size as (width, height).
    target_size = (cols // ratio, rows // ratio)
    return cv2.resize(state, target_size)

def GrayScale(state):
    """Convert an RGB image to grayscale via ``cv2.cvtColor``.

    Args:
        state: Image array in RGB channel order.

    Returns:
        The grayscale image produced by OpenCV.
    """
    gray = cv2.cvtColor(state, cv2.COLOR_RGB2GRAY)
    return gray

def preprocess_frame(frame, down_sample_rate=4):
    """Turn a raw RGB frame into a normalised 84x84 grayscale array.

    The frame is downsampled by ``down_sample_rate`` (e.g. 240x256 -> 60x64),
    converted to grayscale, resized to 84x84 with area interpolation, and
    scaled by 1/255.

    Args:
        frame: RGB image array as returned by the environment.
        down_sample_rate: Integer factor passed to ``Downsample``.

    Returns:
        ``float32`` array of shape (84, 84) holding the pixel values divided
        by 255.
    """
    shrunk = Downsample(down_sample_rate, frame)
    gray = GrayScale(shrunk)  # single channel, (H, W)
    resized = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_AREA)
    normalised = resized.astype(np.float32) / 255.0
    return normalised

# === ENV ===

def make_eval_env():
    """Build the evaluation environment used for recording episodes.

    The stack is, from inside out: ``SuperMarioBros-v1`` restricted to
    ``SIMPLE_MOVEMENT`` actions, a 4-step ``SkipFrame``, and a ``DeadlockEnv``
    whose stall threshold is ``(60 * 2) // frameskip`` outer steps.

    Returns:
        The fully wrapped environment.
    """
    frameskip = 4
    stall_threshold = (60 * 2) // frameskip

    base_env = gym_super_mario_bros.make("SuperMarioBros-v1")
    joypad_env = JoypadSpace(base_env, SIMPLE_MOVEMENT)
    skipping_env = SkipFrame(joypad_env, skip=frameskip)
    return DeadlockEnv(skipping_env, threshold=stall_threshold)


def save_video(frames, filename="mario_demo.mp4", fps=FPS):
    """Write a sequence of RGB frames to an MP4 file using OpenCV.

    Args:
        frames: Non-empty sequence of RGB image arrays of shape (H, W, 3).
            The size of the first frame determines the video size.
        filename: Output path of the video file.
        fps: Frame rate written into the video.

    Raises:
        ValueError: If ``frames`` is empty.
        IOError: If OpenCV could not open the output file for writing
            (for example a missing directory or an unavailable codec).
    """
    if len(frames) == 0:
        raise ValueError("save_video needs at least one frame")

    height, width, _ = frames[0].shape
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(filename, fourcc, fps, (width, height))
    try:
        # A writer that failed to open drops every frame silently, so fail loudly.
        if not writer.isOpened():
            raise IOError(f"could not open video writer for {filename!r}")
        for frame in frames:
            # OpenCV expects BGR channel order.
            writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
    finally:
        # Release the writer even if a frame fails to convert or write.
        writer.release()






In [2]:
def generate_episode_video(model_path, save_path="mario_demo.mp4", max_steps=MAX_STEPS,
                           gif_path="test_mario.gif"):
    """Roll out one episode with a saved model and record it as MP4 and GIF.

    Args:
        model_path: Path of the ``ActorCritic`` state dict to load.
        save_path: Output path of the MP4 file.
        max_steps: Maximum number of agent steps to record. At least one step
            must run, otherwise there are no frames to save.
        gif_path: Output path of the GIF. Defaults to the previously
            hard-coded ``"test_mario.gif"``; pass ``None`` to skip the GIF.

    Returns:
        ``save_path``.
    """
    env = make_eval_env()
    try:
        action_dim = env.action_space.n

        model = ActorCritic(action_dim).to(DEVICE)
        # NOTE: torch.load unpickles the file; only load checkpoints you trust.
        model.load_state_dict(torch.load(model_path, map_location=DEVICE))
        model.eval()

        # Seed the stack with copies of the first frame so the very first
        # model input already has FRAME_STACK_SIZE channels.
        frame_buffer = deque(maxlen=FRAME_STACK_SIZE)
        obs = env.reset()
        first_frame = preprocess_frame(obs)
        for _ in range(FRAME_STACK_SIZE):
            frame_buffer.append(first_frame)

        frames = []
        done = False
        steps = 0

        while not done and steps < max_steps:
            # (FRAME_STACK_SIZE, 84, 84) -> add a leading batch dimension.
            state = np.stack(frame_buffer, axis=0)
            state_tensor = torch.tensor(state).unsqueeze(0).float().to(DEVICE)

            # model.act is expected to return a 3-tuple whose first element
            # is a tensor holding the chosen action.
            with torch.no_grad():
                action, _, _ = model.act(state_tensor)

            obs, _, done, _ = env.step(action.item())
            frame_buffer.append(preprocess_frame(obs))

            # Record the full-resolution rendering, not the preprocessed input.
            frames.append(env.render(mode="rgb_array").copy())

            steps += 1
    finally:
        # Close the emulator even if loading the model or the rollout fails.
        env.close()

    save_video(frames, save_path)
    if gif_path:
        imageio.mimsave(gif_path, frames, duration=1/FPS)
    return save_path

In [4]:
# Record several rollouts of the same checkpoint, one MP4 per rollout,
# stored next to the checkpoint file.
model_path = '../../runs/A2C/04_testA2C/19_reward/a2c_epoch_0.pt'
NUM_ROLLOUTS = 10
for i in range(NUM_ROLLOUTS):
    save_path = f'../../runs/A2C/04_testA2C/19_reward/a2c_epoch_0_{i}.mp4'
    generate_episode_video(model_path=model_path, save_path=save_path)