In [194]:
# !pip install gymnasium
# !pip install "gymnasium[atari, accept-rom-license]"
# !apt-get install -y swig
# !pip install gymnasium[box2d]

In [195]:
# ===========================
# IMPORTS & GLOBAL SETUP
# ===========================
import os
import json
import csv
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import gymnasium as gym
from gymnasium.wrappers import RecordVideo
from collections import deque
from collections import defaultdict
from datetime import datetime

import torch.nn.functional as F

SEED = 42

In [196]:
import warnings
import logging

# -----------------------
# WARNINGS UNTERDR√úCKEN
# -----------------------
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Gym / MoviePy Logging leiser machen
logging.getLogger("gymnasium").setLevel(logging.ERROR)
logging.getLogger("moviepy").setLevel(logging.ERROR)

# SDL / Pygame Headless Fix (verhindert XDG_RUNTIME_DIR Meldung)
os.environ["SDL_AUDIODRIVER"] = "dummy"
os.environ["SDL_VIDEODRIVER"] = "dummy"

In [197]:
env_name = "CartPole-v1" # angepasst an Cart
env = gym.make(env_name)
state_shape = env.observation_space.shape
state_size = env.observation_space.shape[0]
number_actions = env.action_space.n
print('State shape: ', state_shape)
print('State size: ', state_size)
print('Number of actions: ', number_actions)

State shape:  (4,)
State size:  4
Number of actions:  2


In [198]:
class QNetwork(nn.Module):
    """
    Dynamisches MLP f√ºr DQN.
    Linear + ReLU nur zwischen Hidden-Layern
    """
    def __init__(self, state_dim, action_dim, layer_sizes=[64, 8, 64, 64]):
        super().__init__()
        torch.manual_seed(42)

        layers = []
        input_size = state_dim
        for i, size in enumerate(layer_sizes):
            layers.append(nn.Linear(input_size, size))
            # ReLU nur nach Hidden-Layern, nicht nach dem letzten Layer
            if i != len(layer_sizes) - 1:
                layers.append(nn.ReLU())
            input_size = size

        # Output Layer
        layers.append(nn.Linear(input_size, action_dim))

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)

In [199]:
class ReplayBuffer:
    """
    Speichert vergangene Erfahrungen:
    (state, action, reward, next_state, done)

    Warum?
    - bricht zeitliche Korrelationen
    - alte Erfahrungen k√∂nnen mehrfach gelernt werden
    """

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def add(self, experience):
        """
        F√ºgt eine Erfahrung hinzu.
        Wenn der Speicher voll ist ‚Üí die √§lteste fliegt raus.
        """
        self.memory.append(experience)
        if len(self.memory) > self.capacity:
            self.memory.pop(0)

    def sample(self, batch_size):
        """
        Zieht zuf√§llige Erfahrungen f√ºr ein Lern-Update
        """
        batch = random.sample(self.memory, batch_size)

        states      = torch.tensor(np.vstack([e[0] for e in batch]), dtype=torch.float32).to(self.device)
        actions     = torch.tensor(np.vstack([e[1] for e in batch]), dtype=torch.long).to(self.device)
        rewards     = torch.tensor(np.vstack([e[2] for e in batch]), dtype=torch.float32).to(self.device)
        next_states = torch.tensor(np.vstack([e[3] for e in batch]), dtype=torch.float32).to(self.device)
        dones       = torch.tensor(np.vstack([e[4] for e in batch]), dtype=torch.float32).to(self.device)

        return states, actions, rewards, next_states, dones


