In [None]:
import os
import json
import torch
import torchvision.transforms as transforms
import torch.nn as nn
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import numpy as np

from torchvision.models.detection import SSD300_VGG16_Weights
from torchvision.models.vgg import VGG16_Weights
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset
from torchvision.transforms import ToTensor
from torchvision.models.detection import ssd
from torchvision.models.detection import ssdlite320_mobilenet_v3_large
from PIL import Image
from tqdm import tqdm

import PIL.Image
import torchvision.transforms.functional as F



In [None]:
# Dataset root directory. The default is the original author's local checkout;
# set the MASK_DATASET_DIR environment variable to point the notebook at another
# copy of the dataset without editing this cell.
root_dir = os.environ.get(
    "MASK_DATASET_DIR",
    r"C:\Users\Domi\Documents\GitHub\Deep-Vision-sta\Datasets\Face Mask Detection Dataset\Medical mask\Medical mask\Medical Mask",
)

# When True, the drawing helpers undo a mean/std normalisation before display.
NORMALIZE = True
# Per-channel mean / std (the values match the widely used ImageNet statistics).
MEAN = [0.485, 0.456, 0.406]
STD = [0.229, 0.224, 0.225]
BATCH_SIZE = 1

# Size passed to MaskDetectionDataset as its target_size.
RESIZE = (320, 320)
# Not referenced by any of the visible cells.
ROUND_RESIZED_BBOXES = False
# Optimizer hyperparameters handed to setup_model.
LEARNING_RATE = 0.005
MOMENTUM = 0.9
WEIGHT_DECAY = 0.0005
NESTEROV = True
# Fraction of the dataset held out for testing.
TEST_SIZE = 0.2

# Class ids that the drawing helpers display (the four "face_*" classes below).
ALLOWED_LABELS = [3, 4, 5, 6]
# Annotation class name -> integer label used as the training target.
class_mapping = {
    "hijab_niqab": 0,
    "mask_colorful": 1,
    "mask_surgical": 2,
    "face_no_mask": 3,
    "face_with_mask_incorrect": 4,
    "face_with_mask": 5,
    "face_other_covering": 6,
    "scarf_bandana": 7,
    "balaclava_ski_mask": 8,
    "face_shield": 9,
    "other": 10,
    "gas_mask": 11,
    "turban": 12,
    "helmet": 13,
    "sunglasses": 14,
    "eyeglasses": 15,
    "hair_net": 16,
    "hat": 17,
    "goggles": 18,
    "hood": 19
}

## Dataset classes

