## ParkItRight - Specific

#### Import the required libraries

In [1]:
%matplotlib inline

import os
import math
import random
import shutil
import torch
from ultralytics import YOLO
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import csv
import pandas as pd

#### Clear the splits for training, validating and testing the model

In [11]:
# Empty every split folder of the previous run, but keep the ".gitkeep"
# placeholder so the (otherwise empty) folders stay tracked by git.
folders_to_clear = [
    "images_split/train/correct",
    "images_split/train/incorrect",
    "images_split/test/correct",
    "images_split/test/incorrect",
    "images_split/val/correct",
    "images_split/val/incorrect",
]
for folder in folders_to_clear:
    for entry in os.listdir(folder):
        if entry == ".gitkeep":
            continue
        os.unlink(os.path.join(folder, entry))

# Remove the cache files next to the train/val folders as well, so they cannot
# describe files that were just deleted.
for cache_file in ("images_split/train.cache", "images_split/val.cache"):
    if os.path.exists(cache_file):
        os.unlink(cache_file)

#### Copy the specific data into random splits: 75% for training and 25% for validation, plus a random sample of images for testing

In [3]:
# Share of the "other" images that goes to training; the rest is used for validation.
TRAIN_FRACTION = 0.75
# Number of randomly chosen "mazda_mx5" images per label that form the test split.
TEST_IMAGES_PER_LABEL = 5


def list_image_files(folder):
    """Return the entry names of `folder`, skipping hidden files.

    Hidden entries (names starting with "."), such as macOS' `.DS_Store`, are
    not images. Filtering them here keeps them out of the image count and
    prevents them from taking one of the sampled test slots.
    """
    return [name for name in os.listdir(folder) if not name.startswith(".")]


labels = ["correct", "incorrect"]
images_amount = 0  # total number of train + validation images over all labels
for label in labels:
    # Train/validation pool: shuffle, then cut at TRAIN_FRACTION (rounded up).
    specific_images = list_image_files(f"images_specific/other/{label}")
    random.shuffle(specific_images)
    images_amount += len(specific_images)
    cutoff_index = math.ceil(len(specific_images) * TRAIN_FRACTION)

    # Test pool: filtered the same way as the training pool before sampling.
    test_images = list_image_files(f"images_specific/mazda_mx5/{label}")
    random.shuffle(test_images)

    # split name -> [source sub-folder below images_specific, file names to copy]
    splits = {
        "train": ["other", specific_images[:cutoff_index]],
        "val": ["other", specific_images[cutoff_index:]],
        "test": ["mazda_mx5", test_images[:TEST_IMAGES_PER_LABEL]]
    }

    for split_name, (source_group, file_names) in splits.items():
        for file in file_names:
            src_path = f"images_specific/{source_group}/{label}/{file}"
            dest_path = f"images_split/{split_name}/{label}/{file}"
            shutil.copy(src_path, dest_path)

#### Enter the number of epochs for training the YOLO model

In [4]:
# Keep asking until the input is a positive whole number.
# str.isdecimal() is used instead of str.isdigit() because isdigit() also accepts
# characters such as "²" that int() cannot parse; the additional "> 0" check
# rejects an epoch count of zero.
user_input = input("Enter the amount of epochs for the training of the YOLO model.").strip()
while not (user_input.isdecimal() and int(user_input) > 0):
    user_input = input("The provided input is not a positive whole number! Enter the amount of epochs for the training of the YOLO model.").strip()
epochs = int(user_input)

#### Select the available device to perform the torch calculations

In [None]:
# Accelerator backends in order of preference. The availability checks are
# wrapped in lambdas so a backend is only probed when every earlier one was
# unavailable.
device_probes = (
    ("cuda", lambda: torch.cuda.is_available(), "Cuda is available. Torch will use Cuda."),
    ("mps", lambda: torch.backends.mps.is_available(), "MPS is available. Torch will use MPS."),
)

# Start from the CPU fallback and upgrade to the first backend that reports availability.
device = "cpu"
device_message = "GPU is not available. Torch will fall back to CPU."
for backend_name, backend_is_available, backend_message in device_probes:
    if backend_is_available():
        device = backend_name
        device_message = backend_message
        break
print(device_message)

#### Train and load the best YOLO model

In [None]:
# Fine-tune the pretrained classification checkpoint on the prepared splits.
model = YOLO("base_model/yolo11s-cls.pt")
results = model.train(data="images_split", epochs=epochs, imgsz=640, device=device)

# Ask the trainer where it actually saved this run instead of assuming
# "runs/classify/train": Ultralytics picks an incremented name ("train2", ...)
# when a folder with the default name already exists, and the hard-coded path
# would then point at a stale, older run.
old_model_folder = str(model.trainer.save_dir)

