# Agenten und Reinforcement Learning in MiniGridworld

Dieses Notebook verbindet einfache Agenten mit Q Learning in einer gemeinsamen Umgebung. Die Beispiele bleiben kompakt und direkt ausführbar. Mini-Leitfaden siehe unten.

## Setup und Imports

In [37]:
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from IPython.display import display

WIDGETS_OK = True
WIDGET_IMPORT_ERROR = None
try:
    import ipywidgets as w
except Exception as e:
    WIDGETS_OK = False
    WIDGET_IMPORT_ERROR = e
    print('ipywidgets ist nicht verfügbar. Installiere es mit: pip install ipywidgets')
    print('Fehler:', e)

## MiniGridworld und make_env
- MiniGridworld ist eine einfache 2D Testumgebung mit Feldern, Start und Ziel.
- Der Agent startet links oben und soll das Ziel rechts unten erreichen.
- Hindernisse blockieren Wege, der Agent muss einen begehbaren Pfad finden.
- Pro Schritt wählt der Agent eine Aktion: hoch, rechts, runter, links.
- Rewards entstehen durch Schrittstrafe und Zielbelohnung.
- make_env erzeugt eine konkrete Welt mit Parametern wie width, height, density, seed, step_penalty, goal_reward, max_steps.
- Dadurch sind Welten reproduzierbar und Parameteränderungen lassen sich fair vergleichen.
- Wir beobachten dafür Erfolg, Schrittzahl und Return.

In [38]:
class MiniGridworld:
    # Aktionen: 0 up, 1 right, 2 down, 3 left
    ACTIONS = {0: (0, -1), 1: (1, 0), 2: (0, 1), 3: (-1, 0)}

    def __init__(self, width=20, height=10, start=(0, 0), goal=(9, 5),
                 step_penalty=-0.02, goal_reward=1.0, max_steps=250,
                 walls=None, seed=0):
        self.width = int(width)
        self.height = int(height)
        self.start = tuple(start)
        self.goal = tuple(goal)
        self.step_penalty = float(step_penalty)
        self.goal_reward = float(goal_reward)
        self.max_steps = int(max_steps)
        self.walls = set(walls) if walls else set()
        self.seed = int(seed)

        assert self._in_bounds(self.start) and self.start not in self.walls
        assert self._in_bounds(self.goal) and self.goal not in self.walls

        self.reset()

    def _in_bounds(self, p):
        x, y = p
        return 0 <= x < self.width and 0 <= y < self.height

    def reset(self):
        self.pos = self.start
        self.t = 0
        return self.pos

    def step(self, action):
        self.t += 1
        dx, dy = self.ACTIONS[int(action)]
        nxt = (self.pos[0] + dx, self.pos[1] + dy)

        if (not self._in_bounds(nxt)) or (nxt in self.walls):
            nxt = self.pos

        self.pos = nxt
        done = False
        reward = self.step_penalty

        if self.pos == self.goal:
            reward = self.goal_reward
            done = True

        if self.t >= self.max_steps:
            done = True

        return self.pos, reward, done, {'t': self.t}

    def state_id(self, state=None):
        s = self.pos if state is None else state
        x, y = s
        return y * self.width + x

    def render(self, trail=None):
        trail = trail or set()
        lines = []
        for y in range(self.height):
            row = []
            for x in range(self.width):
                p = (x, y)
                c = '.'
                if p in self.walls:
                    c = '#'
                if p in trail:
                    c = '*'
                if p == self.goal:
                    c = 'G'
                if p == self.pos:
                    c = 'M'
                row.append(c)
            lines.append(' '.join(row))
        return '\n'.join(lines)


def generate_random_walls(width, height, start, goal, density=0.18, seed=0):
    rng = random.Random(seed)
    walls = set()
    for y in range(height):
        for x in range(width):
            p = (x, y)
            if p == start or p == goal:
                continue
            if rng.random() < density:
                walls.add(p)
    return walls


def make_env(width=20, height=10, density=0.18, seed=0,
             step_penalty=-0.02, goal_reward=1.0, max_steps=300):
    start = (0, 0)
    goal = (width - 1, height - 1)
    walls = generate_random_walls(width, height, start, goal, density=density, seed=seed)
    return MiniGridworld(width=width, height=height, start=start, goal=goal,
                         step_penalty=step_penalty, goal_reward=goal_reward,
                         max_steps=max_steps, walls=walls, seed=seed)


env_demo = make_env(width=12, height=8, density=0.20, seed=3)
print(env_demo.render())

M . . . . . # # . . . .
. . . . # . . . . . # .
. . # . . . . . . . . .
. . # # . . . . . . . .
. . . . . . . . # . . .
. . . . . . # . . # . .
# . . . # . # . . . . .
. . # . # # . . # # . G


## Baseline Agenten und kurze Evaluation
- Random: wählt in jedem Schritt eine zufällige Aktion. Dient als untere Referenz ohne Zielorientierung.
- Greedy: wählt die Aktion, die die Manhattan-Distanz zum Ziel am stärksten reduziert. Kein Lernen, kann in Sackgassen oder Schleifen hängen bleiben.
- Greedy mit Memory: wie Greedy, aber meidet bereits besuchte Felder, um Wiederholungen zu reduzieren. Kein Lernen, aber oft stabiler als Greedy.

In [39]:
ACTIONS = [0, 1, 2, 3]

def manhattan(s, g):
    return abs(s[0] - g[0]) + abs(s[1] - g[1])