In [None]:
class DatasetLoader:
    """Eagerly pair every image under ``<root_dir>/images`` with its JSON annotation.

    Directory layout expected by the paths built in this class::

        root_dir/
            images/        *.jpg, *.jpeg, *.png
            annotations/   one JSON file per image, e.g. ``0001.jpg.json``
    """

    SUPPORTED_IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png")
    SUPPORTED_ANNOTATION_EXTENSIONS = (".jpg.json", ".jpeg.json", ".png.json")

    def __init__(self, root_dir):
        self.root_dir = root_dir
        self.images_dir = os.path.join(root_dir, "images")
        self.annotations_dir = os.path.join(root_dir, "annotations")

    def load_dataset(self):
        """Return a list of ``(PIL image, annotation dict)`` tuples.

        Images are visited in sorted filename order so that indices are stable
        between runs. Images without a matching annotation file are skipped
        (and counted in a summary line) instead of raising FileNotFoundError.
        """
        dataset = []
        skipped = 0

        for filename in sorted(os.listdir(self.images_dir)):
            if not filename.lower().endswith(self.SUPPORTED_IMAGE_EXTENSIONS):
                continue

            annotation_path = self._find_annotation(filename)
            if annotation_path is None:
                skipped += 1
                continue

            # Read the image together with its annotations.
            image_path = os.path.join(self.images_dir, filename)
            image, annotations = self._read_data(image_path, annotation_path)
            dataset.append((image, annotations))

        if skipped:
            print(f"Skipped {skipped} image(s) without an annotation file.")

        return dataset

    def _find_annotation(self, image_filename):
        """Return the path of the annotation file for ``image_filename`` or None."""
        stem = os.path.splitext(image_filename)[0]
        # Try the exact "<image name>.json" first, then the stem combined with
        # every supported suffix (the lookup the original code performed).
        candidates = [image_filename + ".json"]
        candidates += [stem + extension for extension in self.SUPPORTED_ANNOTATION_EXTENSIONS]
        for candidate in candidates:
            annotation_path = os.path.join(self.annotations_dir, candidate)
            if os.path.exists(annotation_path):
                return annotation_path
        return None

    def _read_data(self, image_path, annotation_path):
        """Open one image and convert its annotation JSON to the internal format.

        Returns:
            ``(image, {"filename": str, "annotations": [{"bbox": [xmin, ymin, xmax, ymax],
            "label": classname}, ...]})``
        """
        # NOTE(review): Image.open is lazy and keeps the file handle open until
        # the pixel data is read; with many images this may exhaust file
        # descriptors - confirm whether eager loading is acceptable memory-wise.
        image = Image.open(image_path)

        with open(annotation_path, 'r') as f:
            annotations = json.load(f)

        image_annotations = {
            "filename": annotations["FileName"],
            "annotations": []
        }

        for annotation in annotations["Annotations"]:
            xmin, ymin, xmax, ymax = annotation["BoundingBox"]
            image_annotations["annotations"].append({
                "bbox": [xmin, ymin, xmax, ymax],
                "label": annotation["classname"]
            })

        return image, image_annotations
    

# class MyCustomDataset(Dataset):
#     def __init__(self, dataset):
#         self.dataset = dataset
#         self.transform = ToTensor()

#     def __len__(self):
#         return len(self.dataset)

#     def __getitem__(self, index):
#         data = self.dataset[index]
#         image = self.transform(data[0])  # Bild in Tensor umwandeln
#         annotations = data[1]  # Annotationen beibehalten

#         return image, annotations   
    
def check_labels(target):
    """Report on stdout when a target has a different number of labels and boxes.

    ``target`` must provide ``'labels'`` and ``'boxes'`` entries that expose
    ``.size(0)``. Nothing is returned and nothing is raised; a mismatch only
    produces a printed error line.
    """
    label_count = target['labels'].size(0)
    box_count = target['boxes'].size(0)

    if label_count == box_count:
        return
    print("Error: Labels are not in the expected format.")