In [200]:
class DQNAgent:
    """
    DQN-Agent mit dynamischem QNetwork (beliebige Layer-Architektur)
    """

    def __init__(
        self,
        state_dim,
        action_dim,
        learning_rate,
        buffer_size,
        batch_size,
        gamma,
        tau,
        layer_info=None,  # <-- Neue Option f√ºr dynamische Architekturen
    ):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.batch_size = batch_size
        self.gamma = gamma          # Discount-Faktor
        self.tau = tau              # Soft-Update-Faktor bzw interpolation faktor
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.seed = SEED

        # Standard-Layer falls nichts angegeben
        if layer_info is None:
            layer_info = [64, 64, 64]

        # Online-Netzwerk
        self.q_network = QNetwork(state_dim, action_dim, layer_info).to(self.device)

        # Target-Netzwerk
        self.target_q_network = QNetwork(state_dim, action_dim, layer_info).to(self.device)
        self.target_q_network.load_state_dict(self.q_network.state_dict())  # initial synchronisieren

        # Optimizer
        self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=learning_rate)

        # Replay Buffer
        self.replay_buffer = ReplayBuffer(buffer_size)

        self.step_counter = 0

    def select_action(self, state, epsilon = 0.):
        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(self.device)

        self.q_network.eval()
        with torch.no_grad():
            q_values = self.q_network(state)
        self.q_network.train()

        if random.random() > epsilon:
            return q_values.argmax(dim=1).item()
        else:
            return random.randrange(self.action_dim)

    def step(self, state, action, reward, next_state, done):
        self.replay_buffer.add((state, action, reward, next_state, done))
        self.step_counter += 1

        # Nicht bei jedem Schritt lernen (stabiler)
        if self.step_counter % 4 == 0:
            if len(self.replay_buffer.memory) >= self.batch_size:
                batch = self.replay_buffer.sample(self.batch_size)
                self.learn(batch)

    def learn(self, experiences):
        states, actions, rewards, next_states, dones = experiences

        with torch.no_grad():
            next_q_values = self.target_q_network(next_states).max(dim=1, keepdim=True)[0]
            q_targets = rewards + self.gamma * next_q_values * (1 - dones)

        q_expected = self.q_network(states).gather(1, actions)

        loss = F.mse_loss(q_expected, q_targets)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # self.soft_update()   # deaktiviert            angepasst an Cart

        if self.step_counter % 100 == 0:             # angepasst an Cart
            self.target_q_network.load_state_dict(   # angepasst an Cart
                self.q_network.state_dict()          # angepasst an Cart
            )

    def soft_update(self):
        for target_param, local_param in zip(
            self.target_q_network.parameters(),
            self.q_network.parameters()
        ):
            target_param.data.copy_(
                self.tau * local_param.data +
                (1.0 - self.tau) * target_param.data
            )


In [201]:
def get_layer_info(agent_or_model):
    """
    Gibt nur die Gr√∂√üe der Linear-Layer zur√ºck, z.B. [64, 64]
    """
    if hasattr(agent_or_model, "q_network"):
        model = agent_or_model.q_network
    else:
        model = agent_or_model

    layer_sizes = []
    for layer in model.modules():
        if isinstance(layer, nn.Linear) and layer.out_features != model.model[-1].out_features:
            # Letzter Linear-Layer (Output) ignorieren
            layer_sizes.append(layer.out_features)
    return layer_sizes

# -------------------------------
# load_model_bundle: rekonstruiert Linear+ReLU Modell
# -------------------------------
def save_model_bundle(model, save_dir, state_dim, action_dim, fitness=None, generation=None):
    os.makedirs(save_dir, exist_ok=True)
    # 1Ô∏è‚É£ Weights speichern
    torch.save(model.state_dict(), os.path.join(save_dir, "model.pth"))

    # 2Ô∏è‚É£ Meta speichern
    meta = {
        "state_dim": int(state_dim),
        "action_dim": int(action_dim),
        "layer_info": get_layer_info(model),
        "fitness": float(fitness) if fitness is not None else None,
        "generation": int(generation) if generation is not None else None,
        "seed": int(SEED) if SEED is not None else None,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }

    with open(os.path.join(save_dir, "meta.json"), "w") as f:
        json.dump(meta, f, indent=4)
    print(f"‚úì Model saved to {save_dir}")