def choose_action_random(state, env):
    return random.choice(ACTIONS)


def choose_action_greedy(state, env):
    x, y = state
    gx, gy = env.goal
    preferred = []
    if gy < y:
        preferred.append(0)
    if gx > x:
        preferred.append(1)
    if gy > y:
        preferred.append(2)
    if gx < x:
        preferred.append(3)
    return random.choice(preferred) if preferred else random.choice(ACTIONS)


def choose_action_greedy_memory(state, env, memory):
    # Vermeidet kurze Schleifen, indem zuletzt besuchte Zustände gemieden werden
    candidates = []
    for a in ACTIONS:
        old_pos, old_t = env.pos, env.t
        env.pos, env.t = state, 0
        sp, _, _, _ = env.step(a)
        env.pos, env.t = old_pos, old_t
        candidates.append((a, manhattan(sp, env.goal), sp in memory))

    candidates.sort(key=lambda x: (x[2], x[1]))
    return int(candidates[0][0])


def run_baseline_episode(env, mode='random'):
    s = env.reset()
    total = 0.0
    memory = [s]

    for t in range(env.max_steps):
        if mode == 'random':
            a = choose_action_random(s, env)
        elif mode == 'greedy':
            a = choose_action_greedy(s, env)
        elif mode == 'greedy_memory':
            a = choose_action_greedy_memory(s, env, set(memory[-10:]))
        else:
            raise ValueError('Unbekannter mode')

        s, r, done, info = env.step(a)
        total += r
        memory.append(s)
        if done:
            return total, info.get('t', t + 1), int(s == env.goal)

    return total, env.max_steps, 0


def evaluate_baseline_modes(width=12, height=8, density=0.20, seed=3,
                            step_penalty=-0.02, goal_reward=1.0, max_steps=250,
                            episodes=50):
    rows = []
    for mode in ['random', 'greedy', 'greedy_memory']:
        returns = []
        steps = []
        succ = 0
        for _ in range(episodes):
            env = make_env(width, height, density, seed, step_penalty, goal_reward, max_steps)
            ret, st, ok = run_baseline_episode(env, mode=mode)
            returns.append(ret)
            steps.append(st)
            succ += ok

        rows.append({
            'mode': mode,
            'success_rate': succ / episodes,
            'mean_steps': float(np.mean(steps)),
            'mean_return': float(np.mean(returns))
        })

    return pd.DataFrame(rows).sort_values(['success_rate', 'mean_return'], ascending=[False, False])


display(evaluate_baseline_modes())

Unnamed: 0,mode,success_rate,mean_steps,mean_return
2,greedy_memory,1.0,18.0,0.66
1,greedy,0.36,169.18,-3.0164
0,random,0.16,234.0,-4.5168


### Ergebnis der Baseline-Evaluation
- Die Tabelle vergleicht Baselines über success_rate, mean_steps und return_mean.
- greedy_memory erreicht das Ziel oft zuverlässig und braucht meist weniger Schritte, daher ist der Return oft höher.
- greedy ist zielgerichtet, kann aber in Schleifen oder Sackgassen geraten, dadurch steigen Schritte und der Return sinkt.
- random hat keine Zielstrategie und dient als Referenz für schlechtes, aber erwartbares Verhalten.

### Greedy mit Zufall und optional Memory
- Greedy wählt meist den Schritt, der die Distanz zum Ziel reduziert.
- p_random mischt Zufall ein und hilft, aus schlechten Routinen oder Sackgassen auszubrechen.
- Memory vermeidet nach Möglichkeit bereits besuchte Felder und reduziert Wiederholungen.
- Zusammen ergibt das eine gut spielbare Baseline ohne Lernen.
- Die Wirkung von Zufall, Hindernisdichte und Schrittstrafe wird direkt sichtbar.

In [40]:
# Interaktive Baseline-Demo: Greedy mit Zufall und Memory gegen Festlaufen
# Spielbar: Parameter ändern, Episode laufen lassen, Verhalten direkt beobachten.

try:
    import ipywidgets as w
except Exception as e:
    raise RuntimeError(
        "ipywidgets ist nicht verfügbar. Installiere es mit pip install ipywidgets. Fehler: %s" % e
    )

import random
import time
from IPython.display import display


def manhattan(a, b):
    return abs(a[0] - b[0]) + abs(a[1] - b[1])


def generate_world_with_path(width, height, density=0.18, seed=0):
    """
    Erzeugt eine Welt mit garantierter Verbindung zwischen Start und Ziel.
    Erst wird ein einfacher Pfad gebaut, danach werden Wände zufällig gesetzt,
    aber nicht auf dem Pfad.
    """
    rng = random.Random(seed)
    start = (0, 0)
    goal = (width - 1, height - 1)

    # Pfad: rechts/runter mit etwas Variation
    path = {start}
    x, y = start
    while (x, y) != goal:
        moves = []
        if x < goal[0]:
            moves.append((x + 1, y))
        if y < goal[1]:
            moves.append((x, y + 1))
        nxt = rng.choice(moves)
        x, y = nxt
        path.add((x, y))

    walls = set()
    for yy in range(height):
        for xx in range(width):
            p = (xx, yy)
            if p in path:
                continue
            if p == start or p == goal:
                continue
            if rng.random() < density:
                walls.add(p)

    return start, goal, walls