In [None]:
class MaskDetectionDataset(Dataset):
    """Detection dataset that reads ``<root_dir>/annotations/*.json`` and ``<root_dir>/images``.

    Each item is ``(image, target)``: the image is resized to ``target_size``
    and converted to a tensor, and ``target`` holds ``boxes`` (rescaled to the
    resized image), ``labels``, ``area``, ``iscrowd`` and ``image_id``.

    Args:
        root_dir: Directory containing the ``images`` and ``annotations`` folders.
        target_size: ``(height, width)`` passed to ``F.resize``.
    """

    def __init__(self, root_dir, target_size=(600, 900)):
        self.root_dir = root_dir
        self.annotations = []
        self.target_size = target_size
        self.load_annotations()

    def load_annotations(self):
        """Read every annotation file and store ``(annotations, image file name)`` pairs."""
        annotations_dir = os.path.join(self.root_dir, "annotations")
        # Sorted so that dataset indices (and image_id) are stable between runs.
        for annotation_file in sorted(os.listdir(annotations_dir)):
            with open(os.path.join(annotations_dir, annotation_file), "r") as f:
                annotation_data = json.load(f)
            annotations = annotation_data["Annotations"]
            file_name = annotation_data["FileName"]
            self.annotations.append((annotations, file_name))
            # Report (once per file) annotations with degenerate boxes; they are
            # dropped later in __getitem__.
            for annotation in annotations:
                if not self._is_valid_box(annotation["BoundingBox"]):
                    print("Invalid bounding box coordinates in file:", file_name)
                    break

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        annotations, file_name = self.annotations[idx]
        image_path = os.path.join(self.root_dir, "images", file_name)
        # Close the file handle as soon as the RGB copy exists.
        with PIL.Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")
        original_image_width, original_image_height = image.size
        image = F.resize(image, self.target_size)
        image = F.to_tensor(image)

        boxes = []
        labels = []
        for annotation in annotations:
            box = annotation["BoundingBox"]
            if not self._is_valid_box(box):
                continue
            boxes.append(self._scale_box(box, original_image_width, original_image_height))
            labels.append(self.get_class_label(annotation["classname"]))

        # reshape keeps the (N, 4) layout even when no valid box is left,
        # otherwise the column indexing below fails on an empty tensor.
        boxes = torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.tensor(labels, dtype=torch.int64)

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["area"] = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        target["iscrowd"] = torch.zeros((boxes.shape[0],), dtype=torch.int64)
        target["image_id"] = torch.tensor([idx])

        return image, target

    @staticmethod
    def _is_valid_box(box):
        """Return True when ``[xmin, ymin, xmax, ymax]`` has positive width and height."""
        return box[0] < box[2] and box[1] < box[3]

    def _scale_box(self, box, original_width, original_height):
        """Map ``[xmin, ymin, xmax, ymax]`` from the original image to the resized image.

        ``target_size`` is ``(height, width)`` because that is how ``F.resize``
        interprets it, so x coordinates scale with ``target_size[1]`` and
        y coordinates with ``target_size[0]``.
        """
        target_height, target_width = self.target_size
        return [
            box[0] * target_width / original_width,
            box[1] * target_height / original_height,
            box[2] * target_width / original_width,
            box[3] * target_height / original_height,
        ]

    def get_class_label(self, class_name):
        """Look up the integer label for ``class_name`` in ``class_mapping`` (-1 if unknown).

        NOTE(review): torchvision detection models conventionally treat label 0
        as background, while class_mapping assigns 0 to "hijab_niqab" - confirm
        this is intended.
        """
        return class_mapping.get(class_name, -1)  # Return -1 if class_name is not found


In [None]:
# Eagerly load every (image, annotation) pair found under root_dir.
dataset_loader = DatasetLoader(root_dir)
dataset = dataset_loader.load_dataset()
# Unpack the first sample for ad-hoc inspection (IndexError if nothing was loaded).
image, annotations = dataset[0]
# Number of loaded samples.
print(len(dataset))

## Model Setup

In [None]:
import utils
# def collate_fn(batch):
#     images = []
#     annotations = []    

#     # Definiere die Zielgröße für das Rescaling
#     target_size = RESIZE

#     # Erstelle die Rescaling-Transformation
#     rescale_transform = transforms.Resize(target_size, interpolation=Image.Resampling.BILINEAR)

#     for image, annotation in batch:
#         # Wandele den Tensor in eine PIL-Image-Instanz um
#         image_size = image.size()
#         image = transforms.ToPILImage()(image)

#         # Wende die Rescaling-Transformation auf das Bild an
#         image = rescale_transform(image)

#         # Konvertiere das Bild in ein Tensor und füge es zur Liste hinzu
#         image = transforms.ToTensor()(image)
#         images.append(image)

#         # Passe die Bounding-Boxen an die neue Größe des Bildes an
#         width_ratio = target_size[0] / image_size[2]
#         height_ratio = target_size[1] / image_size[1]
#         for bbox_dict in annotation['annotations']:
#             bbox = bbox_dict['bbox']
#             x_min, y_min, x_max, y_max = bbox
#             x_min *= width_ratio
#             y_min *= height_ratio
#             x_max *= width_ratio
#             y_max *= height_ratio
#             bbox_dict['bbox'] = [x_min, y_min, x_max, y_max]

