<img src="Bilder/ost_logo.png" width="240"  align="right"/>
<div style="text-align: left"> <b> Applied Neural Networks | FS 2025 </b><br>
<a href="mailto:christoph.wuersch@ost.ch"> © Christoph Würsch, François Chollet </a> </div>
<a href="https://www.ost.ch/de/forschung-und-dienstleistungen/technik-neu/systemtechnik/ice-institut-fuer-computational-engineering"> Eastern Switzerland University of Applied Sciences OST | ICE </a>

[![Run in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/ChristophWuersch/AppliedNeuralNetworks/blob/main/U07/ANN07_VGG16_SOLUTION_pl.ipynb)

In [None]:
# für Ausführung auf Google Colab auskommentieren und installieren
!pip install -q -r https://raw.githubusercontent.com/ChristophWuersch/AppliedNeuralNetworks/main/requirements.txt

In [None]:
import os
import shutil
import urllib.request
import zipfile
import numpy as np
import matplotlib.pyplot as plt
import itertools
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import confusion_matrix
from PIL import Image
import tarfile

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms, datasets, models


import lightning as L
from lightning.pytorch.loggers import TensorBoardLogger


## (a) Knifey-Spoony-Datensatz herunterladen und vorbereiten

In diesem Abschnitt laden wir den Knifey-Spoony-Datensatz (22 MB) herunter, falls dieser noch nicht lokal vorhanden ist.
Anschliessend extrahieren wir das Archiv und kopieren die Bilder in eine Ordnerstruktur, die von PyTorchs `ImageFolder`-Klasse erwartet wird.

Dabei wird pro Klasse (z. B. fork, knife, spoon) ein eigener Unterordner angelegt und ca. 70 % der Bilder werden
für das Training und ca. 30 % für den Test genutzt. So erhält man eine saubere Trennung zwischen Trainings- und Testdaten.

In [None]:
def maybe_download_and_extract(
    dataset_url="https://github.com/Hvass-Labs/knifey-spoony/raw/master/knifey-spoony.tar.gz",
    dest_dir="knifey_spoony",
):
    """
    Lädt den Knifey-Spoony-Datensatz herunter und extrahiert ihn,
    falls das Zielverzeichnis noch nicht existiert.

    Parameter:
      - dataset_url: URL zum Herunterladen des Tar-Archivs.
      - dest_dir: Zielordner, in dem der Datensatz gespeichert und extrahiert wird.
    """
    if not os.path.exists(dest_dir):
        os.makedirs(dest_dir, exist_ok=True)
        tar_path = os.path.join(dest_dir, "knifey-spoony.tar.gz")
        print("Lade Knifey-Spoony-Datensatz herunter ...")
        urllib.request.urlretrieve(dataset_url, tar_path)
        print("Extrahiere den Datensatz ...")
        with tarfile.open(tar_path, "r:gz") as tar_ref:
            tar_ref.extractall(dest_dir)
        # Lösche das heruntergeladene Archiv, um Speicherplatz zu sparen
        os.remove(tar_path)
        print("Datensatz heruntergeladen und extrahiert.")
    else:
        print("Knifey-Spoony-Datensatz ist bereits vorhanden.")


def copy_files(
    src_dir="knifey_spoony",
    train_dest="knifey/train",
    test_dest="knifey/test",
    train_ratio=0.7,
):
    """
    Kopiert die Bilder aus dem extrahierten Knifey-Spoony-Datensatz in eine Ordnerstruktur,
    die von der PyTorch-Klasse ImageFolder erwartet wird.

    Für jede Klasse wird ein Unterordner in den Zielverzeichnissen (train und test) erstellt.
    Der Parameter train_ratio gibt an, wieviel Anteil der Bilder ins Training übernommen wird.

    Parameter:
      - src_dir: Quellordner mit den extrahierten Bilddateien, sortiert nach Klassen.
      - train_dest: Zielordner für Trainingsbilder.
      - test_dest: Zielordner für Testbilder.
      - train_ratio: Anteil der Bilder, der fürs Training verwendet wird (z. B. 0.7 = 70%).
    """
    # Erstelle die Zielverzeichnisse, falls sie noch nicht existieren
    os.makedirs(train_dest, exist_ok=True)
    os.makedirs(test_dest, exist_ok=True)

    # Durchlaufe alle Unterordner (Klassen) im Quellordner
    for cls in os.listdir(src_dir):
        cls_path = os.path.join(src_dir, cls)
        if os.path.isdir(cls_path):
            # Erstelle für jede Klasse entsprechende Unterordner in den Trainings- und Testverzeichnissen
            train_cls_dir = os.path.join(train_dest, cls)
            test_cls_dir = os.path.join(test_dest, cls)
            os.makedirs(train_cls_dir, exist_ok=True)
            os.makedirs(test_cls_dir, exist_ok=True)

            # Erstelle eine Liste aller Bilddateien (nur jpg, jpeg, png) in diesem Klassenordner
            images = [
                f
                for f in os.listdir(cls_path)
                if f.lower().endswith((".jpg", ".jpeg", ".png"))
            ]
            images = sorted(images)  # Sortieren für Reproduzierbarkeit
            np.random.shuffle(
                images
            )  # Zufällige Reihenfolge zur Aufteilung in Training und Test
            split_idx = int(len(images) * train_ratio)
            train_images = images[:split_idx]
            test_images = images[split_idx:]

            # Kopiere die Trainingsbilder in den entsprechenden Ordner
            for img in train_images:
                src_file = os.path.join(cls_path, img)
                dest_file = os.path.join(train_cls_dir, img)
                if not os.path.exists(dest_file):
                    shutil.copy(src_file, dest_file)
            # Kopiere die Testbilder in den entsprechenden Ordner
            for img in test_images:
                src_file = os.path.join(cls_path, img)
                dest_file = os.path.join(test_cls_dir, img)
                if not os.path.exists(dest_file):
                    shutil.copy(src_file, dest_file)
            print(
                f"Kopierte {len(train_images)} Trainingsbilder und {len(test_images)} Testbilder für Klasse '{cls}'."
            )
        else:
            print(f"'{cls_path}' ist kein Verzeichnis und wird übersprungen.")

    print("Bilder in Trainings- und Testordner kopiert.")


# Führe den Download und die Extraktion aus (sofern noch nicht geschehen)
maybe_download_and_extract()

# Kopiere die Bilder in eine klare Train/Test-Ordnerstruktur
copy_files()

# Setze die Pfade für die DataLoader
train_dir = "knifey/train"
test_dir = "knifey/test"

print("Trainingsverzeichnis:", train_dir)
print("Testverzeichnis:", test_dir)


## (b-c) Datenvorverarbeitung und DataLoader

In diesem Schritt definieren wir zwei Transformations-Pipelines:

1. **Training-Transformationen:**
   - Grössenanpassung (Resize) auf 224x224 Pixel.
   - RandomAffine: Wendet zufällige Rotationen, Translationen, Scherungen und Skalierungen an.
   - Zufällige horizontale und vertikale Spiegelung (Flip) zur Erhöhung der Variabilität.
   - Umwandlung in Tensoren und Normalisierung (gemäß ImageNet-Standards).

2. **Test-Transformationen:**
   - Nur Grössenanpassung, Umwandlung in Tensoren und Normalisierung – keine Augmentierung.

Anschließend erstellen wir über ImageFolder die Trainings- und Test-Datasets
und verpacken sie in DataLoader für die spätere Modellschulung.

In [None]:
# ImageNet-Standardnormalisierungswerte
imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std = [0.229, 0.224, 0.225]

# Transformations-Pipeline für das Training (inkl. Datenaugmentierung)
train_transforms = transforms.Compose(
    [
        transforms.Resize((224, 224)),  # Passe die Bildgröße an
        transforms.RandomAffine(
            degrees=180,  # Zufällige Rotation bis zu 180°
            translate=(0.1, 0.1),  # Zufällige Verschiebung (bis zu 10% der Bildgröße)
            shear=10,  # Zufällige Scherung um 10 Grad
            scale=(0.9, 1.5),  # Zufälliger Zoom zwischen 0.9 und 1.5
        ),
        transforms.RandomHorizontalFlip(),  # Zufälliges Spiegeln horizontal
        transforms.RandomVerticalFlip(),  # Zufälliges Spiegeln vertikal
        transforms.ToTensor(),  # Konvertiere das Bild zu einem Tensor
        transforms.Normalize(
            mean=imagenet_mean, std=imagenet_std
        ),  # Normalisiere die Pixelwerte
    ]
)

# Transformations-Pipeline für den Test (ohne Augmentierung)
test_transforms = transforms.Compose(
    [
        transforms.Resize((224, 224)),  # Passe die Bildgröße an
        transforms.ToTensor(),  # Konvertiere das Bild zu einem Tensor
        transforms.Normalize(
            mean=imagenet_mean, std=imagenet_std
        ),  # Normalisiere die Pixelwerte
    ]
)

# Erstelle die Datasets mit ImageFolder. Die Ordnerstruktur (train_dir und test_dir)
# legt automatisch die Klassen anhand der Unterordner fest.
train_dataset = datasets.ImageFolder(root=train_dir, transform=train_transforms)
test_dataset = datasets.ImageFolder(root=test_dir, transform=test_transforms)

# Erstelle DataLoader, um den Datensatz in Batches zu verarbeiten
BATCH_SIZE = 20
NUM_WORKERS = 4  # Passe die Anzahl der parallelen Prozesse ggf. an

train_loader = DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS
)
test_loader = DataLoader(
    test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS
)