def render_grid(pos, goal, width, height, walls=None, trail=None):
    walls = walls or set()
    trail = trail or set()
    px, py = pos
    gx, gy = goal

    lines = []
    for y in range(height):
        row = []
        for x in range(width):
            cell = "."
            if (x, y) in walls:
                cell = "#"
            if (x, y) in trail:
                cell = "*"
            if (x, y) == (gx, gy):
                cell = "G"
            if (x, y) == (px, py):
                cell = "M"
            row.append(cell)
        lines.append(" ".join(row))
    return "\n".join(lines)


def _next_pos(pos, action, width, height, walls):
    moves = {0: (0, -1), 1: (1, 0), 2: (0, 1), 3: (-1, 0)}  # up, right, down, left
    dx, dy = moves[int(action)]
    nxt = (pos[0] + dx, pos[1] + dy)
    if not (0 <= nxt[0] < width and 0 <= nxt[1] < height):
        return pos
    if nxt in walls:
        return pos
    return nxt


def _valid_actions(pos, width, height, walls):
    acts = []
    for a in [0, 1, 2, 3]:
        if _next_pos(pos, a, width, height, walls) != pos:
            acts.append(a)
    return acts


def _greedy_action_memory(pos, goal, width, height, walls, visited, explore_if_stuck=True):
    """
    Greedy Richtung Ziel, aber bevorzugt unbesuchte Felder.
    Wenn alle besten Schritte in besuchte Felder führen, kann er optional explorieren.
    """
    best_d = 10**9
    best_actions = []

    for a in [0, 1, 2, 3]:
        sp = _next_pos(pos, a, width, height, walls)
        d = manhattan(sp, goal)
        if d < best_d:
            best_d = d
            best_actions = [a]
        elif d == best_d:
            best_actions.append(a)

    # unter den besten: unbesuchte bevorzugen
    unvisited = [a for a in best_actions if _next_pos(pos, a, width, height, walls) not in visited]
    if unvisited:
        return random.choice(unvisited)

    # wenn festgefahren: explorieren mit einem gültigen Schritt
    if explore_if_stuck:
        va = _valid_actions(pos, width, height, walls)
        if va:
            return random.choice(va)

    return random.choice(best_actions) if best_actions else 0


def run_grid_visual(
    out,
    width=20,
    height=10,
    density=0.18,
    seed=0,
    p_random=0.20,
    penalty=-0.02,
    goal_reward=1.0,
    delay=0.04,
    show_trail=True,
    max_steps=400,
    use_memory=True,
):
    start, goal, walls = generate_world_with_path(width, height, density=density, seed=seed)
    pos = start
    total = 0.0
    trail = set([pos])
    visited = set([pos])

    with out:
        out.clear_output(wait=True)

        for t in range(max_steps):
            if show_trail:
                trail.add(pos)

            out.clear_output(wait=True)
            print(
                f"t={t:03d}  pos={pos}  goal={goal}  return={total:.2f}  "
                f"walls={len(walls)}  p_random={float(p_random):.2f}  memory={use_memory}"
            )
            print(
                render_grid(
                    pos=pos,
                    goal=goal,
                    width=width,
                    height=height,
                    walls=walls,
                    trail=trail if show_trail else None,
                )
            )

            # Aktion wählen: entweder zufällig (Exploration), oder greedy (mit oder ohne Memory)
            if random.random() < float(p_random):
                va = _valid_actions(pos, width, height, walls)
                action = random.choice(va) if va else random.choice([0, 1, 2, 3])
            else:
                if use_memory:
                    action = _greedy_action_memory(pos, goal, width, height, walls, visited)
                else:
                    # reiner greedy fallback (ohne Memory)
                    best_a = 0
                    best_d = 10**9
                    for a in [0, 1, 2, 3]:
                        sp = _next_pos(pos, a, width, height, walls)
                        d = manhattan(sp, goal)
                        if d < best_d:
                            best_d = d
                            best_a = a
                    action = best_a

            pos = _next_pos(pos, action, width, height, walls)
            visited.add(pos)

            # Reward
            if pos == goal:
                total += float(goal_reward)
                out.clear_output(wait=True)
                print(
                    f"steps={t+1}  return={total:.2f}  reached_goal=True  walls={len(walls)}  memory={use_memory}"
                )
                print(
                    render_grid(
                        pos=pos,
                        goal=goal,
                        width=width,
                        height=height,
                        walls=walls,
                        trail=trail if show_trail else None,
                    )
                )
                break
            else:
                total += float(penalty)

            # Abbruch, wenn max_steps erreicht
            if (t + 1) >= max_steps:
                out.clear_output(wait=True)
                print(
                    f"steps={t+1}  return={total:.2f}  reached_goal={pos==goal}  walls={len(walls)}  memory={use_memory}"
                )
                print(
                    render_grid(
                        pos=pos,
                        goal=goal,
                        width=width,
                        height=height,
                        walls=walls,
                        trail=trail if show_trail else None,
                    )
                )
                break

            time.sleep(float(delay))


# UI
btn = w.Button(description="Run episode", button_style="primary")
W = w.IntSlider(value=20, min=10, max=40, step=1, description="width")
H = w.IntSlider(value=10, min=5, max=20, step=1, description="height")
D = w.FloatSlider(value=0.18, min=0.00, max=0.40, step=0.02, description="density")
S = w.IntSlider(value=0, min=0, max=99, step=1, description="seed")

PR = w.FloatSlider(value=0.20, min=0.00, max=1.00, step=0.05, description="p_random")
PE = w.FloatSlider(value=-0.02, min=-0.30, max=0.00, step=0.01, description="penalty")
GR = w.FloatSlider(value=1.0, min=0.2, max=2.0, step=0.1, description="goal_reward")