#         # Füge die Annotations zur Annotations-Liste hinzu
#         annotations.append(annotation)

#     # Passe die Größen der Bilder an, um stapelbar zu sein
#     images = torch.stack(images)

#     # Normalize die Pixelwerte der Bilder
#     if NORMALIZE:
#         images = transforms.Normalize(mean=MEAN, std=STD)(images)

#     return images, annotations

from torchvision.models.detection import SSDLite320_MobileNet_V3_Large_Weights
from torchvision.models import MobileNet_V3_Large_Weights
from torchvision.models.detection import ssdlite320_mobilenet_v3_large


def setup_model(batch_size, dataset, lr, momentum, weight_decay, nesterov, test_size, weights_backbone=None, weights=None,):
    """Create the SSDLite detector, the train/test dataloaders and the SGD optimizer.

    Args:
        batch_size: Batch size of both dataloaders.
        dataset: Unused. Kept for backward compatibility; the data is always
            read through ``MaskDetectionDataset(root_dir, RESIZE)``.
        lr, momentum, weight_decay, nesterov: SGD hyperparameters. They are now
            passed to the optimizer; previously they were ignored in favour of
            hard-coded values (lr=0.00005, momentum=0.9, weight_decay=0.0005,
            nesterov=True).
        test_size: Fraction of the dataset that goes into the test split.
        weights_backbone: Forwarded to ``ssdlite320_mobilenet_v3_large``.
        weights: Forwarded to ``ssdlite320_mobilenet_v3_large``.

    Returns:
        ``(model, train_dataloader, test_dataloader, optimizer)``
    """
    # Initialise the model.
    model = ssdlite320_mobilenet_v3_large(weights=weights, weights_backbone=weights_backbone)

    # Split the data into a training and a test subset.
    full_dataset = MaskDetectionDataset(root_dir, RESIZE)
    train_size = int((1 - test_size) * len(full_dataset))
    val_size = len(full_dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, val_size])

    # Dataloaders for both subsets; utils.collate_fn is the project's collate helper.
    train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=utils.collate_fn)
    test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True, collate_fn=utils.collate_fn)

    # Honour the hyperparameters supplied by the caller.
    optimizer = torch.optim.SGD(
        model.parameters(),
        lr=lr,
        momentum=momentum,
        weight_decay=weight_decay,
        nesterov=nesterov,
    )

    return model, train_dataloader, test_dataloader, optimizer

## Draw Images

In [None]:
def visualize_sample(dataloader, x):
    """Show the x-th (1-based) image of the dataloader's first batch with its boxes.

    Expects the batch layout this function indexes into: ``images`` is
    indexable and yields channel-first image tensors, and ``annotations[i]`` is
    a dict whose ``'annotations'`` list holds ``{'bbox': [xmin, ymin, xmax, ymax],
    'label': ...}`` entries (the format produced by ``DatasetLoader``).

    Args:
        dataloader: Iterable yielding ``(images, annotations)`` batches.
        x: 1-based position of the sample inside the first batch.
    """
    index = x - 1
    # Only the first batch is fetched.
    images, annotations = next(iter(dataloader))
    image = images[index]
    boxes = annotations[index]['annotations']
    labels = [box['label'] for box in boxes]
    print(labels)
    # Create a new figure and axis.
    fig, ax = plt.subplots(1)
    # imshow expects channel-last data, so move the channel axis to the end.
    ax.imshow(image.cpu().permute(1, 2, 0))
    # Draw every bounding box as a rectangle with its label.
    for box, label in zip(boxes, labels):
        x_min, y_min, x_max, y_max = box['bbox']
        width = x_max - x_min
        height = y_max - y_min
        rect = patches.Rectangle((x_min, y_min), width, height, linewidth=2, edgecolor='r', facecolor='none')
        ax.add_patch(rect)
        ax.text(x_min, y_min, f"{label}", color='r', fontsize=8, bbox=dict(facecolor='white', alpha=0.7, edgecolor='none'))
    # Show the figure.
    plt.show()


