# MuML Lab 4 - Deep Learning - CNN für die Solarzelleninspektion

## Ziel
In diesem Versuch soll ein pre-trained CNN mit nur wenig Trainingsdaten auf eine neue Aufgabe angepasst werden, um Solarzellen erkennen zu können. 
Dies entspricht dem typischen Anwendungsfall, dass ein Klassifier zu programmieren ist, für dessen grundlegendes Training die Datenmenge nicht ausreicht.

Die in den vorigen Laborversuchen selbst aufgenommenen Bilder sind von der Anzahl für diese Aufgabe nicht ausreichend. In Vorversuchen hat sich gezeigt, dass hiermit nur eine Genauigkeit von 40% erreicht werden konnte.

Daher wird auf einen im Internet verfügbaren und in zwei Fehlerklassen (crack, dark area) gelabelten Datensatz zurückgegriffen:
https://github.com/kirill-menke/resnet-defective-solar-cells?tab=readme-ov-file
Da dieser bereits augmentierte Daten enthält, wurden diese um Augmentierungen bereinigt und es sind für diesen Versuch die 2000 Bilder des in der Datei ```data.csv``` gelabelten Ordners ```images``` verwendet werden.

- Die Bilder 300 x 300 Pixel großen Bilder sollen auf eine Größe von 150 x 150 reduziert werden.
- Die Batch-Größe soll 16 betragen.
- Die Daten sollen im Verhältnis 60 - 20 - 20 auf Trainings-, Validierungs- und Testdaten aufgeteilt werden.
- Als Optimizer soll AdamW genutzt werden. Zunächst ist die Lernrate auf 10e-4 voreinzustellen.
- Es ist eine für das multi-label multi-class Problem geeignete Kostenfunktion zu wählen.

## Benötigte Bibliotheken

Aktiviere die Conda-Umgebung ```muml``` per ```conda activate muml``` nach Aufruf des Programms ```Anaconda Prompt``` und stelle durch Ausführen der folgenden Zelle sicher, sicher, dass alle benötigen Libraries installiert sind.

In [None]:
# Imports
import os
import numpy as np
from sklearn.metrics import confusion_matrix
from matplotlib import pyplot as plt
import torch
from torchvision import transforms
from torchvision.transforms import v2
from torchvision.datasets import ImageFolder
import torchvision
import torchvision.models as models
import torch.nn as nn
import cv2
from PIL import Image
import pandas as pd
import time

print("Erfolgsmeldung: Alle Bibliotheken wurden erfolgreich importiert.")

## Grundeinstellungen

Nachfolgend werden die in der Aufgabenstellung genannten Einstellungen als Konstanten definiert. Gegebenenfalls muss das Verzeichnis BASE_DIR für die Datei ```data.csv``` und den Ordner ```images``` angepasst werden. Auch wird geprüft, ob eine GPU angesprochen werden kann, die für diesen Laborversuch sehr empfohlen wird.

In [None]:
BASE_DIR = "."              # folder containing the data.csv file and the images folder
CSV_FILE = "data.csv"       # name of the csv file containing the image labels

TARGET_SIZE = (150, 150)    # target size for the images
BATCH_SIZE = 16             # batch size for the training to be reduced to 8 if memory is not sufficient
EPOCHS = 10                 # epochs for the training, should be reduced to 2 for test purposes to save time

TRAIN_SPLIT = 0.6           # share of the training data
VAL_SPLIT = 0.2             # share of validation data

# check for GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("GPU empfohlen. Die nachfolgende Ausgabe sollte daher statt cpu als aktives Gerät cuda ausgeben:")
print("Aktives Gerät:", device)

## Ausgangssituation

Analysiere die CSV-Datei data.csv und die zur Verfügung gestellten Bilder stichprobenartig.

**Aufbau von ```data.csv```:**
```
filename;crack;inactive
images/cell2044.png;1;0
images/cell0123.png;0;0
```