def load_model_bundle(model_dir, device="cpu"):
    # Meta laden
    with open(os.path.join(model_dir, "meta.json"), "r") as f:
        meta = json.load(f)
    state_dim = meta["state_dim"]
    action_dim = meta["action_dim"]
    layer_info = meta["layer_info"]

    # Modell rekonstruieren
    model = QNetwork(state_dim, action_dim, layer_info).to(device)

    # Gewichte laden (reihenfolgebasiert)
    state_dict_saved = torch.load(os.path.join(model_dir, "model.pth"), map_location=device)
    state_dict_model = model.state_dict()
    for k_model, k_saved in zip(state_dict_model.keys(), state_dict_saved.keys()):
        state_dict_model[k_model] = state_dict_saved[k_saved]
    model.load_state_dict(state_dict_model)

    return model, meta


def init_training_run(env_name, base_dir="training_history"):
    """
    Erstellt einen neuen Run-Ordner mit Timestamp
    """
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    run_dir = os.path.join(base_dir, f"{env_name}_{timestamp}")
    os.makedirs(run_dir, exist_ok=True)

    print(f"üìÅ Neuer Trainingslauf: {run_dir}")
    return run_dir

def init_training_log(run_dir):
    """
    Erstellt CSV-Logdatei f√ºr einen Trainingslauf
    """
    csv_path = os.path.join(run_dir, "training_log.csv")

    with open(csv_path, mode="w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f, delimiter=";")
        writer.writerow([
            "Environment",
            "Generation",
            "Model_Architecture",
            "Fitness",
            "Trained_Episodes",
        ])

    print(f"üìä Training-Log erstellt: {csv_path}")
    return csv_path
    
def append_generation_log(
    csv_path,
    env_name,
    generation,
    hidden_sizes,
    fitness,
    episodes_per_individual,
):
    with open(csv_path, mode="a", newline="", encoding="utf-8") as f:
        writer = csv.writer(f, delimiter=";")

        writer.writerow([
            env_name,
            generation,
            str(hidden_sizes),
            f"{fitness:.2f}",
            generation * episodes_per_individual,
        ])


In [202]:
class PopulationMember:
    def __init__(self, agent):
        self.agent = agent
        self.total_reward = 0.0
        self.episode_rewards = []

def select_parents(sorted_population):
    return sorted_population[0], sorted_population[1]

def check_architecture_dominance(
    architecture_win_counter,
    threshold=6,
):
    for arch, wins in architecture_win_counter.items():
        if wins >= threshold:
            return list(arch)
    return None


In [203]:
# =========================
# üöÄ MUTATION
# =========================
def mutate_architecture(
    layer_info, 
    n_mutations=1, 
    min_neurons=8, 
    max_neurons=128
):
    """
    Mutiert ein Layer-Array n_mutations Mal.
    Optionen pro Mutation:
    1Ô∏è‚É£ Dupliziere oder l√∂sche eine zuf√§llige Schicht (Layer-Anzahl begrenzt auf min_layers/max_layers)
    2Ô∏è‚É£ Neuronen *2 oder /2 (begrenzt auf min_neurons/max_neurons)
    """
    new_layers = layer_info.copy()
    for _ in range(n_mutations):
        if len(new_layers) == 0:
            # Falls alle Layer gel√∂scht wurden, mindestens 1 Layer wieder hinzuf√ºgen
            new_layers.append(min_neurons)
            continue

        idx = random.randint(0, len(new_layers)-1)
        op = random.choice(["layer_op", "scale"])

        if op == "layer_op":
            # Duplizieren oder l√∂schen, Layer-Anzahl beachten
            if len(new_layers) == 1:
                # Nur duplizieren m√∂glich
                if len(new_layers) < max_layers:
                    new_layers.insert(idx, new_layers[idx])
            else:
                if random.random() < 0.5:
                    # duplizieren (nur wenn max_layers nicht √ºberschritten)
                    if len(new_layers) < max_layers:
                        new_layers.insert(idx, new_layers[idx])
                else:
                    # l√∂schen (nur wenn min_layers nicht unterschritten)
                    if len(new_layers) > min_layers:
                        new_layers.pop(idx)
        else:
            # Neuronen *2 oder /2
            factor = random.choice([0.5, 2])
            new_layers[idx] = max(min_neurons, min(max_neurons, int(new_layers[idx] * factor)))

    return new_layers


def create_initial_population(population_size, state_dim, action_dim, min_layers=1, max_layers=10,
                              min_neurons=8, max_neurons=128):
    """
    Initialisiert Population. Layer-Gr√∂√üe als Faktor von min_neurons.
    """
    population = []
    for _ in range(population_size):
        num_layers = random.randint(min_layers, max_layers)
        layers = [random.randint(1, max_neurons // min_neurons) * min_neurons for _ in range(num_layers)]
        agent = DQNAgent(
            state_dim=state_dim,
            action_dim=action_dim,
            learning_rate=learning_rate,
            buffer_size=buffer_size,
            batch_size=batch_size,
            gamma=gamma,
            tau=tau,
            layer_info=layers
        )
        population.append(PopulationMember(agent))
    return population

def partial_load_state_dict_flexible(target, src_sd):
    tgt_sd = target.state_dict()
    for k, v in src_sd.items():
        if k in tgt_sd:
            if tgt_sd[k].shape == v.shape:
                # exakt gleiche Shape ‚Üí kopieren
                tgt_sd[k] = v
            else:
                # teilweises Kopieren f√ºr Linear Layer
                if len(v.shape) == 2 and len(tgt_sd[k].shape) == 2:
                    # M√∂gliche minimale Dimension ermitteln
                    min_rows = min(v.shape[0], tgt_sd[k].shape[0])
                    min_cols = min(v.shape[1], tgt_sd[k].shape[1])
                    tgt_sd[k][:min_rows, :min_cols] = v[:min_rows, :min_cols]
                # Bias teilweise √ºbernehmen
                elif len(v.shape) == 1 and len(tgt_sd[k].shape) == 1:
                    min_len = min(len(v), len(tgt_sd[k]))
                    tgt_sd[k][:min_len] = v[:min_len]
    target.load_state_dict(tgt_sd, strict=False)

def create_child_from_parents(parent_agent, mutation_factor=5, epsilon=1, min_mutations=1):
    n_mutations = max(min_mutations, int(mutation_factor * epsilon))
    new_layers = mutate_architecture(get_layer_info(parent_agent), n_mutations=n_mutations)
    child_agent = DQNAgent(
        state_dim=state_size,
        action_dim=number_actions,
        learning_rate=learning_rate,
        buffer_size=buffer_size,
        batch_size=batch_size,
        gamma=gamma,
        tau=tau,
        layer_info=new_layers
    )

    # Gewichte vom Elternteil √ºbernehmen, soweit m√∂glich
    partial_load_state_dict_flexible(child_agent.q_network, parent_agent.q_network.state_dict())
    
    return PopulationMember(child_agent)

def check_architecture_dominance(win_counter, threshold=6):
    for arch, count in win_counter.items():
        if count >= threshold:
            return list(arch)
    return None

def remove_duplicate_models(population):
    """
    Entfernt Population-Mitglieder mit identischer Layer-Architektur.
    Behalte nur das erste Vorkommen jeder Architektur.
    """
    seen_architectures = set()
    unique_population = []

    for member in population:
        arch_tuple = tuple(get_layer_info(member.agent))
        if arch_tuple not in seen_architectures:
            seen_architectures.add(arch_tuple)
            unique_population.append(member)

    return unique_population

In [205]:
def train_agent_for_episodes(
    env,
    agent,
    num_episodes,
    generation,
    max_steps_per_episode,
    epsilon_start,
    epsilon_end,
    epsilon_decay,
):
    """
    Trainiert einen Agenten f√ºr num_episodes Episoden.
    Œµ ist strikt lokal.
    Gibt Mean-Reward zur√ºck.
    """
    epsilon = epsilon_start
    rewards = []

    for ep in range(num_episodes):
        reset_out = env.reset(seed=generation * 10_000 + ep)
        state = reset_out[0] if isinstance(reset_out, tuple) else reset_out
        episode_reward = 0.0

        for _ in range(max_steps_per_episode):
            action = agent.select_action(state, epsilon)

            step_out = env.step(action)
            if len(step_out) == 5:
                next_state, reward, terminated, truncated, _ = step_out
                done = terminated or truncated
            else:
                next_state, reward, done, _ = step_out

            agent.step(state, action, reward, next_state, done)
            state = next_state
            episode_reward += reward

            if done:
                break

        rewards.append(episode_reward)

        # Œµ nur lokal decayn
        epsilon = max(epsilon_end, epsilon * epsilon_decay)

    return float(np.mean(rewards))

In [206]:
def run_trained_model_and_record(
    model_dir,
    env_name="CartPole-v1",
    video_dir="videos",
    max_steps=500,
    device="cpu"
):

    os.makedirs(video_dir, exist_ok=True)

    # --------
    # Modell + Metadaten laden
    # --------
    model, meta = load_model_bundle(model_dir, device=device)
    model.eval()
    print(f"Lade Modell aus '{model_dir}' mit Layer-Info: {meta['layer_info']}")

    # --------
    # ENV vorbereiten + Video-Wrapper
    # --------
    env = gym.make(env_name, render_mode="rgb_array")
    env = RecordVideo(
        env,
        video_folder=video_dir,
        episode_trigger=lambda ep: True,  # jedes Episode aufnehmen
        name_prefix="episode"
    )

    reset_out = env.reset()
    state = reset_out[0] if isinstance(reset_out, tuple) else reset_out
    total_reward = 0

    # --------
    # Episode ausf√ºhren
    # --------
    for step in range(max_steps):
        state_t = torch.FloatTensor(state).unsqueeze(0).to(device)

        with torch.no_grad():
            q_values = model(state_t)
            action = torch.argmax(q_values, dim=-1).item()

        step_out = env.step(action)
        if len(step_out) == 5:
            state, reward, terminated, truncated, _ = step_out
            done = terminated or truncated
        else:
            state, reward, done, _ = step_out

        total_reward += reward
        if done:
            break

    env.close()

    print(f"Episode L√§nge: {step+1}")
    print(f"Return: {total_reward}")
    print(f"Video gespeichert in: {video_dir}")


In [207]:
learning_rate = 1e-4  # angepasst an Cart
buffer_size = 10000   # angepasst an Cart
batch_size = 100      
gamma = 0.99
tau = 1e-3

# -----------------------------
# Evolution√§re Trainingsparameter
# -----------------------------

population_size = 4
mutation_factor = 5
thresholddom = 5
min_layers=1 
max_layers=5

max_steps_per_episode = 500  # angepasst an Cart
goal = 500                   # angepasst an Cart

epsilon_start = 1.0
epsilon_end = 0.01
epsilon_decay = 0.995
scores_window = deque(maxlen=100)

In [208]:
population = create_initial_population(population_size, state_dim=state_size, action_dim=number_actions)
architecture_win_counter = defaultdict(int)
dominant_architecture = None

# -----------------------------
# Ordner + Log initialisieren
# -----------------------------
run_dir = init_training_run(env_name)
log_csv = init_training_log(run_dir)

# -----------------------------
# Trainingsloop
# -----------------------------
generation = 0
epsilon = epsilon_start

# =========================
# üîπ Population trainieren
# =========================
while True:
    for member in population:
        mean_score = train_agent_for_episodes(
            env=env,
            agent=member.agent,
            num_episodes=batch_size,
            generation=generation,
            max_steps_per_episode=max_steps_per_episode,
            epsilon_start=epsilon,
            epsilon_end=epsilon_end,
            epsilon_decay=epsilon_decay,
        )
        member.total_reward = mean_score
        print(f"training von {get_layer_info(member.agent)} fertig")
        
    population.sort(key=lambda m: m.total_reward, reverse=True)
    print(f"\nüèÖ Population Ranking ‚Äì Generation {generation}")
    for rank, member in enumerate(population, start=1):
        print(
            f"{rank:2d}. Reward: {member.total_reward:8.2f} | "
            f"Layers: {get_layer_info(member.agent)}"
        )
        append_generation_log(
            csv_path=log_csv,
            env_name=env_name,
            generation=generation,
            hidden_sizes=get_layer_info(member.agent),
            fitness=member.total_reward,
            episodes_per_individual=batch_size
        )
    
    if population[0].total_reward >= goal:
        # üîπ Nur das beste Modell speichern
        save_dir = os.path.join(run_dir, f"checkpoint_generation_{generation}")
        save_model_bundle(
            model=population[0].agent.q_network,
            save_dir=save_dir,
            state_dim=state_size,
            action_dim=number_actions,
            fitness=population[0].total_reward,
            generation=generation
        )
        print(f"\nüéØ Goal erreicht mit {get_layer_info(population[0].agent)} in Generation {generation}")
        break
    
    # Dominanz pr√ºfen und +
    architecture_win_counter[tuple(get_layer_info(population[0].agent))] += 1
    dominant_architecture = check_architecture_dominance(architecture_win_counter, thresholddom)
    
    # üîπ Neue Population erzeugen
    new_population = []
    if dominant_architecture:
        new_population = [population[0]]  # nur dominant weitertrainieren
    else:
        new_population = [population[0], population[1]]
        remaining_slots = population_size - 2
        n_children_p1 = int(remaining_slots * 0.5)
        n_children_p2 = remaining_slots - n_children_p1

        print(f"Kinder haben {max(1, int(mutation_factor * epsilon))} mutationen")
    
        for _ in range(n_children_p1):
            child = create_child_from_parents(population[0].agent, mutation_factor, epsilon)
            new_population.append(child)
        for _ in range(n_children_p2):
            child = create_child_from_parents(population[1].agent, mutation_factor, epsilon)
            new_population.append(child)

    # üîπ Doppelte Modelle entfernen
    new_population = remove_duplicate_models(new_population)
    population = new_population
    
    # üîπ Epsilon decay
    epsilon = max(epsilon_end, epsilon * (epsilon_decay ** batch_size))
    generation += 1

üìÅ Neuer Trainingslauf: training_history/CartPole-v1_2026-01-10_15-26-15
üìä Training-Log erstellt: training_history/CartPole-v1_2026-01-10_15-26-15/training_log.csv
training von [72, 104, 8] fertig
training von [120, 40, 64, 56, 16, 96, 8, 120, 40, 56] fertig
training von [48, 104, 128, 8, 80, 80, 128, 24, 88, 16] fertig
training von [16, 32, 48, 88, 120, 120, 120] fertig

üèÖ Population Ranking ‚Äì Generation 0
 1. Reward:    21.78 | Layers: [72, 104, 8]
 2. Reward:    20.74 | Layers: [16, 32, 48, 88, 120, 120, 120]
 3. Reward:    18.43 | Layers: [120, 40, 64, 56, 16, 96, 8, 120, 40, 56]
 4. Reward:    17.74 | Layers: [48, 104, 128, 8, 80, 80, 128, 24, 88, 16]
Kinder haben 5 mutationen
‚úì Transfer: 4 Schichten interpoliert
‚úì Transfer: 8 Schichten interpoliert
training von [72, 104, 8] fertig
training von [16, 32, 48, 88, 120, 120, 120] fertig
training von [104, 104, 8] fertig
training von [16, 32, 24, 128, 120, 120, 120] fertig

üèÖ Population Ranking ‚Äì Generation 1
 1. Rew

KeyboardInterrupt: 

In [None]:
# Angenommen, das Modell wurde gespeichert in:
# model_dir = "training_history/CartPole-v1_2026-01-10_13-21-48/checkpoint_generation_30"

# Video ausf√ºhren und speichern
run_trained_model_and_record(
    model_dir=model_dir,
    env_name=env_name,                 # aus deinem Setup
    video_dir="videos",                # Ordner f√ºr Video
    max_steps=max_steps_per_episode,   # aus deinem Setup
    device="cpu"                       # falls GPU nicht verf√ºgbar
)

In [209]:
# !rm -rf checkpoints/
# !rm -rf training_history/
# !rm -rf videos/