def draw_image_with_boxes(image, target):
    """Display one image tensor with the ground-truth boxes of the allowed classes.

    Args:
        image: Channel-first image tensor.
        target: Dict with ``"boxes"`` (rows of ``xmin, ymin, xmax, ymax``) and
            ``"labels"`` tensors.

    Only boxes whose label is in ``ALLOWED_LABELS`` are drawn, each exactly once
    and captioned with its class name.
    """
    # Undo the mean/std normalisation before converting to a PIL image.
    # NOTE(review): MaskDetectionDataset.__getitem__ only applies resize and
    # to_tensor, so its images are not normalised - confirm NORMALIZE matches
    # the pipeline that produced `image`.
    if NORMALIZE:
        image = transforms.Normalize(mean=[-m / s for m, s in zip(MEAN, STD)], std=[1 / s for s in STD])(image)
    image_pil = transforms.ToPILImage()(image)

    # Copy boxes and labels to the CPU as numpy arrays.
    boxes = target["boxes"].cpu().numpy()
    labels = target["labels"].cpu().numpy()

    # class_mapping maps name -> id; invert it once to caption boxes by name.
    label_names = {class_id: name for name, class_id in class_mapping.items()}

    # Create a new figure and axis and show the image.
    fig, ax = plt.subplots(1)
    ax.imshow(image_pil)
    print(target)

    # Draw each allowed box as a rectangle with its class name.
    for box, label in zip(boxes, labels):
        if label not in ALLOWED_LABELS:
            continue
        x_min, y_min, x_max, y_max = box
        width = x_max - x_min
        height = y_max - y_min
        rect = patches.Rectangle((x_min, y_min), width, height, linewidth=2, edgecolor='r', facecolor='none')
        ax.add_patch(rect)
        caption = label_names.get(int(label), int(label))
        ax.text(x_min, y_min, f"Label: {caption}", color='r', fontsize=8, bbox=dict(facecolor='white', alpha=0.7, edgecolor='none'))
    # Show the figure.
    plt.show()


def visualize_prediction(images, model, confidence_threshold, counter = 10):
    """Run the model on ``images`` and plot predictions above a score threshold.

    Note: this notebook defines ``visualize_prediction`` a second time further
    down with a different fourth parameter; whichever cell ran last wins.

    Args:
        images: Sequence of channel-first image tensors, already on the model's device.
        model: Detection model returning dicts with ``boxes``, ``labels`` and ``scores``.
        confidence_threshold: Minimum score for a box to be drawn.
        counter: Boxes are only drawn when ``counter % 10 == 0``; the figures
            themselves are shown regardless.

    The model's previous train/eval mode is restored on exit.
    """
    was_training = model.training
    model.eval()
    try:
        # Perform inference.
        with torch.no_grad():
            predictions = model(images)

        # The gate does not change while drawing, so evaluate it once.
        draw_boxes = counter % 10 == 0
        # class_mapping maps name -> id; invert it once for captions.
        label_names = {class_id: name for name, class_id in class_mapping.items()}

        for image, prediction in zip(images, predictions):
            if NORMALIZE:
                # Undo the mean/std normalisation.
                # NOTE(review): MaskDetectionDataset.__getitem__ does not
                # normalise its images - confirm NORMALIZE matches the input.
                image = transforms.Normalize(mean=[-m / s for m, s in zip(MEAN, STD)], std=[1 / s for s in STD])(image)
            image_pil = transforms.ToPILImage()(image)

            # Predicted bounding boxes, labels and scores.
            boxes = prediction['boxes'].cpu().numpy()
            labels = prediction['labels'].cpu().numpy()
            scores = prediction['scores'].cpu().numpy()

            # Show the image and overlay the accepted predictions.
            fig, ax = plt.subplots(1)
            ax.imshow(image_pil)

            for box, label, score in zip(boxes, labels, scores):
                if not (draw_boxes and label in ALLOWED_LABELS and score > confidence_threshold):
                    continue
                x_min, y_min, x_max, y_max = box
                width = x_max - x_min
                height = y_max - y_min
                class_name = label_names[int(label)]
                rect = patches.Rectangle((x_min, y_min), width, height, linewidth=2, edgecolor='r', facecolor='none')
                ax.add_patch(rect)
                ax.text(x_min, y_min, f"{class_name}: {score}", color='r', fontsize=8, bbox=dict(facecolor='white', alpha=0.7, edgecolor='none'))
            plt.show()
    finally:
        # Restore the mode the caller had set instead of always forcing train().
        model.train(was_training)