# Move the run to the first free "specific", "specific(1)", "specific(2)", ... folder.
new_model_folder_base = "runs/classify/specific"
new_model_folder = new_model_folder_base
counter = 1
while os.path.exists(new_model_folder):
    new_model_folder = f"{new_model_folder_base}({counter})"
    counter += 1
# Make sure the parent of the target exists before renaming into it.
os.makedirs(os.path.dirname(new_model_folder), exist_ok=True)
os.rename(old_model_folder, new_model_folder)

# Candidate models for evaluation: the best and the last checkpoint of this run.
models = [YOLO(f"{new_model_folder}/weights/best.pt"), YOLO(f"{new_model_folder}/weights/last.pt")]

#### Get the model predictions and true labels based on the test dataset

In [7]:
# Maps each candidate model object to its ground truth and predictions on the test split.
# Label encoding used throughout: 0 = "correct", 1 = anything else (i.e. "incorrect").
model_test_results = {}
for model in models:
    results_correct = model("images_split/test/correct", verbose=False)
    results_incorrect = model("images_split/test/incorrect", verbose=False)

    # Ground truth follows the folder each image was read from, in the same
    # order in which the predictions are concatenated below.
    true_labels = [0] * len(results_correct) + [1] * len(results_incorrect)

    # Translate the top-1 class index of every result into the 0/1 encoding.
    # (The index is not stored in a variable called `id`, which would shadow
    # the builtin for the rest of the kernel session.)
    predicted_labels = [
        0 if result.names[result.probs.top1] == "correct" else 1
        for result in results_correct + results_incorrect
    ]
    model_test_results[model] = {"true_labels": true_labels, "predicted_labels": predicted_labels}

#### Select the best performing model based on the calculated f1-score

In [8]:
# F1-score of every candidate model on the test split, keyed by the model object.
f1_models = {
    candidate: f1_score(outcome["true_labels"], outcome["predicted_labels"])
    for candidate, outcome in model_test_results.items()
}
# Pick the candidate with the highest F1-score; on a tie max() keeps the one
# that was inserted first.
best_model = max(f1_models, key=lambda candidate: f1_models[candidate])

#### Calculate the model performance and set up a confusion matrix

In [None]:
# Ground truth and predictions of the selected model (0 = correct, 1 = incorrect).
y_true = model_test_results[best_model]["true_labels"]
y_pred = model_test_results[best_model]["predicted_labels"]

acc = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_models[best_model]  # already computed while selecting the best model

print(f"Accuracy:   {acc: .2f}")
print(f"Precision:  {precision: .2f}")
print(f"Recall:     {recall: .2f}")
print(f"F1-score:   {f1: .2f}")

# Pass the labels explicitly so the matrix is always 2x2 and lines up with the
# fixed tick labels, even when one class never occurs in y_true or y_pred.
cm_values = confusion_matrix(y_true, y_pred, labels=[0, 1])

fig, ax = plt.subplots(figsize=(6, 5))
# `cm` holds the seaborn Axes (not the raw matrix): the storing cell saves the
# plot through `cm.figure.savefig(...)`.
cm = sns.heatmap(cm_values, annot=True, fmt='d', cmap='Blues', xticklabels=['Correct', 'Incorrect'], yticklabels=['Correct', 'Incorrect'], ax=ax)
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
ax.set_title('Confusion Matrix')
plt.show()

#### Handle storing the model performance

In [10]:
# Ask whether to keep this run. The answer is normalised so that "Y", " y" etc.
# are accepted as well.
user_input = input("Do you want to store the evaluation data (y/n)?").strip().lower()
while user_input not in ("y", "n"):
    user_input = input("The provided input is invalid! Do you want to store the evaluation data (y/n)?").strip().lower()

if user_input == "y":
    # Value of the 'time' column in the last row of the training log.
    df = pd.read_csv(f"{new_model_folder}/results.csv")
    training_time = df['time'].iloc[-1]

    # NOTE(review): the second value is hard-coded to 20. If this column is meant
    # to hold the number of training epochs it should be `epochs` instead —
    # confirm against the header of evaluation_data.csv before changing it.
    data = [images_amount, 20, acc, precision, recall, f1, training_time]

    # Create the output folders up front so a missing directory cannot abort
    # the cell after only part of the evaluation has been stored.
    os.makedirs("evaluation_data/confusion_matrices", exist_ok=True)

    with open("evaluation_data/evaluation_data.csv", mode='a', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(data)

    # Save the confusion matrix under the first free name:
    # specific.png, specific(1).png, specific(2).png, ...
    figure_name = "evaluation_data/confusion_matrices/specific.png"
    counter = 1
    base_name, ext = os.path.splitext(figure_name)
    while os.path.exists(figure_name):
        figure_name = f"{base_name}({counter}){ext}"
        counter += 1
    cm.figure.savefig(figure_name)
else:
    # The run is not kept: remove the renamed training folder including its weights.
    shutil.rmtree(new_model_folder)