Der Zugriff auf die CSV-Datei und die Bilder ist nachfolgend beispielhaft demonstriert.

In [None]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt

# Load the CSV file with the shape file;label1;label2 with numpy
data = np.genfromtxt(os.path.join(BASE_DIR, CSV_FILE), delimiter=';', dtype='str', skip_header=1)
files = [os.path.join(BASE_DIR, row[0]) for row in data]
labels = np.array([[row[1], row[2]] for row in data], dtype='int')

# show the first 3 images
plt.figure(figsize=(15, 15))
for i in range(16):
    img = cv2.imread(files[i])
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    plt.subplot(4, 4, i+1)
    plt.imshow(img)
    plt.title(labels[i])
    plt.axis('off')
plt.tight_layout()
plt.show()

print("Number of images: ", len(files))
print("Image shape: ", img.shape)

## 6.4.1 Baseline

Es ist zunächst das Data Splitting zu analysieren und unter Nutzung von conditional indexing für Validation- und Test-Daten zu ergänzen.

Anschließend ist als Baseline zu berechnen, welche Accuracy ein trivialer Klassifikator erreicht, der nur die häufigste Ausprägung ausgibt.
Als Ausprägungen sind zu berücksichtigen:
- [0, 0] - kein Fehler
- [1, 0] - crack
- [0, 1] - dark area bzw. inactive
- [1, 1] - sowohl crack als auch dark area


In [35]:
# Load the data for the multi-label classification with labels classified in csv file
# CSV file: file;crack;inactive
data = np.genfromtxt(os.path.join(BASE_DIR, CSV_FILE), delimiter=';', dtype='str', skip_header=1)
files = [os.path.join(BASE_DIR, row[0]) for row in data]
labels = np.array([[row[1], row[2]] for row in data], dtype='float32')

# split the data into training, validation and test sets
split_point1 = int(TRAIN_SPLIT * len(files))
split_point2 = int((TRAIN_SPLIT + VAL_SPLIT) * len(files))
indices = np.random.permutation(len(files))

train_indices = indices[:split_point1]
val_indices = indices[split_point1:split_point2]
test_indices = indices[split_point2:]

# train_files soll alle Dateinamen für das Training enthalten, train_labels die entsprechenden Labels
train_files, train_labels = np.array(files)[train_indices], labels[train_indices]

###########################
# TODO: auf validation und test anwenden
###########################

###########################
# TODO: Berechnung der Baseline für alle drei Datensätze
###########################

## 6.4.2 Data augmentation

Da es sich um ein multi-label multi-class Problem handelt, muss eine eigene Klasse MultiLabelDataset erstellt werden, die bereits implementiert ist.


Anschließend ist der Code um Data augmentation zu ergänzen. Hierzu ist die Bibliothek v2 aus ```torchvision.transforms.v2``` zu nutzen. Siehe hierzu: https://pytorch.org/vision/main/auto_examples/transforms/plot_transforms_getting_started.html 

Es ist eine sinnvolle Auswahl von Operationen von Data Augmentation durchzuführen und zu begründen. Hierbei ist darauf zu achten, dass die Orientierung der Eingangsbilder nur geringfügig verändert wird, da die Strukturen der Solarzellen aufgrund der Kontaktiereinheit stets dieselbe Richtung aufweisen. Die korrekte Funktion ist anhand der Visualisierung eines Batches zu überprüfen.

Die Fehlerbilder werden auf die TARGET_SIZE (150, 150) Pixel skaliert, um Rechenzeit zu sparen, dann zu einem Tensor und abschließend mit der für ImageNet-Datensätze notwendigen Normierung konvertiert:
```transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])```

Überprüfen Sie die korrekte Funktion des DataLoaders, indem Sie einen Mini-Batch generieren und die folgenden Ausgaben programmieren:

1. Shape des Mini-Batches
2. Wertebereich des Mini-Batches
3. Ausgabe der Bilder des Batches und zugehörige Label als Titel. Dazu müssen die
Bilder unter Nutzung der gefundenen min- und max-Werte auf 0 . . . 255 skaliert
und der Tensor in ein Numpy-Array vom Typ np.uint8 umgewandelt werden.

Hinweis: Nutzen Sie hierzu zweckmäßigerweise die Tensor-Methoden min(), max() sowie numpy() zur Umwandlung eines Tensors in ein numpy-Array.

In [None]:
# Custom dataset for the multi-label classification
class MultiLabelDataset(torch.utils.data.Dataset):
    def __init__(self, file_paths, labels, transform=None, transform_name=None):
        self.file_paths = file_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        # DHL: unlike plt.imread, cv2.imread reads these image files as 3 channels instead of 1
        # np_img = plt.imread(self.file_paths[idx])
        np_img = cv2.imread(self.file_paths[idx])
        img = Image.fromarray(np_img)

        if self.transform:
            img = self.transform(img)
        return img, self.labels[idx]


# get the image, resize it to the target size and convert it to a tensor and normalize it to imagenet
basic_transform = transforms.Compose([
    v2.Resize(TARGET_SIZE),
    v2.ToTensor(),
    v2.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
    
])