In [None]:
#visualize_sample(train_dataloader, 1)

## Evaluation

from coco_eval import CocoEvaluator


def calculate_mAP(model, dataloader, device):
    """Evaluate ``model`` on ``dataloader`` and return the first COCO bbox statistic.

    Delegates to ``engine.evaluate`` - the same helper the training cell of this
    notebook uses - and reads the result the same way that cell does
    (``evaluator.coco_eval['bbox'].stats``).

    NOTE(review): the previous implementation constructed ``CocoEvaluator``
    from the dataloader with a string ``iou_types``, called
    ``update(targets, outputs)`` and read ``coco_evaluator.stats``. That does
    not match how the evaluator is used elsewhere in this notebook; this
    assumes ``engine``/``coco_eval`` are the torchvision detection reference
    scripts - confirm.

    Args:
        model: Detection model to evaluate.
        dataloader: Dataloader yielding ``(images, targets)`` batches.
        device: Device on which evaluation runs.

    Returns:
        ``stats[0]`` of the bbox evaluation (presumably AP at IoU=0.50:0.95 -
        verify against the pycocotools summary order).
    """
    # Imported locally so the function does not depend on cell execution order.
    from engine import evaluate

    evaluator = evaluate(model, dataloader, device=device)
    mAP = evaluator.coco_eval['bbox'].stats[0]

    return mAP



## Training

In [None]:
def train_one_epoch_own(model, optimizer, train_dataloader, device, epoch, num_epochs):
    """Train ``model`` for one epoch with a tqdm progress bar.

    Expects the batch layout this function indexes into: ``images`` supports
    ``.to(device)`` and indexing, and every element of ``annotations`` is a
    dict whose ``"annotations"`` list holds ``{"bbox": [...], "label": name}``
    entries (the format produced by ``DatasetLoader``). Label names are
    translated through ``class_mapping``.

    Args:
        model: Detection model returning a dict of losses in training mode.
        optimizer: Optimizer updating the model parameters.
        train_dataloader: Dataloader yielding ``(images, annotations)`` batches.
        device: Device to train on.
        epoch: Zero-based epoch index (only used for the progress text).
        num_epochs: Total number of epochs (only used for the progress text).

    Returns:
        The mean loss over all processed batches (0.0 for an empty dataloader).
    """
    model.train()
    total_iterations = len(train_dataloader)
    # Visualise predictions about 20 times per epoch; max(1, ...) avoids a
    # modulo-by-zero when there are fewer than 20 batches.
    visualize_every = max(1, total_iterations // 20)
    total_loss = 0.0
    average_loss = 0.0

    # The context manager closes the bar even if a batch raises.
    with tqdm(train_dataloader, total=total_iterations) as pbar:
        for step, (images, annotations) in enumerate(pbar, start=1):
            images = images.to(device)

            # Convert the annotation dicts into the target format of the model.
            targets = []
            for annotation in annotations:
                entries = annotation["annotations"]
                bboxes = [entry["bbox"] for entry in entries]
                # Translate label names into numeric ids.
                labels = [class_mapping[entry["label"]] for entry in entries]

                target = {
                    # reshape keeps the (N, 4) layout for images without boxes.
                    "boxes": torch.tensor(bboxes, dtype=torch.float32).reshape(-1, 4).to(device),
                    "labels": torch.tensor(labels, dtype=torch.int64).to(device)
                }
                check_labels(target)
                targets.append(target)

            optimizer.zero_grad()

            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())
            losses.backward()
            optimizer.step()

            loss_value = losses.item()
            total_loss += loss_value
            # Divide by the explicit step count; pbar.n is only refreshed when
            # the bar redraws and therefore lags behind the real iteration.
            average_loss = total_loss / step

            pbar.set_description(f"Epoch [{epoch+1}/{num_epochs}], Average-Loss: {average_loss:.4f}, Loss: {loss_value:.4f}")

            if step % visualize_every == 0:
                visualize_prediction([images[0]], model, confidence_threshold=0.3)

    return average_loss

