# NAS für MedMNIST `CNN`
Dieses Notebook:
1. Erstellt Datenlader für MedMNIST (`pathmnist` oder `tissuemnist`)  
2. Definiert ein CNN-Modell  
3. Implementiert epochweise `train`- und `eval`-Schleifen  
4. Führt eine neuronale Architektursuche mit **Optuna** durch  
5. ~~Trainiert die beste Konfiguration neu und speichert Logs und das Modell~~
6. ~~Zeichnet **Verlust/Genauigkeit** pro Epoche und eine **Confusion Matrix** auf~~

In [1]:
#!pip install optuna torchmetrics 
import os, json, time, random
import numpy as np
import torch
import tqdm as notebook_tqdm
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as T
from torch.utils.data import DataLoader, random_split, Subset
import seaborn as sns
from torchmetrics.classification import ConfusionMatrix
from sklearn.metrics import (
    accuracy_score, precision_recall_fscore_support,
    classification_report, confusion_matrix
)
from sklearn.model_selection import KFold
from medmnist import INFO, TissueMNIST, PathMNIST
import optuna
from optuna.pruners import MedianPruner
from optuna.samplers import RandomSampler, TPESampler, CmaEsSampler 

import matplotlib.pyplot as plt

def set_seeds(seed: int = 42) -> None:
    """Seed the Python, NumPy and PyTorch RNGs and request deterministic cuDNN.

    Args:
        seed: Value handed to every random number generator.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Benchmark mode lets cuDNN auto-tune which convolution algorithm it uses,
    # which works against run-to-run reproducibility; keep it off when the
    # deterministic flag is requested.
    torch.backends.cudnn.benchmark = False

# This notebook refuses to run without a CUDA device: fail fast instead of
# silently falling back to the CPU.
if not torch.cuda.is_available():
    raise RuntimeError("no cuda gpu found")
device = torch.device("cuda")
print("Device:", device)

Device: cuda


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# CONFIG
CFG = dict(
    # Data pipeline settings (forwarded to build_loaders)
    dataset="pathmnist",
    img_size=64,
    batch=64,
    seed=42,
    device=device,

    # Optuna search
    trials=40,           # number of trials per study
    # NOTE(review): "epochs_trial" and "retrain_epochs" are not read by any code
    # visible in this part of the notebook; the objective looks up an optional
    # "cv_epochs" key instead (default 5) - confirm which one is intended.
    epochs_trial=8,
    retrain_epochs=20,
)

# Root directory for all artefacts, then make the run reproducible.
os.makedirs("outputs", exist_ok=True)
set_seeds(CFG["seed"])

In [3]:
def build_loaders(dataset_name: str, img_size: int, batch: int, seed: int):
    """Create train/val/test DataLoaders for a supported MedMNIST dataset.

    The official "train" split is divided 90/10 into a training and a
    validation subset using a generator seeded with ``seed``. Training samples
    are augmented (random horizontal flip, random rotation); validation and test
    samples only get the deterministic resize/normalise pipeline.

    Args:
        dataset_name: ``"pathmnist"`` or ``"tissuemnist"``.
        img_size: Side length the images are resized to.
        batch: Batch size used by all three loaders.
        seed: Seed for the train/validation split.

    Returns:
        Tuple ``(train_loader, val_loader, test_loader, n_channels, n_classes,
        class_names, train_set)`` where ``class_names`` are the stringified
        class indices and ``train_set`` is the (augmented) training subset.

    Raises:
        ValueError: If ``dataset_name`` is not one of the supported datasets.
    """
    dataset_classes = {"pathmnist": PathMNIST, "tissuemnist": TissueMNIST}
    if dataset_name not in dataset_classes:
        # Previously every unknown name silently fell through to TissueMNIST.
        raise ValueError(
            f"Unsupported dataset {dataset_name!r}; expected one of {sorted(dataset_classes)}"
        )
    DataSet = dataset_classes[dataset_name]

    info = INFO[dataset_name]
    n_classes = len(info["label"])
    class_names = [str(k) for k in range(n_classes)]
    n_channels = info["n_channels"]
    as_rgb = (n_channels == 3)

    mean = [0.5] * n_channels
    std = [0.5] * n_channels

    train_tf = T.Compose([
        T.Resize((img_size, img_size)),
        T.RandomHorizontalFlip(p=0.5),
        T.RandomRotation(10),
        T.ToTensor(),
        T.Normalize(mean, std),
    ])
    test_tf = T.Compose([
        T.Resize((img_size, img_size)),
        T.ToTensor(),
        T.Normalize(mean, std),
    ])

    train_full = DataSet(split="train", download=True, transform=train_tf, as_rgb=as_rgb)
    # Second view of the same split without augmentation. A Subset inherits the
    # transform of its parent dataset, so the validation samples must come from
    # this view to be evaluated deterministically (costs one extra in-memory
    # copy of the training split).
    train_eval_view = DataSet(split="train", download=True, transform=test_tf, as_rgb=as_rgb)
    test_set = DataSet(split="test", download=True, transform=test_tf, as_rgb=as_rgb)

    g = torch.Generator().manual_seed(seed)
    val_size = int(0.1 * len(train_full))
    train_set, val_split = random_split(
        train_full, [len(train_full) - val_size, val_size], generator=g
    )
    # Same indices as the random split, but read through the un-augmented view.
    val_set = Subset(train_eval_view, val_split.indices)

    train_loader = DataLoader(train_set, batch_size=batch, shuffle=True,  num_workers=0, pin_memory=True)
    val_loader   = DataLoader(val_set,   batch_size=batch, shuffle=False, num_workers=0, pin_memory=True)
    test_loader  = DataLoader(test_set,  batch_size=batch, shuffle=False, num_workers=0, pin_memory=True)

    return train_loader, val_loader, test_loader, n_channels, n_classes, class_names, train_set

In [4]:
class DynamicCNN(nn.Module):
    """CNN whose depth and widths are given at construction time.

    The feature extractor (``self.conv``) is a sequence of
    Conv2d -> BatchNorm2d -> ReLU stages, with a 2x2 pooling layer inserted
    after every ``pool_every``-th stage. The classifier (``self.head``) applies
    global average pooling, flattens, runs an optional stack of
    Linear -> ReLU -> Dropout layers and ends in a Linear layer producing
    ``n_classes`` logits.

    Args:
        in_ch: Number of input image channels.
        conv_channels: Output channels of each convolutional stage, in order.
        linear_units: Widths of the hidden fully connected layers, in order.
        kernel_size: Convolution kernel size; padding is ``kernel_size // 2``.
        pool_type: ``"max"`` selects max pooling, anything else average pooling.
        pool_every: Pool after every this many convolutional stages.
        dropout: Dropout probability in the hidden fully connected layers.
        n_classes: Number of output logits.
        img_size: Input side length; only used to track the spatial size so
            that pooling stops once the tracked side is no larger than 2.
    """

    def __init__(
        self,
        in_ch: int,
        conv_channels: list[int],
        linear_units: list[int],
        kernel_size: int,
        pool_type: str,
        pool_every: int,
        dropout: float,
        n_classes: int,
        img_size: int,
    ):
        super().__init__()
        pool_cls = nn.MaxPool2d if pool_type == "max" else nn.AvgPool2d
        padding = kernel_size // 2

        # --- convolutional feature extractor ---
        # widths[i] -> widths[i + 1] is the channel change of stage i.
        widths = [in_ch, *conv_channels]
        side = img_size
        feature_layers = []
        for stage, (c_in, c_out) in enumerate(zip(widths, widths[1:]), start=1):
            feature_layers += [
                nn.Conv2d(c_in, c_out, kernel_size, padding=padding, bias=False),
                nn.BatchNorm2d(c_out),
                nn.ReLU(inplace=True),
            ]
            # Halve the resolution after every pool_every-th stage, but never
            # once the tracked side length has shrunk to 2 or below.
            if stage % pool_every == 0 and side > 2:
                feature_layers.append(pool_cls(2))
                side //= 2
        self.conv = nn.Sequential(*feature_layers)

        # --- fully connected classifier ---
        # Global average pooling makes the head input width equal to the
        # channel count of the last stage (or in_ch if there are no stages).
        dims = [widths[-1], *linear_units]
        head_layers = [nn.AdaptiveAvgPool2d(1), nn.Flatten()]
        for d_in, d_out in zip(dims, dims[1:]):
            head_layers += [
                nn.Linear(d_in, d_out),
                nn.ReLU(inplace=True),
                nn.Dropout(dropout),
            ]
        head_layers.append(nn.Linear(dims[-1], n_classes))
        self.head = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        return self.head(self.conv(x))


In [5]:
# train/eval
def train_epoch(model, loader, loss_fn, opt, device):
    """Train ``model`` for one pass over ``loader``.

    Args:
        model: Network to optimise; switched to training mode.
        loader: Iterable of ``(X, y)`` batches. ``y`` may be shaped ``(B,)`` or
            ``(B, 1)``.
        loss_fn: Callable ``loss_fn(logits, targets)`` returning a batch-mean loss.
        opt: Optimiser stepping the parameters of ``model``.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(mean_loss, accuracy_percent)`` averaged over all samples seen.
    """
    model.train()
    total_loss, total_correct, total = 0.0, 0, 0
    for X, y in loader:
        # Flatten to (B,). A plain squeeze() would turn a single-sample batch
        # into a 0-d tensor, which no longer matches the (B, C) logits.
        y = y.reshape(-1).long()
        X, y = X.to(device), y.to(device)
        opt.zero_grad(set_to_none=True)
        logits = model(X)
        loss = loss_fn(logits, y)
        loss.backward()
        opt.step()
        bs = X.size(0)
        # loss is a batch mean; weight by batch size for a per-sample average.
        total_loss += loss.item() * bs
        total_correct += (logits.argmax(1) == y).sum().item()
        total += bs
    return total_loss / total, 100.0 * total_correct / total

@torch.no_grad()
def eval_epoch(model, loader, loss_fn, device):
    """Evaluate ``model`` on ``loader`` without tracking gradients.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        loader: Iterable of ``(X, y)`` batches. ``y`` may be shaped ``(B,)`` or
            ``(B, 1)``.
        loss_fn: Callable ``loss_fn(logits, targets)`` returning a batch-mean loss.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(mean_loss, accuracy_percent)`` averaged over all samples seen.
    """
    model.eval()
    total_loss, total_correct, total = 0.0, 0, 0
    for X, y in loader:
        # Flatten to (B,). A plain squeeze() would turn a single-sample batch
        # into a 0-d tensor, which no longer matches the (B, C) logits.
        y = y.reshape(-1).long()
        X, y = X.to(device), y.to(device)
        logits = model(X)
        loss = loss_fn(logits, y)
        bs = X.size(0)
        # loss is a batch mean; weight by batch size for a per-sample average.
        total_loss += loss.item() * bs
        total_correct += (logits.argmax(1) == y).sum().item()
        total += bs
    return total_loss / total, 100.0 * total_correct / total

In [6]:
# Build the data pipeline once and keep the training subset for cross-validation.
(
    train_loader, val_loader, test_loader,
    IN_CH, N_CLASSES, CLASS_NAMES, train_set,
) = build_loaders(
    dataset_name=CFG["dataset"],
    img_size=CFG["img_size"],
    batch=CFG["batch"],
    seed=CFG["seed"],
)

# Shuffled K-fold split over positions in train_set, fixed by the config seed.
n_splits = 3
indices = np.arange(len(train_set))
kf = KFold(n_splits=n_splits, shuffle=True, random_state=CFG["seed"])
folds = list(kf.split(indices))   # list of (train_idx, val_idx) pairs

print(f"IN_CH: {IN_CH}")
print(f"N_CLASSES: {N_CLASSES}")
print(f"CLASS_NAMES: {CLASS_NAMES}")

IN_CH: 3
N_CLASSES: 9
CLASS_NAMES: ['0', '1', '2', '3', '4', '5', '6', '7', '8']


In [7]:
def make_objective(train_set, folds, cfg):
    """Build an Optuna objective that scores a sampled ``DynamicCNN`` by K-fold CV.

    For every trial an architecture and learning rate are sampled, a fresh model
    is trained on each fold for ``cfg.get("cv_epochs", 5)`` epochs, and the mean
    validation loss plus a small complexity penalty is returned (lower is better).

    Args:
        train_set: Dataset that the fold indices refer to. Both the training and
            the validation part of each fold are ``Subset`` views of it, so
            validation samples pass through whatever transform it applies.
        folds: Sequence of ``(train_idx, val_idx)`` index pairs.
        cfg: Configuration mapping; reads ``"dataset"``, ``"device"``,
            ``"batch"``, ``"img_size"`` and the optional ``"cv_epochs"``.

    Returns:
        A callable ``objective(trial) -> float`` suitable for ``study.optimize``.
    """
    # Take the model dimensions from the dataset metadata rather than from
    # notebook-level globals, so the objective only depends on its arguments.
    info = INFO[cfg["dataset"]]
    in_ch = info["n_channels"]
    n_classes = len(info["label"])

    def objective(trial):
        device = cfg["device"]

        # --- search space (suggestion order is kept stable for seeded samplers) ---
        n_conv_layers = trial.suggest_int("n_conv_layers", 8, 14)
        n_linear_layers = trial.suggest_int("n_linear_layers", 1, 2)

        conv_channels = [
            trial.suggest_categorical(f"conv_ch_{i}", [16, 32, 64, 128, 256])
            for i in range(n_conv_layers)
        ]
        linear_units = [
            trial.suggest_categorical(f"linear_units_{j}", [32, 64, 128, 256])
            for j in range(n_linear_layers)
        ]

        kernel_size = trial.suggest_categorical("kernel_size", [3, 5])
        pool_every = trial.suggest_categorical("pool_every", [2, 4])
        pool_type = trial.suggest_categorical("pool_type", ["max", "avg"])
        dropout = trial.suggest_float("dropout", 0.3, 0.5)

        lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)

        # --- complexity penalty: summed layer widths, linear layers count half ---
        complexity = sum(conv_channels) + 0.5 * sum(linear_units)
        complexity_norm = complexity / 1000.0
        alpha = 1e-3
        penalty = alpha * complexity_norm

        loss_fn = torch.nn.CrossEntropyLoss()
        # NOTE(review): only "cv_epochs" is consulted here; an "epochs_trial"
        # entry in cfg has no effect on the search - confirm this is intended.
        cv_epochs = cfg.get("cv_epochs", 5)
        batch = cfg["batch"]

        cv_losses, cv_accs = [], []
        last_fold = len(folds) - 1

        for fold_idx, (train_idx, val_idx) in enumerate(folds):
            # Per-fold subsets and loaders
            fold_train = Subset(train_set, train_idx)
            fold_val = Subset(train_set, val_idx)

            train_loader = DataLoader(fold_train, batch_size=batch, shuffle=True,
                                      num_workers=0, pin_memory=True)
            val_loader = DataLoader(fold_val, batch_size=batch, shuffle=False,
                                    num_workers=0, pin_memory=True)

            # Re-initialise the model (and optimiser) for every fold
            model = DynamicCNN(
                in_ch=in_ch,
                conv_channels=conv_channels,
                linear_units=linear_units,
                kernel_size=kernel_size,
                pool_type=pool_type,
                pool_every=pool_every,
                dropout=dropout,
                n_classes=n_classes,
                img_size=cfg["img_size"],
            ).to(device)

            optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)

            for _ in range(cv_epochs):
                train_epoch(model, train_loader, loss_fn, optimizer, device)

            val_loss, val_acc = eval_epoch(model, val_loader, loss_fn, device)
            cv_losses.append(val_loss)
            cv_accs.append(val_acc)

            # Report the running objective after each fold so that a pruner
            # attached to the study actually receives intermediate values.
            # Without this call the pruner never has anything to act on.
            trial.report(float(np.mean(cv_losses)) + penalty, step=fold_idx)
            # Pruning after the final fold would only discard a finished result.
            if fold_idx < last_fold and trial.should_prune():
                raise optuna.TrialPruned()

        mean_cv_loss = float(np.mean(cv_losses))

        # Objective: loss + complexity penalty
        objective_value = mean_cv_loss + penalty

        trial.set_user_attr("mean_cv_loss", mean_cv_loss)
        trial.set_user_attr("mean_cv_acc", float(np.mean(cv_accs)))
        trial.set_user_attr("complexity", complexity)

        return objective_value

    return objective

In [8]:
# Pick the first unused "outputs/nas_<dataset>_v<N>" directory so earlier runs
# are never overwritten.
base_folder = os.path.join("outputs", f"nas_{CFG['dataset']}")
version = 1
while os.path.exists(f"{base_folder}_v{version}"):
    version += 1
output_folder = f"{base_folder}_v{version}"

os.makedirs(output_folder, exist_ok=True)
print("Created:", output_folder)

# All studies of this run share one SQLite database inside the run folder.
db_path = os.path.join(output_folder, "study.db")
STORAGE_URL = "sqlite:///" + db_path
print("Using storage:", STORAGE_URL)


def make_folder(name):
    """Create the sub-directory ``name`` inside ``output_folder`` and return its path.

    An already existing directory is reused without error.
    """
    target = os.path.join(output_folder, name)
    os.makedirs(target, exist_ok=True)
    return target

def _run_study(label, sampler):
    """Create, run and export one Optuna study for the given sampler.

    The study is named ``"<label>_<dataset>"``, stored in ``STORAGE_URL`` and
    minimises the shared ``objective`` for ``CFG["trials"]`` trials. Afterwards
    ``trials.csv`` and ``best_params.json`` are written to the sub-folder
    ``label`` of the run directory.

    Args:
        label: Short sampler name; used for the study name and the folder name.
        sampler: Optuna sampler instance driving the search.

    Returns:
        Tuple ``(study, folder, trials_df)``.
    """
    folder = make_folder(label)
    study_name = f"{label}_{CFG['dataset']}"

    study = optuna.create_study(
        study_name=study_name,
        storage=STORAGE_URL,
        load_if_exists=False,
        direction="minimize",
        sampler=sampler,
        pruner=MedianPruner(n_warmup_steps=3),
    )

    print(f"Running {study_name} Optuna study...")
    study.optimize(objective, n_trials=CFG["trials"], gc_after_trial=True)
    print("Best value:", study.best_value)
    print("Best params:", study.best_params)

    trials_df = study.trials_dataframe()
    trials_df.to_csv(os.path.join(folder, "trials.csv"), index=False)
    with open(os.path.join(folder, "best_params.json"), "w", encoding="utf-8") as f:
        json.dump(study.best_params, f, indent=2)

    return study, folder, trials_df


# One objective shared by all studies so the samplers are compared on equal terms.
objective = make_objective(train_set, folds, CFG)

study_random, random_folder, df = _run_study(
    "Random",
    RandomSampler(seed=CFG["seed"]),
)

study_tpe, tpe_folder, df = _run_study(
    "TPE",
    TPESampler(
        seed=CFG["seed"],
        multivariate=True,
        group=True,
    ),
)


# cma_folder = make_folder("CMA")

# study_cma = optuna.create_study(
#     study_name=f"CMA_{CFG['dataset']}",
#     storage=STORAGE_URL,
#     load_if_exists=False,
#     direction="minimize",
#     sampler=CmaEsSampler(
#         seed=CFG["seed"],
#         sigma0=0.5
#     ),
#     pruner=MedianPruner(n_warmup_steps=3)
# )

# print(f"Running CMA_{CFG['dataset']} Optuna study...")
# study_cma.optimize(objective, n_trials=CFG["trials"], gc_after_trial=True)
# print("Best value:", study_cma.best_value)
# print("Best params:", study_cma.best_params)

# df = study_cma.trials_dataframe()
# df.to_csv(os.path.join(cma_folder, "trials.csv"), index=False)
# with open(os.path.join(cma_folder, "best_params.json"), "w") as f:
#     json.dump(study_cma.best_params, f, indent=2)

# # Trials als Tabelle speichern
# for name, study in [
#     ("Random", study_random),
#     ("TPE", study_tpe),
#     ("CMA", study_cma),
# ]:
#     df = study.trials_dataframe(attrs=("number", "value", "params", "state", "user_attrs"))
#     csv_path = f"outputs/{STUDY_PREFIX}_{name}_trials.csv"
#     json_path = f"outputs/{STUDY_PREFIX}_{name}_best_params.json"

#     df.to_csv(csv_path, index=False)
#     with open(json_path, "w") as f:
#         json.dump(study.best_params, f, indent=2)

#     print(f"Saved {name} trials to {csv_path}")
#     print(f"Saved {name} best params to {json_path}")


Created: outputs\nas_pathmnist_v2
Using storage: sqlite:///outputs\nas_pathmnist_v2\study.db


[I 2025-12-08 01:31:50,168] A new study created in RDB with name: Random_pathmnist


Running Random_pathmnist Optuna study...


[I 2025-12-08 01:37:59,632] Trial 0 finished with value: 0.3706224055968279 and parameters: {'n_conv_layers': 10, 'n_linear_layers': 2, 'conv_ch_0': 16, 'conv_ch_1': 256, 'conv_ch_2': 16, 'conv_ch_3': 128, 'conv_ch_4': 128, 'conv_ch_5': 128, 'conv_ch_6': 64, 'conv_ch_7': 32, 'conv_ch_8': 32, 'conv_ch_9': 128, 'linear_units_0': 32, 'linear_units_1': 256, 'kernel_size': 3, 'pool_every': 2, 'pool_type': 'avg', 'dropout': 0.32818484499495254, 'lr': 0.0040215545266902904}. Best is trial 0 with value: 0.3706224055968279.
[I 2025-12-08 01:46:08,860] Trial 1 finished with value: 0.3740296938997446 and parameters: {'n_conv_layers': 8, 'n_linear_layers': 2, 'conv_ch_0': 128, 'conv_ch_1': 32, 'conv_ch_2': 16, 'conv_ch_3': 128, 'conv_ch_4': 256, 'conv_ch_5': 32, 'conv_ch_6': 256, 'conv_ch_7': 64, 'linear_units_0': 128, 'linear_units_1': 64, 'kernel_size': 3, 'pool_every': 4, 'pool_type': 'max', 'dropout': 0.34558703250838835, 'lr': 0.0007148510793512986}. Best is trial 0 with value: 0.370622405596

Best value: 0.19244217392914287
Best params: {'n_conv_layers': 11, 'n_linear_layers': 1, 'conv_ch_0': 256, 'conv_ch_1': 32, 'conv_ch_2': 32, 'conv_ch_3': 64, 'conv_ch_4': 256, 'conv_ch_5': 128, 'conv_ch_6': 32, 'conv_ch_7': 64, 'conv_ch_8': 64, 'conv_ch_9': 32, 'conv_ch_10': 256, 'linear_units_0': 256, 'kernel_size': 3, 'pool_every': 4, 'pool_type': 'max', 'dropout': 0.3130409182520984, 'lr': 0.0008469597143996557}
Running TPE_pathmnist Optuna study...


[I 2025-12-08 07:06:16,164] Trial 0 finished with value: 0.3926846926991684 and parameters: {'n_conv_layers': 10, 'n_linear_layers': 2, 'conv_ch_0': 16, 'conv_ch_1': 256, 'conv_ch_2': 16, 'conv_ch_3': 128, 'conv_ch_4': 128, 'conv_ch_5': 128, 'conv_ch_6': 64, 'conv_ch_7': 32, 'conv_ch_8': 32, 'conv_ch_9': 128, 'linear_units_0': 32, 'linear_units_1': 256, 'kernel_size': 3, 'pool_every': 2, 'pool_type': 'avg', 'dropout': 0.32818484499495254, 'lr': 0.0040215545266902904}. Best is trial 0 with value: 0.3926846926991684.
[I 2025-12-08 07:14:15,110] Trial 1 finished with value: 0.3619459156654051 and parameters: {'n_conv_layers': 8, 'n_linear_layers': 2, 'conv_ch_0': 128, 'conv_ch_1': 32, 'conv_ch_2': 16, 'conv_ch_3': 128, 'conv_ch_4': 256, 'conv_ch_5': 32, 'conv_ch_6': 256, 'conv_ch_7': 64, 'linear_units_0': 128, 'linear_units_1': 64, 'kernel_size': 3, 'pool_every': 4, 'pool_type': 'max', 'dropout': 0.34558703250838835, 'lr': 0.0007148510793512986}. Best is trial 1 with value: 0.361945915665

Best value: 0.1724977705352536
Best params: {'n_conv_layers': 8, 'n_linear_layers': 1, 'conv_ch_0': 256, 'conv_ch_1': 16, 'conv_ch_2': 256, 'conv_ch_3': 128, 'conv_ch_4': 32, 'conv_ch_5': 128, 'conv_ch_6': 64, 'conv_ch_7': 64, 'linear_units_0': 128, 'kernel_size': 3, 'pool_every': 2, 'pool_type': 'max', 'dropout': 0.3986671360118502, 'lr': 0.0037019571035174586}