augmentation_transform = transforms.Compose([
    v2.Resize(TARGET_SIZE),

    ###########################
    # TODO: data augmentation
    ###########################
    
    v2.ToTensor(),
    # normalize the image to imagenet parameters
    v2.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

# Load the train dataset with augmentation, the validation and test datasets with basic transform
train_dataset = MultiLabelDataset(train_files, train_labels, transform=augmentation_transform)
val_dataset = MultiLabelDataset(val_files, val_labels, transform=basic_transform)
test_dataset = MultiLabelDataset(test_files, test_labels, transform=basic_transform)

# Create data loaders
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)


def show_batch(dl):
    for images, labels in dl:

        ##############################################
        # TODO: Display a batch of images and labels
        ##############################################

        plt.show()
        break

print("Labels: [ crack, dark area]")
print("Training set batch")
show_batch(train_loader)
print("Validation set batch")
show_batch(val_loader)

## 6.4.3 Hilfsfunktionen zum Training und Testen

In [None]:

def calculate_accuracy(outputs_one_hot, labels_one_hot):
    """compute accuracy for both one-hot encoded outputs and labels for 
    multilabel classification, a data point is considered as correct if
    all labels are predicted correctly"""
    
    ######################################
    # TODO: implement the function
    acc = 0 # placeholder
    ######################################
    
    return acc


def train_epoch(model, criterion, optimizer, dataloader, device):
    """ Training function for one epoch returns loss and accuracy"""
    
    ###########################################
    # TODO: implement the function
    loss = 0    # placeholder
    acc = 0     # placeholder
    
    ###########################################

    return loss, acc


def evaluate_model(model, criterion, dataloader, device):
    """ Evaluation function for one epoch returns loss and accuracy"""
    
    ###########################################
    # TODO: implement the function
    loss = 0    # placeholder
    acc = 0     # placeholder
    ###########################################

    return loss, acc


def plot_history(history, title="Trainingsverlauf"):
    """show graphs of history-dictionary
    one subplot for trainings and validation loss and one for training and validation accuracy"""
    
    ###########################################
    # TODO: implement the function
    ###########################################

    plt.show()


def count_parameters(model):
    """print the number of trainable parameters and total parameters in a PyTorch model"""
    
    ###########################################
    # TODO: implement the function
    param_count = 0 # placeholder
    ###########################################

    return param_count


def fit(model, criterion, optimizer, epochs):
    history = {'train_loss': [], 'val_loss': [], 'train_accs': [], 'val_accs': []}

    for epoch in range(epochs):
        
        ##############################################
        # TODO: implement the training and evaluation
        history = {'train_loss': [], 'val_loss': [], 'train_accs': [], 'val_accs': []} # placeholder
        ##############################################


    return history


print("Successfully done with imports and functions")    

## 6.4.4 Model 1 - VGG16 from scratch - no pretrained weights

Beachte, dass die Shape des ersten Fully Connected Layer eine auf die Ausgabe des letzten Convolutional Layers angepasst werden muss.

In [None]:
full_model = models.vgg16(weights=None)   # initialize model
conv_base = full_model.features

# check output of conv_base to find appropriate input size for classifier
test_output = conv_base(next(iter(train_loader))[0])
print(f"Conv-Layer Output of test batch: {test_output.shape}")

# Define the model with the pretrained VGG16 features and a custom classifier

#####################################################################
# TODO: Define the model either as sequential model or as class
#####################################################################

model.to(device)                        # use GPU if available
model.eval()                            # set layers to evaluation mode


# Define BCEWithLogitsLoss as the loss function for multi-label classification
criterion = nn.BCEWithLogitsLoss()  # for binary proble use binary cross entropy loss

# Define optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

# Training loop
print("[i] Training of model_a: VGG16.featurs with own classifier")
print("[i] Number of trainable parameters:", count_parameters(model))
hist = fit(model, criterion, optimizer, EPOCHS)
plot_history(hist, "VGG16 from scratch - no pretrained weights")

## 6.4.5 Modellvergleich

- Modell 1: Modell ohne Transfer Learning wie oben
- Modell 2: Modell mit auf Image-Net vortrainierter Convolutional Base und neu erstelltem Klassfikator
- Modell 3: Modell mit auf Image-Net vortrainierter Convolutional Base, die eingefroren wird und mit neu erstelltem Klassfikator

## 6.4.6 Hyperparameter-Tuning
Die generierten Modelle aus der vorigen Teilaufgabe sind zu vergleichen und es ist das bestgeeignete begründet auszuwählen.

Für dieses eine Modell soll im Rahmen des Hyperparameter-Tunings untersucht werden, welche Lernrate und welche Anzahl von Neuronen des ersten hidden Layers des Classifiers optimal sind. Hierzu soll die jeweils beste Validierungs-Accuracy (und die zugehörige Trainings-Accuracy) festgehalten werden. Diese besten Ergebnisse sind übersichtlich in einer Tabelle für die unterschiedlichen Lernraten und Anzahl der Neuronen darzustellen.

Hinweis: Die Funktion ist zunächst bei nur 2 Epochen zu testen. Wenn die Funktion gegeben ist, dann auf 10 Epochen erweitern (besser 20).

In [None]:
LEARNING_RATES = [1e-3, 1e-4]
UNITS = [64, 128, 256, 512]
EPOCHS_HYPER = 5

final_accs = []

start = time.time()

####################################################
# TODO: implement the hyperparameter grid search
####################################################

stop = time.time()
print(f"Execution time for {len(final_accs)} runs is {stop - start:.0f} secs")
df = pd.DataFrame(final_accs, columns=['lr', 'units', 'best_train_acc', 'best val_acc'])
display(df)

## 6.4.7 Evaluation

Die optimale Netzwerkkonfiguration ist mit 30 Epochen zu trainieren und die beste Epoche in Bezug auf die Validierungs-Accuracy zu speichern. Dazu ist die Trainingsfunktion fit entsprechend zu erweitern.

Es sind die folgenden Auswertungen durchzuführen:
- Accuracy des Modells
- Klassenweise Accuracy, Precision, Recall

Auf dieser Grundlage ist das Ergebnis kritisch bezüglich folgendender Aspekte zu bewerten:
- Vor- und Nachteile gegenüber der herkömmlichen Klassifikation aus dem vorigen Laborversuch
- Ansätze für weitere Verbesserungen des Deep Learning-Ansatzes

In [None]:
####################################################
# TODO: Train the best model as final_model and save 
# the model with the highest validation accuracy
####################################################

### Hilfsfunktionen zur Evalation

In [None]:
# load the best model instead of the last trained model
model_final = torch.load("best.pth")
test_loss, test_acc = evaluate_model(model_final, criterion, test_loader, device)
print(f"Test Loss: {test_loss:.4f} - Test Acc: {test_acc:.4f}")

Erwartungswerte zur Kontrolle: Die Test Accuracy sollte im Bereich von 90 bis 94 % liegen.

### Klassenweise Accuracy

Es ist eine Vorhersage auf einem Testdatenbatch auszuführen und jeweils ein zweispaltiges numpy Array der Vorhersagen mit den Werten 0 und 1 sowie ein numpy-Array der zugehörigen tatsächlichen Label auszugeben.
Je Spalte ist dann eine Berechnung der geforderten Größen Acc, P,  R durchzuführen.

**Hinweis:** Vor der Nutzung der Methode ```numpy()``` zur Umwandlung eines Tensors in ein numpy-Array ist der Tensor von der GPU in die CPU zu verschieben. Hierzu wird die Methode ```cpu()``` eingesetzt.

In [None]:
model_final.eval()                              # Switch to evaluation mode
with torch.no_grad():                           # Disable gradient computation for inference to save memory and computation time
    all_labels = np.array([])                   # store labels of all batches
    all_outputs = np.array([])                  # store outputs of all batches

    for i, data in enumerate(test_loader):      # iterate over all test batches
        inputs = data[0].to(device)             # get inputs and move to GPU if available
        labels = data[1].to(device)             # get labels and move to GPU if available
        outputs = model_final(inputs)           # forward pass
        
        if i == 0:
            test_labels = labels.cpu().numpy()                                  # initial run: fill with first batch
            test_outputs = outputs.cpu().numpy()                                  
        else:
            test_labels = np.vstack((test_labels, labels.cpu().numpy()))        # append the labels of the batch
            test_outputs = np.vstack((test_outputs, outputs.cpu().numpy()))


preds = (test_outputs > 0.5)                    # convert outputs to binary predictions
print("Vorhersage: ", preds)                    # print predictions just for debugging


for class_idx in range(2):
    print(f"Class {class_idx}")
    
    ######################################
    # TODO: implement the function
    ######################################

    print(f"True Positives  : {true_positives}")
    print(f"True Negatives  : {true_negatives}")
    print(f"False Positives : {false_positives}")
    print(f"False Negatives : {false_negatives}")
    print(f"Accuracy        : {acc:.4f}")
    print(f"Precision       : {precision:.4f}")
    print(f"Recall          : {recall:.4f}")
    print("")

### Interpretation des Ergebnisses

-  Wie sind die Ergebnisse zu bewerten?
- Finde Ansätze, wie man zu besseren Ergebnissen kommen kann.

### Qualitative Analyse anhand eines Testbatches

In [None]:
def show_batch(model, loader, batch_nr=0):
    """function to show a batch of images with the true labels and the predicted labels"""
    model.eval()
    with torch.no_grad():
        for i, data in enumerate(loader):
            if i >= batch_nr:
                images = data[0].to(device)
                labels = data[1].to(device)
                predictions = model(images.to(device))
                break

    plt.figure(figsize=(15, 15))
    for i in range(len(images)):
        img = images[i].permute(1, 2, 0).cpu().detach().numpy()
        min_val = img.min()
        max_val = img.max()
        img = ((img - min_val) / (max_val - min_val) * 255).astype(np.uint8)
        
        prediction = (predictions[i].cpu().numpy() > 0).astype(np.uint8)
        label = labels[i].cpu().numpy().astype(np.uint8)
        plt.subplot(len(images) // 4 + 1, 4, i + 1)
        plt.imshow(img)
        plt.axis('off')
        plt.title(str(label) + " as " + str(prediction))
    plt.suptitle("Reihenfolge: [Crack, Dark Area]", fontsize=16, y=0.92)
    plt.show()

# Display some test images with their predictions
print("Test set batch")
show_batch(model_final, test_loader, 0)
print("Validation set batch")
show_batch(model_final, val_loader, 0)