In [None]:
# Build the detector, both dataloaders and the optimizer from the notebook-level settings.
model, train_dataloader, test_dataloader, optimizer = setup_model(
    BATCH_SIZE,
    dataset=dataset,
    lr=LEARNING_RATE,
    momentum=MOMENTUM,
    weight_decay=WEIGHT_DECAY,
    nesterov=NESTEROV,
    test_size=TEST_SIZE,
    weights_backbone=MobileNet_V3_Large_Weights.DEFAULT,
    weights=SSDLite320_MobileNet_V3_Large_Weights.DEFAULT,
)



# for epoch in range(num_epochs):
#     train_one_epoch_own(model, optimizer, train_dataloader, device, epoch, num_epochs)
#     #mAP = calculate_mAP(model, test_dataloader, device)
#     #print(f"Epoch [{epoch+1}/{num_epochs}], mAP: {mAP:.4f}")

In [None]:
# Pull one batch from the test dataloader and display it as the cell output.
samples = next(iter(test_dataloader))
samples

In [None]:
from engine import train_one_epoch, evaluate

# Training loop
num_epochs = 10
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model.to(device)

# Per-epoch metric histories plus the loss list handed to train_one_epoch
ap_values, ar_values, losses = [], [], []

for epoch in range(num_epochs):
    # One training pass; `losses` is passed as losses_out
    # (presumably filled by the project's engine - not visible here).
    train_one_epoch(model, optimizer, train_dataloader, device, epoch, print_freq=1, losses_out=losses)
    # No learning-rate scheduler is stepped here (lr_scheduler.step() is disabled).

    # Evaluate on the test split; the original note says the numpy-related
    # library code of evaluate was adjusted to work with newer numpy versions.
    evaluator = evaluate(model, test_dataloader, device=device)

    # Pull the bbox metrics out of the evaluator
    iou_thresholds = evaluator.coco_eval['bbox'].params.iouThrs
    average_precisions = evaluator.coco_eval['bbox'].stats[:6]
    average_recalls = evaluator.coco_eval['bbox'].stats[6:]

    # Record this epoch's metrics
    ap_values += [average_precisions]
    ar_values += [average_recalls]

In [None]:
%matplotlib inline

# Loss curve over the collected `losses` values.
plt.gca().plot(losses)
plt.gca().set_xlabel("Iteration")
plt.gca().set_ylabel("Loss")
plt.show()

In [None]:
%matplotlib inline

# Convert the per-epoch metric lists to 2-D arrays (one row per epoch) for plotting
ap_values = np.array(ap_values)
ar_values = np.array(ar_values)