DL = w.FloatSlider(value=0.04, min=0.01, max=0.20, step=0.01, description="delay")
TR = w.Checkbox(value=True, description="trail")
MEM = w.Checkbox(value=True, description="memory")

out = w.Output()

display(
    w.VBox(
        [
            w.HBox([btn]),
            w.HBox([W, H, D, S]),
            w.HBox([PR, PE, GR, DL, TR, MEM]),
            out,
        ]
    )
)


def _on_click(_):
    run_grid_visual(
        out,
        width=int(W.value),
        height=int(H.value),
        density=float(D.value),
        seed=int(S.value),
        p_random=float(PR.value),
        penalty=float(PE.value),
        goal_reward=float(GR.value),
        delay=float(DL.value),
        show_trail=bool(TR.value),
        max_steps=400,
        use_memory=bool(MEM.value),
    )


btn.on_click(_on_click)

VBox(children=(HBox(children=(Button(button_style='primary', description='Run episode', style=ButtonStyle()),)…

Interpretation:
- Die Baseline folgt einer festen Entscheidungsregel und kann optional durch Zufall ergänzt werden, sie passt ihr Verhalten nicht durch Lernen an.
- Effizienz zeigt sich vor allem in der Schrittzahl, da zusätzliche Schritte bei Schrittstrafen den Return typischerweise senken.
- Zufall erhöht die Chance, aus ungünstigen Situationen herauszukommen, kann aber gleichzeitig die Strecke verlängern und die Ergebnisse stärker streuen.
- Memory reduziert Wiederholungen, indem bereits besuchte Zustände nach Möglichkeit gemieden werden, was Schleifen seltener macht.

Danach übertragen wir dieselben Begriffe auf die 2D MiniGridworld und auf Q Learning.


## Markov Eigenschaft und MDP
- **Markov Eigenschaft:** Die Zukunft hängt nur vom aktuellen Zustand und der gewählten Aktion ab, nicht von der gesamten Vergangenheit.
- **MDP Bestandteile:** Zustände, Aktionen, Übergänge, Rewards, Diskontfaktor.
- **Wichtiger Punkt:** Wenn relevante Information aus der Vergangenheit fehlt, ist der Zustand unvollständig und das Problem ist praktisch nicht markovisch.
- **Lösung:** Zustand erweitern, bis alle entscheidungsrelevanten Informationen enthalten sind, zum Beispiel durch ein Memory-Flag.


In [29]:
# Markov Demo mit Zustands Erweiterung durch ein Schaltermerkmal
class SwitchWorld:
    ACTIONS = {0: (0, -1), 1: (1, 0), 2: (0, 1), 3: (-1, 0)}

    def __init__(self, width=6, height=4, start=(0, 0), goal=(5, 3), switch=(2, 1)):
        self.width, self.height = width, height
        self.start, self.goal, self.switch = start, goal, switch
        self.reset()

    def reset(self):
        self.pos = self.start
        self.switch_on = False
        return (self.pos, self.switch_on)

    def step(self, action):
        dx, dy = self.ACTIONS[int(action)]
        x, y = self.pos
        nx = max(0, min(self.width - 1, x + dx))
        ny = max(0, min(self.height - 1, y + dy))
        self.pos = (nx, ny)

        if self.pos == self.switch:
            self.switch_on = True

        done = self.pos == self.goal
        if done:
            reward = 1.0 if self.switch_on else -1.0
        else:
            reward = -0.01

        return (self.pos, self.switch_on), reward, done


def run_path(actions):
    env = SwitchWorld()
    obs = env.reset()
    total = 0.0
    for a in actions:
        obs, r, done = env.step(a)
        total += r
        if done:
            break
    return obs, total


obs_a, ret_a = run_path([1, 1, 1, 1, 1, 2, 2, 2])
obs_b, ret_b = run_path([1, 1, 2, 1, 1, 1, 2, 2])
print('Ende A:', obs_a, 'Return:', round(ret_a, 3))
print('Ende B:', obs_b, 'Return:', round(ret_b, 3))
print('Bei gleicher Position kann der Reward verschieden sein, wenn Gedächtnis fehlt.')

Ende A: ((5, 3), False) Return: -1.07
Ende B: ((5, 3), True) Return: 0.93
Bei gleicher Position kann der Reward verschieden sein, wenn Gedächtnis fehlt.


### Interpretation der Ausgabe (SwitchWorld)
- **Gleiche Position, unterschiedliche Bewertung:** Ein Endzustand kann an derselben Position liegen, aber unterschiedliche Returns haben.
- **Grund:** Der Zustand umfasst mehr als Koordinaten, zusätzlich zählt Kontext oder Gedächtnis.
- **Konsequenz:** Ein reiner Positionszustand kann falsche Schlüsse erzwingen, weil die Aufgabe implizit Historie benötigt.
- **Merksatz:** Markov wird eine Aufgabe erst, wenn der Zustand alle relevanten Informationen enthält.

## Q Learning mit Epsilon Decay

### Q Learning Update Regel

Q Learning speichert für jeden Zustand und jede mögliche Aktion einen Q Wert.
Der Q Wert steht für die erwartete Qualität einer Aktion, also wie gut sie sich langfristig auszahlt.

Die zentrale Aktualisierung lautet:

Q(s,a) = Q(s,a) + alpha * (r + gamma * max_a' Q(s',a') - Q(s,a))

So kann man die Formel lesen:
- Der Ausdruck in Klammern ist das Lernsignal. Er vergleicht eine neue Schätzung mit dem bisherigen Q Wert.
- r ist der sofortige Reward nach der Aktion.
- max_a' Q(s',a') ist die beste erwartete Fortsetzung aus dem Folgezustand s'.
- Der Term (r + gamma * max_a' Q(s',a')) ist das Ziel, auf das Q(s,a) hin bewegt wird.
- Der Teil (r + gamma * max_a' Q(s',a') - Q(s,a)) ist der Fehler der aktuellen Schätzung, auch TD Error genannt.

Begriffe:
- alpha: Lernrate. Hoch bedeutet schnelle Anpassung, niedrig bedeutet langsamer, aber oft stabiler.
- gamma: Weitblick. Hoch bedeutet, zukünftige Rewards zählen stark. Niedrig bedeutet, kurzfristige Rewards dominieren.
- eps: Exploration. Mit einer Wahrscheinlichkeit eps probiert der Agent zufällige Aktionen aus, statt immer die aktuell beste zu wählen.

In [30]:
# Mini Demo: eine einzelne Q Aktualisierung
q_sa = 0.10
r = -0.02
gamma = 0.95
max_q_next = 0.40
alpha = 0.50

td_target = r + gamma * max_q_next
td_error = td_target - q_sa
q_new = q_sa + alpha * td_error

print('Q alt:', round(q_sa, 4))
print('TD Target:', round(td_target, 4))
print('TD Error:', round(td_error, 4))
print('Q neu:', round(q_new, 4))


Q alt: 0.1
TD Target: 0.36
TD Error: 0.26
Q neu: 0.23



### Interpretation des Q Updates
- **Q_old:** bisherige Schätzung für die Qualität einer Aktion in einem Zustand.
- **td_target:** neue Zielschätzung aus aktuellem Reward und bester erwarteter Fortsetzung.
- **td_error:** Differenz zwischen neuer Zielschätzung und alter Schätzung.
- **Update:** Q wird um einen Anteil von td_error verschoben, gesteuert durch alpha.
- **Intuition:** Große Fehler führen zu größeren Korrekturen, kleine Fehler zu kleinen Korrekturen.

In [31]:
# Formel in Aktion: mehrere Q Updates als kurzer Prozess
alpha = 0.50
gamma = 0.95

# Beispielzustand mit vier Aktionswerten
Q_state = np.array([0.10, 0.05, -0.02, 0.00], dtype=float)

# Feste Updates: (step, s, a, r, maxQ_next)
updates = [
    (1, 0, 0, -0.02, 0.40),
    (2, 0, 0, -0.02, 0.35),
    (3, 0, 0,  0.10, 0.30),
    (4, 0, 0, -0.02, 0.45),
    (5, 0, 0,  0.20, 0.50),
]

rows = []
for step, s, a, r, max_q_next in updates:
    q_old = float(Q_state[a])
    td_target = float(r + gamma * max_q_next)
    td_error = float(td_target - q_old)
    q_new = float(q_old + alpha * td_error)
    Q_state[a] = q_new

    rows.append({
        'step': step,
        'Q_old': round(q_old, 4),
        'td_target': round(td_target, 4),
        'td_error': round(td_error, 4),
        'Q_new': round(q_new, 4),
    })

df_updates = pd.DataFrame(rows, columns=['step', 'Q_old', 'td_target', 'td_error', 'Q_new'])
print(df_updates.to_string(index=False))


 step  Q_old  td_target  td_error  Q_new
    1 0.1000     0.3600    0.2600 0.2300
    2 0.2300     0.3125    0.0825 0.2712
    3 0.2712     0.3850    0.1138 0.3281
    4 0.3281     0.4075    0.0794 0.3678
    5 0.3678     0.6750    0.3072 0.5214


### Interpretation der Update-Tabelle
- **Wiederholtes Update:** Derselbe Q Wert nähert sich Schritt für Schritt der jeweiligen Zielschätzung an.
- **Vorzeichen:** Wenn td_target größer ist als Q_old, steigt Q. Wenn td_target kleiner ist, sinkt Q.
- **Dynamik:** Sprünge im td_target erzeugen größere Updates, weil neue Erfahrung die Erwartung verändert.
- **Merksatz:** Lernen verläuft nicht glatt, sondern reagiert auf neue Rewards und neue Fortsetzungen.

### Policy aus der Q Tabelle
- **Q Tabelle:** enthält pro Zustand und Aktion eine gelernte Qualitätsabschätzung.
- **Policy:** Entscheidungsvorschrift, welche Aktion im Zustand gewählt wird.
- **Hier:** Auswahl der Aktion mit dem höchsten Q Wert, also maximale erwartete Qualität.

In [32]:
def policy_from_Q(Q):
    return np.argmax(Q, axis=1)


def evaluate_with_returns(env, policy, episodes=50):
    rets = []
    steps = []
    succ = 0

    for _ in range(episodes):
        s = env.reset()
        sid = env.state_id(s)
        total = 0.0

        for t in range(env.max_steps):
            a = int(policy[sid])
            s, r, done, info = env.step(a)
            total += r
            sid = env.state_id(s)
            if done:
                succ += int(s == env.goal)
                steps.append(info.get('t', t + 1))
                break
        else:
            steps.append(env.max_steps)

        rets.append(total)

    return np.array(rets), np.array(steps), succ / episodes


def q_learning_train_decay(env, episodes=900, alpha=0.5, gamma=0.95,
                           eps_start=0.35, eps_end=0.05):
    n_states = env.width * env.height
    Q = np.zeros((n_states, 4), dtype=float)
    returns = []

    for ep in range(episodes):
        eps = eps_start + (eps_end - eps_start) * (ep / max(1, episodes - 1))
        s = env.reset()
        sid = env.state_id(s)
        total = 0.0

        for _ in range(env.max_steps):
            if np.random.rand() < eps:
                a = np.random.randint(0, 4)
            else:
                a = int(np.argmax(Q[sid]))

            sp, r, done, _ = env.step(a)
            spid = env.state_id(sp)
            td_target = r + gamma * np.max(Q[spid])
            Q[sid, a] += alpha * (td_target - Q[sid, a])

            total += r
            sid = spid
            if done:
                break

        returns.append(total)

    return Q, returns


def loss_function(success_rate, mean_steps, max_steps, returns, w1=1.0, w2=0.3, w3=0.1):
    # Kombiniert Erfolgsrate, Effizienz und Varianz zu einem kompakten Vergleichswert
    return (
        w1 * (1.0 - float(success_rate)) +
        w2 * (float(mean_steps) / float(max_steps)) +
        w3 * float(np.var(returns))
    )


## Interaktiv trainieren und still ausführen
- **Ziel:** Q Learning mit veränderbaren Parametern trainieren und das Ergebnis direkt prüfen.
- **Train:** lernt eine Policy aus vielen Episoden und zeigt eine kompakte Auswertung.
- **Auswertung:** Erfolg, Schrittzahl und Return als schnelle Qualitätsindikatoren.
- **Stiller Run:** testet die gelernte Policy in genau einer Episode.
- **Optional:** finaler ASCII Render zeigt den gefundenen Weg und mögliche Umwege.

## Wie die Parameter das Lernen beeinflussen

- alpha: Lernrate. Hoch bedeutet schnelle Anpassung der Q Werte, kann aber schwanken. Niedrig bedeutet langsameres, dafür oft stabileres Lernen.
- gamma: Weitblick. Hoch bedeutet, dass zukünftige Belohnungen stark zählen. Niedrig bedeutet, dass kurzfristige Rewards dominieren.
- eps_start: Startwert für Exploration. Hoch bedeutet mehr zufälliges Ausprobieren. Niedrig bedeutet mehr Exploitation, also schnelleres Nutzen der aktuell besten Aktion.
- Nach einer Loss Suche werden Werte nur in die Regler übernommen. Drücke danach erneut Train, damit mit diesen Parametern wirklich neu gelernt wird.

In [35]:
try:
    import ipywidgets as w
except Exception as e:
    raise RuntimeError('ipywidgets ist nicht verfügbar. Installiere es mit pip install ipywidgets. Fehler: %s' % e)

train_btn = w.Button(description='Train', button_style='primary')
quick_btn = w.Button(description='Quick Compare', button_style='info')

W = w.IntSlider(value=20, min=10, max=40, step=1, description='width')
H = w.IntSlider(value=10, min=5, max=30, step=1, description='height')
D = w.FloatSlider(value=0.20, min=0.00, max=0.45, step=0.02, description='density')
S = w.IntSlider(value=3, min=0, max=99, step=1, description='seed')
SP = w.FloatSlider(value=-0.02, min=-0.30, max=0.00, step=0.01, description='step_penalty')
GR = w.FloatSlider(value=1.0, min=0.2, max=2.0, step=0.1, description='goal_reward')
MS = w.IntSlider(value=250, min=80, max=900, step=20, description='max_steps')

EP = w.IntSlider(value=600, min=200, max=1000, step=100, description='episodes')
AL = w.FloatSlider(value=0.50, min=0.05, max=1.0, step=0.05, description='alpha')
GA = w.FloatSlider(value=0.95, min=0.10, max=0.99, step=0.05, description='gamma')
EI = w.FloatSlider(value=0.35, min=0.00, max=1.0, step=0.05, description='eps_start')
EE = w.FloatSlider(value=0.05, min=0.00, max=0.20, step=0.01, description='eps_end')
SHOW_TRAIL = w.Checkbox(value=False, description='trail')

out_train = w.Output()
out_compare = w.Output()

display(w.VBox([
    w.HBox([train_btn, quick_btn]),
    w.HBox([W, H, D, S]),
    w.HBox([SP, GR, MS]),
    w.HBox([EP, AL, GA, EI, EE, SHOW_TRAIL]),
    out_train,
    out_compare,
]))

last_policy = None
last_env = None
last_train_returns = None
last_train_params = None

EVAL_EPISODES_FIXED = 40

def _build_env_from_slider():
    return make_env(
        width=int(W.value),
        height=int(H.value),
        density=float(D.value),
        seed=int(S.value),
        step_penalty=float(SP.value),
        goal_reward=float(GR.value),
        max_steps=int(MS.value),
    )

def _run_episode_silent(env, policy, show_trail=False):
    s = env.reset()
    sid = env.state_id(s)
    total = 0.0
    trail = {s}

    for t in range(env.max_steps):
        a = int(policy[sid])
        s, r, done, info = env.step(a)
        sid = env.state_id(s)
        total += r
        if show_trail:
            trail.add(s)
        if done:
            summary = {
                'steps': int(info.get('t', t + 1)),
                'return': float(total),
                'reached_goal': bool(s == env.goal),
            }
            ascii_final = env.render(trail=trail if show_trail else None)
            return summary, ascii_final

    summary = {'steps': int(env.max_steps), 'return': float(total), 'reached_goal': bool(s == env.goal)}
    ascii_final = env.render(trail=trail if show_trail else None)
    return summary, ascii_final

def _train_once(eps_start_value, gamma_value, episodes_value, eval_episodes_value):
    env_train = _build_env_from_slider()
    Q, train_returns = q_learning_train_decay(
        env_train,
        episodes=int(episodes_value),
        alpha=float(AL.value),
        gamma=float(gamma_value),
        eps_start=float(eps_start_value),
        eps_end=float(EE.value),
    )
    policy = policy_from_Q(Q)

    env_eval = _build_env_from_slider()
    rets, steps, succ = evaluate_with_returns(env_eval, policy, episodes=int(eval_episodes_value))
    metrics = {
        'success_rate': float(succ),
        'mean_steps': float(np.mean(steps)),
        'return_mean': float(np.mean(rets)),
        'var_return': float(np.var(rets)),
        'loss': float(loss_function(float(succ), float(np.mean(steps)), int(MS.value), rets)),
    }
    return policy, train_returns, metrics

def _tip(success_rate):
    if success_rate < 0.3:
        return 'Tipp: erhöhe eps_start oder episodes, oder reduziere density.'
    if success_rate <= 0.7:
        return 'Tipp: erhöhe episodes oder senke eps_start leicht, um stabiler zu werden.'
    return 'Tipp: senke eps_start, um weniger zufällig zu laufen, und prüfe mean_steps.'

def on_train(_):
    global last_policy, last_env, last_train_returns, last_train_params

    episodes_train = int(EP.value)
    episodes_eval = int(EVAL_EPISODES_FIXED)
    policy, train_returns, m = _train_once(EI.value, GA.value, episodes_train, episodes_eval)

    last_policy = policy
    last_env = _build_env_from_slider()
    last_train_returns = train_returns
    last_train_params = {
        'width': int(W.value), 'height': int(H.value), 'density': float(D.value), 'seed': int(S.value),
        'step_penalty': float(SP.value), 'goal_reward': float(GR.value), 'max_steps': int(MS.value),
    }

    env_label = f"{int(W.value)}x{int(H.value)}, d={float(D.value):.2f}, s={int(S.value)}"
    row = {
        'success_rate': m['success_rate'],
        'mean_steps': m['mean_steps'],
        'return_mean': m['return_mean'],
        'var_return': m['var_return'],
        'alpha': float(AL.value),
        'gamma': float(GA.value),
        'eps_start': float(EI.value),
        'episodes': episodes_train,
        'env': env_label,
    }

    with out_train:
        out_train.clear_output(wait=True)
        df = pd.DataFrame([row], columns=[
            'success_rate', 'mean_steps', 'return_mean', 'var_return',
            'alpha', 'gamma', 'eps_start', 'episodes', 'env'
        ])
        display(df)
        print(_tip(m['success_rate']))
        plt.figure(figsize=(4.2, 2.2))
        plt.plot(train_returns)
        plt.xlabel('ep')
        plt.ylabel('ret')
        plt.tight_layout()
        plt.show()

def on_quick_compare(_):
    eps_values = [0.05, 0.25, 0.50]
    gamma_values = [0.20, 0.60, 0.95]
    episodes_train = min(int(EP.value), 700)
    episodes_eval = min(int(EVAL_EPISODES_FIXED), 40)

    rows = []
    for e0 in eps_values:
        _, _, m = _train_once(e0, GA.value, episodes_train, episodes_eval)
        rows.append({
            'mode': 'eps_start',
            'value': e0,
            'success_rate': m['success_rate'],
            'mean_steps': m['mean_steps'],
            'loss': m['loss'],
        })

    for g0 in gamma_values:
        _, _, m = _train_once(EI.value, g0, episodes_train, episodes_eval)
        rows.append({
            'mode': 'gamma',
            'value': g0,
            'success_rate': m['success_rate'],
            'mean_steps': m['mean_steps'],
            'loss': m['loss'],
        })

    with out_compare:
        out_compare.clear_output(wait=True)
        display(pd.DataFrame(rows).sort_values('loss', ascending=True).reset_index(drop=True))

train_btn.on_click(on_train)
quick_btn.on_click(on_quick_compare)


VBox(children=(HBox(children=(Button(button_style='primary', description='Train', style=ButtonStyle()), Button…

### Interpretation des Outputs
- Der Return steigt im Training und stabilisiert sich, das spricht für eine bessere Policy mit weniger unnötigen Schritten.
- eps_start hat hier nur geringen Einfluss, weil die Aufgabe unter diesen Einstellungen bereits leicht lösbar ist.
- gamma wirkt stärker, bei zu kleinem gamma sinkt die Leistung, weil zukünftige Belohnungen zu wenig zählen.
- Der Loss fasst Erfolg, Effizienz und Varianz zusammen, kleiner ist besser.

### Return kurz erklärt
- Return ist die Summe aller Rewards innerhalb einer Episode.
- Im Trainingsplot ist der Return pro Episode über die Zeit dargestellt.
- Eine Schrittstrafe senkt den Return mit jedem zusätzlichen Schritt, auch bei Zielerreichung.
- Weniger Schritte bedeutet meist höheren Return, weil weniger Strafen anfallen und das Ziel schneller erreicht wird.

In [34]:
try:
    import ipywidgets as w
except Exception as e:
    raise RuntimeError('ipywidgets ist nicht verfügbar. Installiere es mit pip install ipywidgets. Fehler: %s' % e)

run_btn = w.Button(description='Run still', button_style='success')
show_render = w.Checkbox(value=False, description='show render')
out_run = w.Output()

display(w.VBox([w.HBox([run_btn, show_render]), out_run]))

def _run_policy_once(env, policy, render_ascii=False):
    s = env.reset()
    sid = env.state_id(s)
    total = 0.0
    trail = {s}

    for t in range(env.max_steps):
        a = int(policy[sid])
        s, r, done, info = env.step(a)
        sid = env.state_id(s)
        total += r
        trail.add(s)
        if done:
            summary = {
                'steps': int(info.get('t', t + 1)),
                'return': float(total),
                'reached_goal': bool(s == env.goal),
            }
            final_ascii = env.render(trail=trail) if render_ascii else None
            return summary, final_ascii

    summary = {'steps': int(env.max_steps), 'return': float(total), 'reached_goal': bool(s == env.goal)}
    final_ascii = env.render(trail=trail) if render_ascii else None
    return summary, final_ascii

def on_run(_):
    with out_run:
        out_run.clear_output(wait=True)

        if 'last_policy' not in globals() or last_policy is None:
            print('Bitte zuerst trainieren.')
            return

        if 'last_train_params' in globals() and last_train_params is not None:
            env = make_env(**last_train_params)
        else:
            env = make_env()

        summary, ascii_final = _run_policy_once(env, last_policy, render_ascii=bool(show_render.value))
        print('steps:', summary['steps'])
        print('return:', round(summary['return'], 3))
        print('reached_goal:', summary['reached_goal'])
        if ascii_final is not None:
            print(ascii_final)

run_btn.on_click(on_run)


VBox(children=(HBox(children=(Button(button_style='success', description='Run still', style=ButtonStyle()), Ch…

### Wie der Agent das Ziel nun optimiert erreicht
- Der Agent nutzt eine gelernte Policy, die pro Zustand eine bevorzugte Aktion vorgibt.
- Im Training werden Handlungen mit hohem langfristigem Return verstärkt, ungünstige Wege werden schrittweise entwertet.
- Q Werte bilden eine Orientierung im Zustandsraum, gute Routen haben höhere Q Werte als Sackgassen oder Umwege.
- Bei Schrittstrafen begünstigt die Policy kürzere Wege, dadurch sinken Schritte und der Return steigt.
- Im finalen Lauf dominiert Exploitation, der Agent folgt stabil einer konsistenten Route statt stark zu variieren.

## Mini Übungen

1. Vergleiche in der Baseline Tabelle greedy und greedy_memory bei hoher Dichte.
2. Setze im Training eps_start auf 0.0 und auf 0.5 und vergleiche die Erfolgsrate.
3. Reduziere gamma auf 0.2 und erhöhe gamma auf 0.95. Beschreibe den Unterschied.
4. Erhöhe die Schrittstrafe von minus 0.02 auf minus 0.08 und beobachte den Return.
5. Nutze Loss Tuning und prüfe, ob die übernommenen Werte die Metriken verbessern.

## Mini Leitfaden: Q Learning live ausprobieren (7 bis 10 Minuten)

### 1) Start: Welt und Baselines
1. Führe die Zellen bis zur Ausgabe der Baseline Agenten aus.
2. Merke dir grob: Wie oft erreicht greedy das Ziel, wie viele Schritte braucht es im Mittel.

### 2) Q Learning einmal trainieren
1. Gehe zur interaktiven Sektion und drücke Train.
2. Lies die Tabelle: success rate, mean steps und return.
3. Führe danach den stillen Run aus und prüfe: reached goal, steps und return.

### 3) Exploration verstehen (eps start)
1. Setze eps start sehr niedrig, zum Beispiel 0.05, und trainiere erneut.
2. Setze eps start höher, zum Beispiel 0.50, und trainiere erneut.
3. Vergleiche success rate und mean steps.
Merksatz: Hohe Exploration bedeutet mehr Ausprobieren. Das kann helfen, kostet aber oft mehr Schritte.

### 4) Weitblick verstehen (gamma)
1. Setze gamma niedrig, zum Beispiel 0.20, trainiere, und beobachte das Verhalten.
2. Setze gamma hoch, zum Beispiel 0.95, trainiere, und vergleiche.
Merksatz: Hoher Weitblick bewertet zukünftige Rewards stärker. Niedriger Weitblick bevorzugt kurzfristige Effekte.

### 5) Lernrate verstehen (alpha)
1. Setze alpha hoch, zum Beispiel 0.90, und trainiere.
2. Setze alpha niedriger, zum Beispiel 0.20, und trainiere.
Merksatz: Hohe Lernrate passt schnell an, kann aber instabil werden. Niedrige Lernrate ist langsamer, dafür oft stabiler.

### 6) Schwierigkeit der Welt steuern
1. Erhöhe density schrittweise, zum Beispiel von 0.15 auf 0.30.
2. Beobachte, wann greedy scheitert und wann Q Learning deutlich besser wird.
Merksatz: Wenn die Welt schwerer wird, braucht Lernen mehr Episoden und Exploration wird wichtiger.

### 7) Return richtig deuten
Wenn es eine Schrittstrafe gibt, kann der Return trotz Erfolg klein oder negativ sein.
Weniger Schritte bedeutet meist besseren Return, weil weniger Strafen gesammelt werden.

Optional:
Nutze Quick Compare, um denselben Versuch mit mehreren eps start Werten direkt zu vergleichen.