# Ausgabe der Klassenbezeichnungen und der Anzahl der Klassen
class_names = train_dataset.classes
num_classes = len(class_names)
print("Klassen:", class_names)
print("Anzahl der Klassen:", num_classes)


## (d) Berechnung der Klassengewichte

Bei unausgeglichenen Datensätzen (unbalanced datasets) kann es hilfreich sein,
der Verlustfunktion Klassengewichte mitzugeben, damit seltener vertretene Klassen
stärker gewichtet werden.

Hier berechnen wir die Klassengewichte mit `compute_class_weight` aus scikit‑learn,
basierend auf der Verteilung der Labels im Trainingsdatensatz.
Die Gewichte werden dann in einen Torch-Tensor umgewandelt und später in den Loss integriert.

In [None]:
# Sammle alle Labels aus dem Trainingsdatensatz
train_labels = [s[1] for s in train_dataset.samples]
# Berechne die Klassengewichte (balanced mode passt die Gewichte automatisch an)
class_weights = compute_class_weight(
    class_weight="balanced", classes=np.unique(train_labels), y=train_labels
)
print("Klassengewichte:", class_weights)
# Konvertiere die Gewichte in einen Torch-Tensor
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float)


## (e) Definition des Modells mit lightning

Wir verwenden ein vortrainiertes VGG16-Modell als Feature-Extractor.
Dabei entfernen wir den originalen Klassifikator und fügen einen neuen, eigenen
Klassifikator hinzu, bestehend aus:

- Flatten: Um die 3D-Feature-Maps in einen 1D-Vektor umzuwandeln.
- Einem Linear-Layer mit ReLU-Aktivierung und Dropout (zur Regularisierung).
- Einem finalen Linear-Layer, der direkt die Klassenvorhersagen liefert.

Beim Transfer Learning bleiben zunächst alle Parameter des Feature-Extractors eingefroren.
Beim Fine-Tuning (wenn ``fine_tuning=True``) werden einige der tieferen Schichten wieder trainierbar gemacht.

Das LightningModule implementiert zudem die Trainings-, Validierungs- und Testschritte,
sowie die Konfiguration des Optimierers (hier Adam).

In [None]:
class VGG16TransferLearning(L.LightningModule):
    def __init__(self, num_classes, lr=1e-5, class_weights=None, fine_tuning=False):
        """
        Initialisiert das Transfer Learning-Modell basierend auf VGG16.

        Parameter:
          - num_classes (int): Anzahl der zu klassifizierenden Klassen.
          - lr (float): Lernrate.
          - class_weights (Tensor): Klassengewichte, um unbalancierte Klassen zu kompensieren.
          - fine_tuning (bool): Falls True, werden tiefer liegende Schichten für ein Fine-Tuning freigegeben.
        """
        super(VGG16TransferLearning, self).__init__()
        self.num_classes = num_classes
        self.lr = lr
        self.fine_tuning = fine_tuning

        # Lade das vortrainierte VGG16-Modell aus torchvision
        vgg16 = models.vgg16(pretrained=True)
        # Extrahiere nur den Feature-Extractor (ohne den originalen Klassifikator)
        self.features = vgg16.features
        # print(self.features)

        # Beim Transfer Learning frieren wir die Parameter des Feature-Extractors ein,
        # sodass nur der neue Klassifikator trainiert wird.
        if not self.fine_tuning:
            for param in self.features.parameters():
                param.requires_grad = False
        else:
            # Wenn Fine-Tuning gewünscht ist, frieren wir nur die ersten 24 Schichten ein.
            for i, layer in enumerate(self.features):
                if i < 28:
                    for param in layer.parameters():
                        param.requires_grad = False
                else:
                    for param in layer.parameters():
                        param.requires_grad = True

        # Nach der Extraktion liefern VGG16 bei 224x224 eine Feature-Map von Dimension 512 x 7 x 7.
        self.flatten = nn.Flatten()
        self.classifier = nn.Sequential(
            nn.Linear(
                512 * 7 * 7, 1024
            ),  # Reduziert die Dimension und leitet sie an den nächsten Layer weiter
            nn.ReLU(),  # Aktivierungsfunktion
            nn.Dropout(0.5),  # Dropout zur Vermeidung von Overfitting
            nn.Linear(
                1024, self.num_classes
            ),  # Finaler Linear-Layer, der die Klassenvorhersagen liefert
        )

        # Wähle die Loss-Funktion (CrossEntropyLoss) und integriere ggf. die Klassengewichte
        if class_weights is not None:
            self.criterion = nn.CrossEntropyLoss(weight=class_weights)
        else:
            self.criterion = nn.CrossEntropyLoss()

    def forward(self, x):
        # Vorwärtsdurchlauf: Zuerst den Feature-Extractor anwenden, dann flatten und Klassifikator
        x = self.features(x)
        x = self.flatten(x)
        logits = self.classifier(x)
        return logits

    def training_step(self, batch, batch_idx):
        # Ein Trainingsschritt: Berechne den Loss und die Genauigkeit (Accuracy)
        x, y = batch
        logits = self(x)
        loss = self.criterion(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = (preds == y).float().mean()
        # Logge die Metriken
        self.log("train_loss", loss, on_step=False, on_epoch=True)
        self.log("train_acc", acc, on_step=False, on_epoch=True)
        return loss

    def validation_step(self, batch, batch_idx):
        # Ein Validierungsschritt: Berechne Loss und Accuracy und sammle Vorhersagen für die spätere Auswertung
        x, y = batch
        logits = self(x)
        loss = self.criterion(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = (preds == y).float().mean()
        self.log("val_loss", loss, prog_bar=True)
        self.log("val_acc", acc, prog_bar=True)
        return {"val_loss": loss, "val_acc": acc, "preds": preds, "targets": y}

    def test_step(self, batch, batch_idx):
        # Ein Testschritt, ähnlich wie bei der Validierung, um die Leistung auf dem Testdatensatz zu evaluieren
        x, y = batch
        logits = self(x)
        loss = self.criterion(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = (preds == y).float().mean()
        self.log("test_loss", loss)
        self.log("test_acc", acc)
        return {"test_loss": loss, "test_acc": acc, "preds": preds, "targets": y}

    def configure_optimizers(self):
        # Verwende den Adam-Optimierer für das Training
        optimizer = optim.Adam(self.parameters(), lr=self.lr)
        return optimizer


![VGG16](https://storage.googleapis.com/lds-media/images/transfer-learning-fine-tuning-approach.width-1200.jpg)

## (f) Training – Phase 1 (Transfer Learning)

In dieser Trainingsphase bleibt der Feature-Extractor eingefroren, und es wird nur
der neue Klassifikator (die letzten Schichten) trainiert. Hier nutzen wir eine relativ
hohe Lernrate (1e-5), da nur wenige Parameter aktualisiert werden.

Ein TensorBoard-Logger wird eingerichtet, um während des Trainings Metriken zu überwachen.

In [None]:
# Richte den TensorBoard Logger ein, um das Training zu protokollieren
logger = TensorBoardLogger("tb_logs", name="vgg16_transfer_learning")

# Initialisiere das Modell für Transfer Learning (ohne Fine-Tuning)
model_transfer = VGG16TransferLearning(
    num_classes=num_classes,
    lr=1e-5,
    class_weights=class_weights_tensor,
    fine_tuning=False,
)

# Konfiguriere den Trainer von lightning
trainer = L.Trainer(
    max_epochs=5,
    accelerator="auto",
    devices="auto",
    log_every_n_steps=10,
    logger=logger,
)

# Starte das Training und validiere gleichzeitig mit dem Test-Dataloader
trainer.fit(model_transfer, train_loader, test_loader)
# Teste das Modell nach dem Training
trainer.test(model_transfer, test_loader)


## (g) Evaluation und Visualisierung

Nach dem Training wollen wir die Leistung unseres Modells besser verstehen.
Hierzu berechnen wir die Konfusionsmatrix, die zeigt, wie oft Bilder einer Klasse
fälschlicherweise einer anderen zugeordnet wurden.

Dazu definieren wir:
- Eine Funktion `plot_confusion_matrix`, die die Matrix visualisiert.
- Eine Funktion `get_predictions`, die alle Vorhersagen und tatsächlichen Labels sammelt.

Abschließend plotten wir die normalisierte Konfusionsmatrix.

In [None]:
def plot_confusion_matrix(
    cm, classes, normalize=False, title="Confusionsmatrix", cmap=plt.cm.Blues
):
    """
    Visualisiert die Konfusionsmatrix.

    Parameter:
      - cm: Die Konfusionsmatrix.
      - classes: Liste der Klassennamen.
      - normalize: Falls True, werden die Werte normiert (Prozentsatz pro Klasse).
      - title: Titel des Plots.
      - cmap: Farbschema für den Plot.
    """
    if normalize:
        cm = cm.astype("float") / cm.sum(axis=1)[:, np.newaxis]

    plt.figure(figsize=(6, 6))
    plt.imshow(cm, interpolation="nearest", cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = ".2f" if normalize else "d"
    thresh = cm.max() / 2.0
    # Beschrifte jede Zelle der Matrix
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(
            j,
            i,
            format(cm[i, j], fmt),
            horizontalalignment="center",
            color="white" if cm[i, j] > thresh else "black",
        )

    plt.ylabel("Tatsächliche Klasse")
    plt.xlabel("Vorhergesagte Klasse")
    plt.tight_layout()
    plt.show()


def get_predictions(model, dataloader):
    """
    Holt die Vorhersagen und die tatsächlichen Labels für alle Daten aus dem Dataloader.

    Parameter:
      - model: Das trainierte Modell.
      - dataloader: Der DataLoader, von dem die Daten geladen werden.

    Rückgabe:
      - all_preds: Array der Vorhersagen.
      - all_targets: Array der tatsächlichen Labels.
    """
    model.eval()
    all_preds = []
    all_targets = []
    with torch.no_grad():
        for x, y in dataloader:
            # Stelle sicher, dass die Daten auf demselben Gerät wie das Modell liegen
            x = x.to(model.device) if hasattr(model, "device") else x
            logits = model(x)
            preds = torch.argmax(logits, dim=1)
            all_preds.append(preds.cpu().numpy())
            all_targets.append(y.cpu().numpy())
    return np.concatenate(all_preds), np.concatenate(all_targets)


# Hole alle Vorhersagen und tatsächlichen Labels für den Testdatensatz
preds, targets = get_predictions(model_transfer, test_loader)
# Berechne die Konfusionsmatrix
cm = confusion_matrix(targets, preds)
# Visualisiere die normalisierte Konfusionsmatrix
plot_confusion_matrix(cm, classes=class_names, normalize=True)


## (h) Training – Phase 2 (Fine-Tuning)

Nachdem der neue Klassifikator im Transfer-Learning-Ansatz trainiert wurde,
möchten wir nun auch Teile des Feature-Extractors weiter anpassen.

Beim Fine-Tuning werden ab Layer-Index 24 die Parameter freigegeben, sodass auch
der Feature-Extractor weiter optimiert wird. Da hier wesentlich mehr Parameter
trainiert werden, wird eine deutlich niedrigere Lernrate (hier 1e-6) verwendet, um
Überanpassung (Overfitting) zu vermeiden.

In [None]:
# Initialisiere das Modell für das Fine-Tuning (teilweise freigegebene Schichten)
model_finetune = VGG16TransferLearning(
    num_classes=num_classes,
    lr=1e-5,  # Geringere Lernrate für das Fine-Tuning
    class_weights=class_weights_tensor,
    fine_tuning=True,
)

# Konfiguriere einen weiteren Trainer für das Fine-Tuning
trainer_finetune = L.Trainer(
    max_epochs=5,
    accelerator="auto",
    devices="auto",
    log_every_n_steps=10,
    logger=logger,
)

# Starte das Fine-Tuning
trainer_finetune.fit(model_finetune, train_loader, test_loader)
# Teste das feinabgestimmte Modell
trainer_finetune.test(model_finetune, test_loader)


In [None]:
# Hole alle Vorhersagen und tatsächlichen Labels für den Testdatensatz
preds_finetune, targets_finetune = get_predictions(model_finetune, test_loader)
# Berechne die Konfusionsmatrix
cm_finetune = confusion_matrix(targets, preds)
# Visualisiere die normalisierte Konfusionsmatrix
plot_confusion_matrix(cm_finetune, classes=class_names, normalize=True)


## Zusammenfassung

In diesem Notebook haben wir:

- Den Knifey-Spoony-Datensatz heruntergeladen, extrahiert und in eine übersichtliche
  Train/Test-Ordnerstruktur kopiert.
- Datenvorverarbeitung und Augmentierung definiert und die Daten über ImageFolder in
  DataLoader verpackt.
- Ein vortrainiertes VGG16-Modell als Feature-Extractor genutzt und einen neuen Klassifikator
  aufgebaut.
- Zuerst Transfer Learning (eingefrorener Feature-Extractor) und anschließend Fine-Tuning
  (teilweise freigegebene Schichten) durchgeführt.
- Trainings-, Validierungs- und Testschritte in einem LightningModule implementiert.
- Eine Evaluation mittels einer Konfusionsmatrix durchgeführt, um die Klassifizierungsleistung
  zu beurteilen.

Du kannst den Code weiter an Deine Anforderungen anpassen, z. B. indem Du andere Modelle oder
Hyperparameter ausprobierst.