# Legend labels for the six precision columns (stats[:6]).
iou_thresholds_available = ["0.50:0.95", "0.50", "0.75", "0.50:0.95_small", "0.50:0.95_medium", "0.50:0.95_large"]
# Legend labels for the six recall columns (stats[6:]). The recall entries are
# not split by IoU threshold, so the precision labels above do not apply.
# Assumes the standard pycocotools COCOeval.summarize ordering - TODO confirm.
recall_labels = [
    "IoU=0.50:0.95, maxDets=1",
    "IoU=0.50:0.95, maxDets=10",
    "IoU=0.50:0.95, maxDets=100",
    "IoU=0.50:0.95, area=small",
    "IoU=0.50:0.95, area=medium",
    "IoU=0.50:0.95, area=large",
]

# Plot the average precisions over epochs
plt.figure(figsize=(10, 5))
for i, iou_thresh in enumerate(iou_thresholds_available):
    plt.plot(ap_values[:, i], label=f"IoU={iou_thresh}")
plt.xlabel("Epochs")
plt.ylabel("Average Precision")
plt.title("Average Precision vs. Epochs")
plt.legend()
plt.show()

# Plot the average recalls over epochs
plt.figure(figsize=(10, 5))
for i, recall_label in enumerate(recall_labels):
    plt.plot(ar_values[:, i], label=recall_label)
plt.xlabel("Epochs")
plt.ylabel("Average Recall")
plt.title("Average Recall vs. Epochs")
plt.legend()
plt.show()

In [None]:
def visualize_prediction(images, model, confidence_threshold, allowed_labels=(3, 4, 5, 6)):
    """Run the model on ``images`` and plot predictions of the allowed classes.

    Note: this redefines the ``visualize_prediction`` from the "Draw Images"
    section of this notebook; the fourth parameter here is ``allowed_labels``.

    Args:
        images: Sequence of channel-first image tensors; they are displayed
            as-is (no un-normalisation is applied).
        model: Detection model returning dicts with ``boxes``, ``labels`` and ``scores``.
        confidence_threshold: Minimum score for a box to be drawn.
        allowed_labels: Class ids that may be drawn.

    The images are moved to the device of the model's parameters, and the
    model's previous train/eval mode is restored on exit.
    """
    was_training = model.training
    model.eval()
    try:
        # Use the model's own device rather than a notebook-global `device`.
        model_device = next(model.parameters()).device
        ims = [image.to(model_device) for image in images]

        # Perform inference.
        with torch.no_grad():
            predictions = model(ims)
            print(predictions)

        # class_mapping maps name -> id; invert it once for captions.
        label_names = {class_id: name for name, class_id in class_mapping.items()}

        for image, prediction in zip(images, predictions):
            # Convert the image tensor to a PIL image for display.
            image_pil = transforms.ToPILImage()(image)

            # Predicted bounding boxes, labels and scores.
            boxes = prediction['boxes'].cpu().numpy()
            labels = prediction['labels'].cpu().numpy()
            scores = prediction['scores'].cpu().numpy()

            # Show the image and overlay the accepted predictions.
            fig, ax = plt.subplots(1)
            ax.imshow(image_pil)

            for box, label, score in zip(boxes, labels, scores):
                if label not in allowed_labels or not score > confidence_threshold:
                    continue
                x_min, y_min, x_max, y_max = box
                width = x_max - x_min
                height = y_max - y_min
                # Fall back to the numeric id for labels missing from class_mapping.
                class_name = label_names.get(int(label), int(label))
                rect = patches.Rectangle((x_min, y_min), width, height, linewidth=2, edgecolor='r', facecolor='none')
                ax.add_patch(rect)
                ax.text(x_min, y_min, f"{class_name}", color='r', fontsize=8, bbox=dict(facecolor='white', alpha=0.7, edgecolor='none'))

            plt.show()
    finally:
        # Restore the mode the caller had set instead of always forcing train().
        model.train(was_training)

In [None]:
# Fetch one test batch and draw its predictions, allowing every class id from 0 to 19.
samples = next(iter(test_dataloader))

visualize_prediction(samples[0], model, 0.3, list(range(20)))