# Correction from Label Studio and Model Evaluation

This notebook SHOULD NOT be run until all corrections have been made in Label Studio.
Its functions extract the data to be analysed after training, in order to assess the robustness of the model.

**Warning 1**
You need a Label Studio account (create one if necessary: https://labelstud.io/guide/install.html) and you must have followed the workflow of the previous notebooks (Download_from_manifest, Data_preparation_for_training, Train_and_Detect_YOLOv8.ipynb), or at least have created similar files.

**Warning 2**
Corrections MUST be exported from Label Studio in CSV format ONLY.
Label Studio's YOLO export format does not keep the image names as they were imported.
In this workflow the images are imported by URL, and this script retrieves the name of each image from its URL
in order to generate a .txt file with the same name as the image (so that the new data can be used for a new training session).
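
To illustrate the idea of deriving a label file name from an image URL, here is a minimal sketch. The helper `label_name_from_url` and the example URL are hypothetical; the notebook itself takes the file names from the 'YOLO_Results_File' column, and URLs whose last segment is not the image name would need different handling.

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse


def label_name_from_url(img_url):
    """Return the .txt file name matching an image URL (hypothetical helper)."""
    # Keep only the path part of the URL, dropping any query string or fragment
    url_path = urlparse(img_url).path
    # The stem is the last path segment without its extension
    return PurePosixPath(url_path).stem + '.txt'


print(label_name_from_url('https://example.org/images/folio_012.jpg?download=1'))
# folio_012.txt
```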

**Warning 3**
It is essential to complete the "Labeling Interface" by specifying strictly the same class names 
(case, special characters, etc.) as those declared in the .json file.
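
A quick way to check this before annotating is to compare the names declared in the labeling configuration with the expected class names. The sketch below assumes a standard Label Studio configuration with `<Label value="..."/>` elements; the function `compare_class_names` and the class names used are hypothetical examples.

```python
import xml.etree.ElementTree as ET


def compare_class_names(labeling_config_xml, expected_names):
    """Return (missing, unexpected) class names, compared case-sensitively."""
    root = ET.fromstring(labeling_config_xml)
    # Collect the 'value' attribute of every <Label> element in the configuration
    configured = {label.get('value') for label in root.iter('Label')}
    expected = set(expected_names)
    missing = sorted(expected - configured)
    unexpected = sorted(configured - expected)
    return missing, unexpected


# Hypothetical configuration: 'initial' does not match the expected 'Initial'
config = """
<View>
  <Image name="image" value="$image"/>
  <RectangleLabels name="label" toName="image">
    <Label value="Miniature"/>
    <Label value="initial"/>
  </RectangleLabels>
</View>
"""

missing, unexpected = compare_class_names(config, ['Miniature', 'Initial'])
print('Missing:', missing)
print('Unexpected:', unexpected)
```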

**Warning 4**
If you run into problems in Label Studio, you can rename the labeling results key from "predictions" to "annotations":
the Label Studio documentation states that bounding box coordinates cannot be changed for predictions,
only for annotations: https://labelstud.io/guide/predictions.html#Predictions-are-read-only.
In my own tests, however, the predicted bounding boxes could be edited and the modified data exported without any problem.

## Environment

In [None]:
import os
import glob
import pandas as pd
import ast

from modules.class_names_functions import get_labels, get_class_name, get_class_code
from modules.transform_coordinates_functions import from_ls_to_yolo

## Get the Label Studio correction in new .txt files

### Generate new txt files with correct bounding boxes 

In [None]:
def generate_corrected_files(correction_from_ls_csv, dataset_path, yolo_model_folder):
    
    '''
    This function writes the corrections made in Label Studio to new .txt files and should only be run 
    AFTER the corrections have been made and exported.
    
    The files generated have the same name as the YOLO results and are sent to a 'correctedLabels' folder in the same 
    folder as 'labels' so that the data can be used in the following 'Model_evaluation' notebook.
    '''
    
    results_folder = os.path.dirname(correction_from_ls_csv)

    # CSV with the YOLO results (one row per image) and CSV exported from Label Studio
    yolo_result = pd.read_csv(os.path.join(results_folder, os.path.basename(dataset_path) + '.csv'))
    ls_reannoted = pd.read_csv(correction_from_ls_csv)

    labels = get_labels(os.path.join(yolo_model_folder, 'labels.txt'))

    # Stays None if no image of the dataset has been corrected in Label Studio
    corrected_labels_dir = None

    for _, yolo_row in yolo_result.iterrows():
        result_path = str(yolo_row['YOLO_Results_File'])
        img_url = str(yolo_row['Url_Image'])

        # Rows of the Label Studio export that refer to this image
        matched_rows = ls_reannoted.loc[ls_reannoted['image'] == img_url]

        if not matched_rows.empty:

            corrected_labels_dir = os.path.dirname(result_path).replace('labels', 'correctedLabels')

            if not os.path.exists(corrected_labels_dir):
                os.makedirs(corrected_labels_dir)
                print(corrected_labels_dir)

            with open(os.path.join(corrected_labels_dir, os.path.basename(result_path)), 'w') as reannotated:
                try:
                    for _, row in matched_rows.iterrows():
                        new_coordinates = str(row['label'])

                        try:
                            # The 'label' cell holds a list with one dict per bounding box
                            coordinates_list = ast.literal_eval(new_coordinates)
                            for coordinates in coordinates_list:
                                x, y, width, height = from_ls_to_yolo(coordinates['x'], coordinates['y'], coordinates['width'], coordinates['height'])
                                rectanglelabels = get_class_code(coordinates['rectanglelabels'], labels)
                                line = f"{rectanglelabels} {x} {y} {width} {height}\n"
                                reannotated.write(line)

                        except ValueError:
                            # Empty or unparsable cell: the image has no bounding box and the file stays empty
                            pass

                except KeyError:
                    # Missing column or key: the file stays empty
                    pass
        else:
            print(f"No corrections made for {img_url}")

    # corrected_labels_dir is only set when at least one image has been matched
    if corrected_labels_dir is None:
        print('No corrected file has been created.')
    else:
        print(f'The corrected files have been created in {corrected_labels_dir}')
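
The conversion step can be illustrated with a small standalone sketch. It assumes that Label Studio stores rectangles as the top-left corner plus width and height, all expressed as percentages of the image size, while YOLO expects the box centre plus width and height as fractions between 0 and 1. The function `ls_to_yolo_sketch` and the sample cell are hypothetical; this is not the code of `from_ls_to_yolo` in the project's modules.

```python
import ast


def ls_to_yolo_sketch(x, y, width, height):
    """Convert a Label Studio rectangle (percentages, top-left origin) to YOLO format."""
    x_center = (x + width / 2) / 100
    y_center = (y + height / 2) / 100
    return x_center, y_center, width / 100, height / 100


# Hypothetical content of one 'label' cell from the Label Studio CSV export
label_cell = '[{"x": 10.0, "y": 20.0, "width": 30.0, "height": 40.0, "rectanglelabels": ["Miniature"]}]'

for region in ast.literal_eval(label_cell):
    box = ls_to_yolo_sketch(region['x'], region['y'], region['width'], region['height'])
    print(region['rectanglelabels'][0], box)
```

Here the rectangle starting at (10 %, 20 %) with a size of 30 % by 40 % becomes a box centred at roughly (0.25, 0.4) with a relative size of 0.3 by 0.4.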

## Get results in csv

### Get a list of images used for training the model

In [None]:
def get_img_from_training(yolo_model_folder, dataset_path):
    
    """
    This function returns the list of images in 'dataset_path' that were used to train the model.
    The list is built from the traindata.txt file stored in the 'dataset_statistics' folder of the model folder.

    **Warning**
    This script assumes that the training images and the prediction images have the same file names
    and that the training images are also present in the prediction dataset.
    
    """
    
    train_data_list = os.path.join(yolo_model_folder, 'dataset_statistics/traindata.txt')
    
    train_data = []
    
    with open(train_data_list, 'r') as train_data_file:
        for line in train_data_file:
            img_path = line.strip()
            train_data.append(img_path)
    
    # print(train_data)
      
    image_files = [file for file in os.listdir(dataset_path) if file.endswith(('.jpg', '.png'))]
    # print(image_files)
    
    matching_images = [image_name for image_name in train_data if image_name in image_files]
   
    if matching_images:
        print("The following images were used to train the model:")
        print(matching_images)
        
    else:
        print(f"No images in the {dataset_path} folder were used to train the model {os.path.basename(yolo_model_folder)}.")
    
    return matching_images
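
If traindata.txt happens to contain full paths rather than bare file names, the comparison above would find nothing. The sketch below shows a variant that compares base names only; the function `match_training_images` is hypothetical, and whether traindata.txt holds paths or names is an assumption to check against your own files.

```python
import os


def match_training_images(train_entries, image_files):
    """Return the image files whose base name appears in the training list."""
    # Reduce every non-empty training entry to its base name
    train_names = {os.path.basename(entry.strip()) for entry in train_entries if entry.strip()}
    return sorted(name for name in image_files if name in train_names)


# Hypothetical entries: one full path, one bare file name, one empty line
train_entries = ['/data/train/images/a.jpg\n', 'b.png\n', '\n']
image_files = ['a.jpg', 'b.png', 'c.jpg']

print(match_training_images(train_entries, image_files))
# ['a.jpg', 'b.png']
```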

### Calculate IoU

In [None]:
def calculate_iou(box1, box2):
    
    """
    The 'calculate_iou' function is adapted from the 'bb_intersection_over_union' function from 
    https://pyimagesearch.com/2016/11/07/intersection-over-union-iou-for-object-detection/.
    The adaptation was necessary because 'bb_intersection_over_union' uses coordinates given in 
    x_min, y_min, x_max, y_max, whereas the coordinates from YOLOv8 for each prediction are relative 
    and given in x, y, w, h.
    
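    Parameters:
    box1: list of floats [class, x_center, y_center, width, height], in relative coordinates
    box2: list of floats in the same format as box1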
    """
    
    # Convert the (x, y, w, h) coordinates to (x_min, y_min, x_max, y_max); index 0 holds the class
    box1_x_min = box1[1] - box1[3] / 2
    box1_y_min = box1[2] - box1[4] / 2
    box1_x_max = box1[1] + box1[3] / 2
    box1_y_max = box1[2] + box1[4] / 2
    
    box2_x_min = box2[1] - box2[3] / 2
    box2_y_min = box2[2] - box2[4] / 2
    box2_x_max = box2[1] + box2[3] / 2
    box2_y_max = box2[2] + box2[4] / 2
    
    # Compute the (x, y) coordinates of the intersection
    x_min = max(box1_x_min, box2_x_min)
    y_min = max(box1_y_min, box2_y_min)
    x_max = min(box1_x_max, box2_x_max)
    y_max = min(box1_y_max, box2_y_max)
    
    # Compute the area of the intersection.
    # The '+ 1' offset of the pixel-based original is dropped: the coordinates are relative (0 to 1),
    # so adding 1 would give a large overlap even to boxes that do not intersect.
    intersection_area = max(0, x_max - x_min) * max(0, y_max - y_min)

    # Compute the area of both bounding boxes
    box1_area = (box1_x_max - box1_x_min) * (box1_y_max - box1_y_min)
    box2_area = (box2_x_max - box2_x_min) * (box2_y_max - box2_y_min)
    
    # Compute the Intersection over Union (IoU), guarding against an empty union
    union_area = float(box1_area + box2_area - intersection_area)
    iou = intersection_area / union_area if union_area > 0 else 0.0
    
    return iou
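
As a worked example of IoU on relative YOLO coordinates, here is a minimal standalone sketch. The name `iou_relative` is hypothetical and not part of the notebook; boxes are assumed to be given as [class, x_center, y_center, width, height] with values between 0 and 1, and no pixel offset is applied.

```python
def iou_relative(box1, box2):
    """IoU of two boxes given as [class, x_center, y_center, width, height]."""
    def corners(box):
        _, x, y, w, h = box
        return x - w / 2, y - h / 2, x + w / 2, y + h / 2

    ax0, ay0, ax1, ay1 = corners(box1)
    bx0, by0, bx1, by1 = corners(box2)

    # Width and height of the overlap, clamped at 0 when the boxes are disjoint
    inter_w = max(0.0, min(ax1, bx1) - max(ax0, bx0))
    inter_h = max(0.0, min(ay1, by1) - max(ay0, by0))
    intersection = inter_w * inter_h

    union = box1[3] * box1[4] + box2[3] * box2[4] - intersection
    return intersection / union if union > 0 else 0.0


box_a = [0, 0.5, 0.5, 0.4, 0.4]   # spans 0.3 to 0.7 on both axes
box_b = [0, 0.6, 0.5, 0.4, 0.4]   # same size, shifted by 0.1 to the right
box_c = [0, 0.1, 0.1, 0.1, 0.1]   # far from box_a

print(round(iou_relative(box_a, box_b), 3))
print(iou_relative(box_a, box_c))
```

For box_a and box_b the overlap is 0.3 x 0.4 = 0.12 and the union is 0.16 + 0.16 - 0.12 = 0.20, so the IoU is about 0.6; box_a and box_c do not overlap and give 0.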

### Get the matched boxes

In [None]:
def match_boxes(predictions, corrected_predictions, threshold):
    
    """
    The match_boxes function returns annotated bounding boxes and their predicted matches.
    This function also returns false positives (predicted objects for which there is no equivalent in the ground truth) and false negatives (objects present in the ground truth but not detected).
    
    **Warning**
    The 'predictions' and 'corrected_predictions' parameters are filled in automatically later on.

    The 'threshold' parameter is the IoU value below which a box is considered a false negative (if it comes from the ground truth) or a false positive (if it comes from the prediction).
    In this notebook the threshold is set to 0.50 (see get_csv_results), which corresponds to the recommendations of the PASCAL VOC challenge; 0.75 corresponds to strict detection:
    https://cocodataset.org/?ref=jeremyjordan.me#detection-eval.

    With a threshold of 0.1, localisation errors are ignored.
    """
    matched_predictions = []
    matched_corrections = []
    
    for prediction in predictions:
        prediction_box = prediction.split(" ")
        prediction_box = [float(coord) for coord in prediction_box]
        prediction_class = int(prediction_box[0])

        best_iou = 0
        best_prediction_idx = -1

        for i, correction in enumerate(corrected_predictions):
            correction_box = correction.split(" ")
            correction_box = [float(coord) for coord in correction_box]
            correction_class = int(correction_box[0])

            if  prediction_class == correction_class:
                iou = calculate_iou(prediction_box, correction_box)

                if iou > best_iou and iou >= threshold:
                    best_iou = iou
                    best_prediction_idx = i

        if best_prediction_idx != -1:
            matched_predictions.append(prediction)
            matched_corrections.append(corrected_predictions[best_prediction_idx])
    
    false_negatives = [p for p in corrected_predictions if p not in matched_corrections]
    false_positives = [a for a in predictions if a not in matched_predictions]


    return matched_predictions, matched_corrections, false_positives, false_negatives
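
The matching logic can be tried on a toy example. The sketch below is a standalone one-to-one variant in which each corrected box can be matched only once; this is a design choice made for the illustration, not the behaviour of `match_boxes` above. The names `_iou` and `greedy_match` and the sample boxes are hypothetical.

```python
def _iou(a, b):
    """IoU of two [class, x_center, y_center, width, height] boxes in relative units."""
    inter_w = max(0.0, min(a[1] + a[3] / 2, b[1] + b[3] / 2) - max(a[1] - a[3] / 2, b[1] - b[3] / 2))
    inter_h = max(0.0, min(a[2] + a[4] / 2, b[2] + b[4] / 2) - max(a[2] - a[4] / 2, b[2] - b[4] / 2))
    intersection = inter_w * inter_h
    union = a[3] * a[4] + b[3] * b[4] - intersection
    return intersection / union if union > 0 else 0.0


def greedy_match(predictions, corrections, threshold=0.5):
    """Match each prediction to its best same-class correction, using each correction once."""
    parsed = [[float(v) for v in line.split()] for line in corrections]
    used = set()            # indices of corrections already matched
    matched_pred_idx = set()
    matches = []

    for j, pred in enumerate(predictions):
        p = [float(v) for v in pred.split()]
        best_iou, best_idx = 0.0, None
        for i, c in enumerate(parsed):
            if i in used or int(c[0]) != int(p[0]):
                continue
            iou = _iou(p, c)
            if iou >= threshold and iou > best_iou:
                best_iou, best_idx = iou, i
        if best_idx is not None:
            used.add(best_idx)
            matched_pred_idx.add(j)
            matches.append((pred, corrections[best_idx], best_iou))

    false_positives = [p for j, p in enumerate(predictions) if j not in matched_pred_idx]
    false_negatives = [c for i, c in enumerate(corrections) if i not in used]
    return matches, false_positives, false_negatives


predictions = ['0 0.5 0.5 0.4 0.4', '1 0.2 0.2 0.1 0.1']
corrections = ['0 0.6 0.5 0.4 0.4', '0 0.1 0.8 0.1 0.1']

matches, fp, fn = greedy_match(predictions, corrections)
print(matches)
print('FP:', fp)
print('FN:', fn)
```

In this toy case the first prediction matches the first correction (IoU of about 0.6), the class 1 prediction has no counterpart and is a false positive, and the second correction was never predicted and is a false negative.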

### Load data from files

In [None]:
def load_data_from_files(file_paths):
    """
    Reads every file in 'file_paths' and returns all their lines, stripped, in a single list.

    **Warning**
    The 'file_paths' parameter is filled in automatically later on.
    """
    data_list = []
    for file_path in file_paths:
        with open(file_path, 'r') as file:
            lines = file.readlines()
            for line in lines:
                data_list.append(line.strip())
    return data_list

### Generate the csv with the results

In [None]:
def get_csv_results(correction_from_ls_csv, dataset_path, yolo_model_folder, all_results):
    
    """
    This function generates the correction results in CSV format. Each annotation is classified as TP, FP or FN.
    The resulting file is then used to calculate the various metrics.
    
    """
    
    results_folder = os.path.dirname(correction_from_ls_csv)
    labels = get_labels(os.path.join(yolo_model_folder, 'labels.txt'))
    
    # Get prediction files
    prediction_folder = results_folder.replace('results', 'labels')
    predictions_files = glob.glob(os.path.join(prediction_folder, '*.txt'))

    # Get corrected files
    correction_folder = results_folder.replace('results', 'correctedLabels')
    corrected_files = glob.glob(os.path.join(correction_folder, '*.txt'))
      
    if not all_results:
        output_file = os.path.join(results_folder, 'results_for_statistics.csv')   # Name of the output file
        img_use_for_training = get_img_from_training(yolo_model_folder, dataset_path)
        # print(img_use_for_training)

        # Exclude the images present in img_use_for_training from both file lists.
        # Only the '.txt' extension of the base name is replaced, not every 'txt' found in the path.
        predictions_files = [file for file in predictions_files if os.path.basename(file).replace('.txt', '.jpg') not in img_use_for_training]
        corrected_files = [file for file in corrected_files if os.path.basename(file).replace('.txt', '.jpg') not in img_use_for_training]
        # print(len(corrected_files))
        
    else:
        output_file = os.path.join(results_folder, 'results_for_evaluation.csv')

    rows = []

    for pred_file in predictions_files:
        matching_corr_files = [corr_file for corr_file in corrected_files if os.path.basename(corr_file) == os.path.basename(pred_file)]
        # print(matching_corr_files)
        
        if len(matching_corr_files) == 0:
            print(f'No matching file for {pred_file}')
            
        else:
            predictions = load_data_from_files([pred_file])
            corrected_predictions= load_data_from_files(matching_corr_files)

            matched_predictions, matched_corrections, false_positives, false_negatives = match_boxes(predictions, corrected_predictions, threshold=0.5)
            # print(matched_predictions)
            
            for prediction, correction in zip(matched_predictions, matched_corrections):
                iou = calculate_iou([float(coord) for coord in prediction.split(" ")], [float(coord) for coord in correction.split(" ")])

                # The class code is the first token of the line (prediction[0] would keep only its first character)
                rows.append({'Filename': os.path.basename(pred_file), 'Predicted_results': prediction, 'TP/FP/FN': 'TP', 'classe': get_class_name(prediction.split(" ")[0], labels), 'Corrected_results': correction, 'IoU': iou})

            for false_positive in false_positives:
                rows.append({'Filename': os.path.basename(pred_file), 'Predicted_results': false_positive, 'TP/FP/FN': 'FP', 'classe': get_class_name(false_positive.split(" ")[0], labels), 'Corrected_results': '', 'IoU': ''})

            for false_negative in false_negatives:
                rows.append({'Filename': os.path.basename(pred_file), 'Predicted_results': '', 'TP/FP/FN': 'FN', 'classe': get_class_name(false_negative.split(" ")[0], labels), 'Corrected_results': false_negative, 'IoU': ''})

    # Check for non-matching correction files
    for pred_file in predictions_files:
        pred_file_name = os.path.basename(pred_file)
        matching_corr_files = [corr_file for corr_file in corrected_files if os.path.basename(corr_file) == pred_file_name]

        if len(matching_corr_files) == 0:
            true_prediction = load_data_from_files([pred_file])
            
            for predicted_box in true_prediction:
                prediction_box = predicted_box.split(" ")
                prediction_box = [float(coord) for coord in prediction_box]
                
                prediction_class = int(prediction_box[0])
                
                classe = get_class_name(prediction_class, labels)
                box_coordinates = ' '.join(map(str, prediction_box))

                # One row per predicted box: without a correction file, every prediction is counted as a TP
                rows.append({'Filename': os.path.basename(pred_file), 'Predicted_results': box_coordinates, 'TP/FP/FN': 'TP', 'classe': classe, 'Corrected_results': 'No correction made', 'IoU': ''})

    # print(rows)
    df = pd.DataFrame(rows)

    df_sorted = df.sort_values('Filename')
    df_sorted.to_csv(output_file, index=False)

    print(f"The {output_file} file has been created.")
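
The pairing of prediction files with correction files relies on identical base names. A minimal sketch of that pairing with a dictionary lookup is shown below; the function `pair_by_basename` and the sample paths are hypothetical and only illustrate the principle.

```python
import os


def pair_by_basename(prediction_files, corrected_files):
    """Pair each prediction file with the correction file that has the same base name."""
    corrected = {os.path.basename(path): path for path in corrected_files}
    pairs, unmatched = [], []
    for pred in prediction_files:
        match = corrected.get(os.path.basename(pred))
        if match is None:
            unmatched.append(pred)
        else:
            pairs.append((pred, match))
    return pairs, unmatched


pairs, unmatched = pair_by_basename(
    ['run/labels/img1.txt', 'run/labels/img2.txt'],
    ['run/correctedLabels/img1.txt'],
)
print(pairs)
print(unmatched)
```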

## Get metrics

In [None]:
def get_txt_results(correction_from_ls_csv):
    
    """
    Generates a txt file containing a summary of the data from the previously generated csv.
    Only the data that excludes the images used for training is summarised, so that the results are relevant.
    """
    
    results_folder = os.path.dirname(correction_from_ls_csv)
    csv_with_results = os.path.join(results_folder, 'results_for_statistics.csv')

    df = pd.read_csv(csv_with_results)
    output_file = csv_with_results.replace('.csv', '.txt')
    
    print(output_file)
    
    # Collect all unique classes present in the DataFrame
    all_classes = df['classe'].unique()

    # Initialize TP, FP and FN counters for all classes
    class_TP = {classe: 0 for classe in all_classes}
    class_FP = {classe: 0 for classe in all_classes}
    class_FN = {classe: 0 for classe in all_classes}

    # Browse DataFrame rows
    for _, row in df.iterrows():
        classe = row['classe']
        # Check TP/FP/FN status for line
        if row['TP/FP/FN'] == 'TP':
            class_TP[classe] += 1
        elif row['TP/FP/FN'] == 'FP':
            class_FP[classe] += 1
        elif row['TP/FP/FN'] == 'FN':
            class_FN[classe] += 1

    # Calculate global totals
    total_TP = sum(class_TP.values())
    total_FP = sum(class_FP.values())
    total_FN = sum(class_FN.values())

    # Recall computation
    recall = total_TP / (total_TP + total_FN) if (total_TP + total_FN) != 0 else 0

    # Precision computation
    precision = total_TP / (total_TP + total_FP) if(total_TP + total_FP) != 0 else 0

    # Calculation of the overall F1 score
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) != 0 else 0

    # Open the file in write mode
    with open(output_file, 'w') as file:
        # Write the overall results
        file.write("Overall results:\n")
        file.write("Number of TP: {}\n".format(total_TP))
        file.write("Number of FP: {}\n".format(total_FP))
        file.write("Number of FN: {}\n".format(total_FN))
        file.write("\nRecall: {}\n".format(recall))
        file.write("Precision: {}\n".format(precision))
        file.write("Overall F1 score: {}\n".format(f1_score))
        file.write("\n")

        # Write results by class
        file.write("Results per class:\n")
        for classe in all_classes:
            tp = class_TP[classe]
            fp = class_FP[classe]
            fn = class_FN[classe]

            recall_class = tp / (tp + fn) if (tp + fn) != 0 else 0
            precision_class = tp / (tp + fp) if (tp + fp) != 0 else 0
            f1_score_class = 2 * (precision_class * recall_class) / (precision_class + recall_class) if (precision_class + recall_class) != 0 else 0

            file.write("Class {}\n".format(classe))
            file.write("Number of TP: {}\n".format(tp))
            file.write("Number of FP: {}\n".format(fp))
            file.write("Number of FN: {}\n".format(fn))
            file.write("Recall: {}\n".format(recall_class))
            file.write("Precision: {}\n".format(precision_class))
            file.write("F1 score: {}\n".format(f1_score_class))
            file.write("\n")

    print(f"The {output_file} file has been created.")
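
As a worked example of the metrics computed above, the sketch below applies the same formulas to made-up counts. The function name `precision_recall_f1` and the counts are hypothetical.

```python
def precision_recall_f1(tp, fp, fn):
    """Return (precision, recall, F1) from TP, FP and FN counts, with 0 for empty denominators."""
    precision = tp / (tp + fp) if (tp + fp) != 0 else 0
    recall = tp / (tp + fn) if (tp + fn) != 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) != 0 else 0
    return precision, recall, f1


# Made-up counts: 8 correct detections, 2 spurious ones, 4 missed objects
precision, recall, f1 = precision_recall_f1(tp=8, fp=2, fn=4)
print(round(precision, 3), round(recall, 3), round(f1, 3))
```

With these counts, precision is 8 / 10 = 0.8, recall is 8 / 12, or about 0.667, and the F1 score is 16 / 22, or about 0.727.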

## Process

In [None]:
dataset_path = 'ABSPATHTOTHEFOLDER' # to be changed: absolute path to a folder with images only, without annotations
yolo_model_folder = 'ABSPATHTOTHEMODELFOLDER' # to be changed: absolute path to the folder with the training data

# The CSV file from Label Studio must be placed in the results folder, i.e. the one created in the previous notebook
correction_from_ls_csv = 'ABSPATHTOTHECSVWITHCORRECTIONS' # to be changed

## Generate results from Label Studio with URL

### Generate the corrected files in YOLO format

In [None]:
generate_corrected_files(correction_from_ls_csv, dataset_path, yolo_model_folder)

### Generate a CSV with the corrected data

In [None]:
get_csv_results(correction_from_ls_csv, dataset_path, yolo_model_folder, all_results=False)

### Generate the file with metrics

In [None]:
get_txt_results(correction_from_ls_csv)