# Dataset Preparation


## Frames Extraction


In [3]:
import cv2

def print_fps(video_path):
    """Print the frame rate (FPS) that OpenCV reports for a video file.

    Args:
        video_path (str): Path of the video file to inspect.

    Returns:
        None. The result (or an error message) is printed.
    """
    capture = cv2.VideoCapture(video_path)

    if capture.isOpened():
        frame_rate = capture.get(cv2.CAP_PROP_FPS)
        # A reported rate of 0 is treated as "FPS information unavailable".
        message = (
            "Error: Could not retrieve FPS information"
            if frame_rate == 0
            else f"Frames per second using video.get(cv2.CAP_PROP_FPS): {frame_rate}"
        )
        print(message)
        capture.release()
    else:
        print(f"Error: Could not open video file {video_path}")

# Demo call for print_fps. When this runs as a notebook cell __name__ is
# "__main__", so the guard passes and the FPS is printed right below the cell.
if __name__ == "__main__":
    # Absolute path on the author's machine; point it at a local video before running.
    video_path = "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/test_indoor1.avi"
    print_fps(video_path)

Frames per second using video.get(cv2.CAP_PROP_FPS): 30.0


In [None]:
import cv2
import os


def extract_frames(video_path, output_folder):
    """Save every frame of a video as ``frame_<index>.jpg`` inside ``output_folder``.

    Frames are numbered from 0 in reading order.

    Args:
        video_path (str): Path of the video file to read.
        output_folder (str): Directory that receives the JPEG files. It is
            created (including parents) when it does not exist yet.

    Returns:
        None. Progress and problems are reported with ``print``.
    """
    # cv2.imwrite signals a failed write through its return value rather than an
    # exception, so a missing folder would otherwise drop every frame silently.
    if output_folder:
        os.makedirs(output_folder, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video file {video_path}")
        cap.release()
        return

    count = 0
    try:
        while True:
            # Read each new frame; `success` turns False once the stream is exhausted
            success, img = cap.read()
            if not success:
                print("End of video reached.")
                break

            # Save the current frame as a JPEG image
            output_file = os.path.join(output_folder, f"frame_{count}.jpg")
            if not cv2.imwrite(output_file, img):
                print(f"Warning: Could not write {output_file}")

            count += 1
    finally:
        # Always free the capture, even if reading or writing raised.
        # No GUI window is opened here, so there is nothing else to tear down
        # (cv2.destroyAllWindows() is unavailable on headless OpenCV builds).
        cap.release()

In [None]:
# Replace the input/output folder paths below with your own video and frame directories
# input_folder_path = "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/data/official_dataset/test"
# output_folder_path = "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames"
# for video in os.listdir(input_folder_path):
#     video_path = os.path.join(input_folder_path, video)
#     file_name = video.split(".")[0]
#     output_path = os.path.join(output_folder_path, file_name)
#     os.makedirs(output_path, exist_ok=True)
#     extract_frames(video_path, output_path)
#     print(f"Frames extracted from {video_path}")

## Label Studio Conversion

In [None]:
import json 

def convert_json(input_file, output_file):
    """Convert a Label Studio JSON export into a flat list of pixel bounding boxes.

    Only the first result of the first annotation of each task is used. Tasks
    without annotations, or whose first annotation has no result, are skipped.

    Args:
        input_file (str): Path of the exported JSON file (a list of tasks).
        output_file (str): Path of the JSON file to write. Each entry has the
            form ``{"frame": <name>, "bbox": [x, y, width, height]}`` in pixels.
    """
    with open(input_file, "r") as source:
        tasks = json.load(source)

    converted = []
    for task in tasks:
        # Keep what follows the last "-" of the uploaded name, without extension.
        frame_name = task["file_upload"].split("-")[-1].split(".")[0]

        annotations = task["annotations"]
        if not annotations:
            continue

        results = annotations[0]["result"]
        if not results:
            continue

        region = results[0]
        box = region["value"]

        # Box values are percentages of the original image size; scale to pixels.
        bbox_xywh = [
            box["x"] * region["original_width"] / 100.0,
            box["y"] * region["original_height"] / 100.0,
            box["width"] * region["original_width"] / 100.0,
            box["height"] * region["original_height"] / 100.0,
        ]
        converted.append({"frame": frame_name, "bbox": bbox_xywh})

    with open(output_file, "w") as target:
        json.dump(converted, target, indent=4)

In [None]:
# Specify your input and output file paths
# input_file = "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/code/LSTM/data/outdoor2/test_outdoor2.json"
# output_file = "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/code/LSTM/data/outdoor2/outdoor2_full_dataset.json"

# convert_json(input_file, output_file)

### Function to create training, validation & test datasets with a classic JSON file

In [None]:
import json
from sklearn.model_selection import train_test_split


def create_datasets(
    annotation_file, train_ratio=0.64, val_ratio=0.16, test_ratio=0.2, output_dir="."
):
    """
    Creates train.json, validation.json and test.json files from annotations.json.

    The annotation list is cut into three contiguous chunks (no shuffling), in
    the order train, validation, test.

    Parameters :
        annotation_file (str): Path to the annotations.json file (a JSON list)
        train_ratio (float): Ratio of data allocated to training
        val_ratio (float): Ratio of data assigned to validation
        test_ratio (float): Ratio of data allocated to tests; it only takes part
            in the sum check, the test set receives whatever remains
        output_dir (str): Directory where the resultant files will be saved

    Raises :
        AssertionError: If the three ratios do not sum to 1.0
    """
    # Load the annotations JSON file
    with open(annotation_file, "r") as f:
        annotations = json.load(f)

    # Compare with a tolerance: sums such as 0.6 + 0.3 + 0.1 are not exactly 1.0
    # in floating point and would be rejected by a strict equality test.
    assert (
        abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6
    ), "Ratios should sum to 1.0"

    num_annotations = len(annotations)

    # Compute the split indices
    train_end = int(train_ratio * num_annotations)
    val_end = int((train_ratio + val_ratio) * num_annotations)

    # Slice the annotations into contiguous runs for training, validation and testing
    splits = {
        "train.json": annotations[:train_end],
        "validation.json": annotations[train_end:val_end],
        "test.json": annotations[val_end:],
    }

    # Save one JSON file per split
    for file_name, subset in splits.items():
        with open(f"{output_dir}/{file_name}", "w") as f:
            json.dump(subset, f, indent=4)

    print(
        "The files train.json, validation.json and test.json have been created and saved."
    )

In [None]:

# create_datasets(
#     "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/code/LSTM/data/outdoor1/outdoor1_full_dataset.json",
#     output_dir="/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/code/LSTM/data/outdoor1",
# )

### Function to create training, validation & test datasets with a YOLO format dataset folder

In [1]:
import os
import random
import shutil
from pathlib import Path
from sklearn.model_selection import train_test_split


def create_splits_and_yaml(dataset_path, train_size=0.7, val_size=0.1, test_size=0.2):
    """Split a YOLO dataset into train/val/test folders and write ``dataset.yaml``.

    ``dataset_path`` must contain an ``images`` folder (``*.jpg`` / ``*.png``
    files) and a ``labels`` folder holding ``<image stem>.txt`` files. Images,
    and their label file when one exists, are copied (the originals stay in
    place) to ``<dataset_path>/<split>/images`` and ``<dataset_path>/<split>/labels``.

    Args:
        dataset_path (str | Path): Root folder of the dataset.
        train_size (float): Share of the images used for training.
        val_size (float): Share of the images used for validation.
        test_size (float): Share of the images used for testing.

    Raises:
        AssertionError: If the ``images`` or ``labels`` folder is missing.
    """
    dataset_path = Path(dataset_path)
    images_path = dataset_path / "images"
    labels_path = dataset_path / "labels"

    assert (
        images_path.exists() and labels_path.exists()
    ), "Invalid dataset path, or missing 'images' and 'labels' folders."

    # Sort so the seeded split below does not depend on directory listing order.
    images = sorted(list(images_path.glob("*.jpg")) + list(images_path.glob("*.png")))

    # First carve out the test set, then divide the remainder into train and val.
    train_images, test_images = train_test_split(
        images, test_size=test_size, random_state=42
    )
    train_images, val_images = train_test_split(
        train_images, test_size=val_size / (train_size + val_size), random_state=42
    )

    def copy_files(file_list, dest_images_folder, dest_labels_folder):
        """Copy each image and, when present, its label file to the split folders."""
        for image in file_list:
            label_file = labels_path / (image.stem + ".txt")
            if label_file.exists():
                shutil.copy(label_file, dest_labels_folder / label_file.name)
            shutil.copy(image, dest_images_folder / image.name)

    split_images = {"train": train_images, "val": val_images, "test": test_images}
    for folder, file_list in split_images.items():
        dest_images_folder = dataset_path / folder / "images"
        dest_labels_folder = dataset_path / folder / "labels"
        dest_images_folder.mkdir(parents=True, exist_ok=True)
        dest_labels_folder.mkdir(parents=True, exist_ok=True)
        copy_files(file_list, dest_images_folder, dest_labels_folder)

    # Create the .yaml file (top-level keys start at column 0)
    yaml_lines = [
        "# class names",
        "names:",
        "    0: fuel port",
        "",
        "# number of classes",
        "nc: 1",
        "",
        f"train: {dataset_path / 'train'}",
        f"val: {dataset_path / 'val'}",
        f"test: {dataset_path / 'test'}",
    ]

    with open(dataset_path / "dataset.yaml", "w") as yaml_file:
        yaml_file.write("\n".join(yaml_lines) + "\n")

In [2]:
# Example usage:
dataset_folder = "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_YOLO"  # replace with your actual dataset path

# NOTE(review): the three video lists below are defined in this cell but are not
# passed to create_splits_and_yaml, which is called with the dataset folder only.
# Confirm whether a per-video split was intended here.
training_videos = [
    "video_1",
    "video_2",
    "video_3",
    "video_4",
    "video_5",
    "video_6",
    "video_7",
    "video_8",
    "video_9",
    "video_10",
]

validation_videos = ["video_11", "video_12", "video_13"]


test_videos = [
    "video_14",
    "video_15",
    "video_16",
    "video_17",
    "video_18",
    "video_19",
    "video_20",
]


# Uses the default ratios of create_splits_and_yaml (train 0.7 / val 0.1 / test 0.2).
create_splits_and_yaml(dataset_folder)

### Converter for the provided dataset to YOLO format

In [None]:
import os
import json
from glob import glob
import shutil

def convert_annotation(json_file, output_dir_labels):
    """Convert one rectangle-annotation JSON file into a YOLO label file.

    The JSON file is expected to provide ``imageWidth``, ``imageHeight`` and a
    ``shapes`` list whose rectangle entries carry two corner ``points``. Shapes
    of any other ``shape_type`` are ignored. The label file is written to
    ``output_dir_labels`` under the JSON file's base name with a ``.txt``
    extension; it is created (empty) even when no rectangle is found.

    Args:
        json_file (str): Path of the annotation JSON file.
        output_dir_labels (str): Existing directory receiving the ``.txt`` file.
    """
    with open(json_file, 'r') as f:
        data = json.load(f)

    label_name = os.path.splitext(os.path.basename(json_file))[0] + '.txt'
    txt_output_path = os.path.join(output_dir_labels, label_name)

    with open(txt_output_path, 'w') as txt_out:
        for shape in data['shapes']:
            if shape['shape_type'] != 'rectangle':
                continue

            points = shape['points']
            x1, y1 = points[0]
            x2, y2 = points[1]

            # The two corners may be stored in any order (e.g. a box drawn from
            # bottom-right to top-left); order them so the size is never negative.
            x_min, x_max = sorted((x1, x2))
            y_min, y_max = sorted((y1, y2))

            # Convert to YOLO format: centre and size normalised by the image size
            width = data['imageWidth']
            height = data['imageHeight']
            xc = (x_min + x_max) / 2 / width
            yc = (y_min + y_max) / 2 / height
            w = (x_max - x_min) / width
            h = (y_max - y_min) / height

            class_id = 0  # Update this if you have multiple classes and a class mapping system
            txt_out.write(f"{class_id} {xc} {yc} {w} {h}\n")

def convert_annotations_in_directory(input_dir, output_dir_images, output_dir_labels):
    """Convert every annotation of ``input_dir`` to YOLO labels and gather its images.

    Each ``*.json`` file directly inside ``input_dir`` is handed to
    ``convert_annotation``; each ``*.jpg`` file is copied to
    ``output_dir_images`` unless a file of that name is already there.

    Args:
        input_dir (str): Folder holding the ``.json`` annotations and ``.jpg`` images.
        output_dir_images (str): Destination folder for images (created if missing).
        output_dir_labels (str): Destination folder for labels (created if missing).
    """
    for target_dir in (output_dir_images, output_dir_labels):
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)

    for annotation_path in glob(os.path.join(input_dir, '*.json')):
        convert_annotation(annotation_path, output_dir_labels)

    # Copy image files to the output images directory, never overwriting existing ones
    for source_image in glob(os.path.join(input_dir, '*.jpg')):
        target_image = os.path.join(output_dir_images, os.path.basename(source_image))
        if os.path.exists(target_image):
            continue
        shutil.copy(source_image, target_image)

In [None]:
# if __name__ == '__main__':
#     input_directory = '/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/data/official_dataset_annotated/experiments/video_lab_platform_8'  # Replace with the path to your input directory
#     output_directory_images = input_directory + "_yolo/images"  # Replace with the path to your output images directory
#     output_directory_labels = input_directory + "_yolo/labels"  # Replace with the path to your output labels directory

#     convert_annotations_in_directory(input_directory, output_directory_images, output_directory_labels)

In [None]:
import os
import shutil

# Merge the per-experiment YOLO folders (<experiment>/images and <experiment>/labels)
# into one flat dataset (<merged>/images and <merged>/labels).
# The paths below are absolute paths on the author's machine; adapt them before running.

# Set the path to the "experiments" folder
experiments_folder = "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/data/official_dataset_annotated/experiments/yolo"

# Set the path to the merged dataset folder
merged_folder = "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/data/official_dataset_annotated/experiments/yolo_merged_dataset"
merged_images_folder = os.path.join(merged_folder, "images")
merged_labels_folder = os.path.join(merged_folder, "labels")

# Create the merged dataset folder if it doesn't exist
os.makedirs(merged_images_folder, exist_ok=True)
os.makedirs(merged_labels_folder, exist_ok=True)

# Iterate over each subfolder in the "experiments" folder
for subfolder in os.listdir(experiments_folder):
    subfolder_path = os.path.join(experiments_folder, subfolder)

    # Check if the subfolder is a directory
    if os.path.isdir(subfolder_path):
        # Both folders are listed unconditionally below, so a subfolder lacking
        # "images" or "labels" makes os.listdir raise.
        images_folder = os.path.join(subfolder_path, "images")
        labels_folder = os.path.join(subfolder_path, "labels")

        # Iterate over each file in the "images" folder
        for filename in os.listdir(images_folder):
            old_image_path = os.path.join(images_folder, filename)
            # Prefixing with the subfolder name keeps equally named files from
            # different subfolders apart in the flat destination folder.
            new_image_filename = f"{subfolder}_{filename}"
            new_image_path = os.path.join(merged_images_folder, new_image_filename)

            # Copy the image file to the merged dataset folder with the new filename
            shutil.copy2(old_image_path, new_image_path)

        # Iterate over each file in the "labels" folder
        for filename in os.listdir(labels_folder):
            old_label_path = os.path.join(labels_folder, filename)
            # Same prefix as for the images, so image and label stems still match.
            new_label_filename = f"{subfolder}_{filename}"
            new_label_path = os.path.join(merged_labels_folder, new_label_filename)

            # Copy the label file to the merged dataset folder with the new filename
            shutil.copy2(old_label_path, new_label_path)

print("Merging complete!")

# YOLO Fine Tuning - New Dataset Preparation Method

In [14]:
import csv
import os
import shutil


def split_YOLO_Dataset_from_CSV_exported_file(
    export_csv_file_path, yolo_dataset_folder_path, output_folder_path
):
    """Regroup a flat YOLO dataset into one folder per video, driven by a CSV export.

    The CSV must have an ``image`` column whose values end in
    ``..._<frame id>.<ext>``. Rows are read in order; a row whose frame id is 0,
    seen after at least one frame was copied for the current video, starts the
    next video. For every row, the image and its ``.txt`` label are copied from
    ``<yolo_dataset_folder_path>/images`` and ``.../labels`` to
    ``<output_folder_path>/video_<n>/images`` and ``.../labels``.

    Rows that cannot be processed (bad frame id, missing file, ...) are
    reported with ``print`` and skipped; they do not stop the run.

    Args:
        export_csv_file_path (str): Path of the exported CSV file.
        yolo_dataset_folder_path (str): Folder holding ``images`` and ``labels``.
        output_folder_path (str): Folder in which ``video_<n>`` folders are created.
    """
    video_id = 1
    frames_per_video = 0

    # newline="" is what the csv module expects for files it reads itself.
    with open(export_csv_file_path, "r", newline="") as file:
        reader = csv.DictReader(file)
        for row in reader:
            try:
                # Extract the frame ID and image file name
                frame_id = int(row["image"].split("_")[-1].split(".")[0])
                image_file = row["image"].split("/")[-1]
                # Label file: same stem as the image, whatever the image extension
                image_label_file = os.path.splitext(image_file)[0] + ".txt"

                # Switch to the next video BEFORE choosing the destination, so
                # that frame 0 lands in its own video folder and not in the
                # previous one.
                if frames_per_video and frame_id == 0:
                    video_id += 1
                    frames_per_video = 0

                # Retrieve the image and label file paths
                from_image_file_path = os.path.join(
                    yolo_dataset_folder_path, "images", image_file
                )
                from_image_label_file_path = os.path.join(
                    yolo_dataset_folder_path, "labels", image_label_file
                )

                # One folder per video, each with "images" and "labels" inside
                to_video_folder_path = os.path.join(
                    output_folder_path, f"video_{video_id}"
                )
                to_images_folder_path = os.path.join(to_video_folder_path, "images")
                to_labels_folder_path = os.path.join(to_video_folder_path, "labels")
                os.makedirs(to_images_folder_path, exist_ok=True)
                os.makedirs(to_labels_folder_path, exist_ok=True)

                # Copy the image file to the "images" folder
                shutil.copy2(
                    from_image_file_path,
                    os.path.join(to_images_folder_path, image_file),
                )

                # Copy the label file to the "labels" folder
                shutil.copy2(
                    from_image_label_file_path,
                    os.path.join(to_labels_folder_path, image_label_file),
                )

                # Count the frame for the current video
                frames_per_video += 1

            except Exception as e:
                # Best effort: report the faulty row and carry on with the next one
                print(f"Error processing row: {row}")
                print(e)

In [15]:
# Run the per-video regrouping. Absolute paths on the author's machine; adapt before running.
export_csv_file_path = "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_fpp/raw_data.csv"
yolo_dataset_folder_path = "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_YOLO"
# Same folder as the source dataset: the video_<n> folders are created next to
# its "images" and "labels" folders.
output_folder_path = "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_YOLO"
split_YOLO_Dataset_from_CSV_exported_file(
    export_csv_file_path, yolo_dataset_folder_path, output_folder_path
)

### Convert Label Studio CSV Export file to full training json file

In [1]:
import csv
import json

def verify_value_is_between_0_and_1(value):
    """Clamp ``value`` to the closed interval [0, 1] and return the result."""
    capped_above = min(1, value)
    return max(0, capped_above)

def convert_percentage_to_normalized(x_pct, y_pct, width_pct, height_pct, original_width, original_height):
    """Turn a box expressed in percentages into fractions of the image size.

    Args:
        x_pct, y_pct (float): Box position, in percent.
        width_pct, height_pct (float): Box size, in percent.
        original_width, original_height: Accepted but not used by the computation.

    Returns:
        tuple: ``(x, y, width, height)``, each input divided by 100. The values
        are not clamped and the box stays in x/y/width/height form (an earlier
        variant, previously kept here as commented-out code, returned clamped
        corner coordinates instead).
    """
    percent = 100.0
    return (
        x_pct / percent,
        y_pct / percent,
        width_pct / percent,
        height_pct / percent,
    )

def convert_csv_to_json(csv_file):
    """Group the annotated frames of a Label Studio CSV export by video.

    The CSV must provide an ``image`` column (value ending in
    ``..._<frame id>.<ext>``) and a ``label`` column (JSON list, possibly
    empty). Rows are read in order; a frame id of 0 closes the current video
    when it already holds at least one annotated frame. Only the first region
    of each row is used and rows without a region add no frame. Rows whose
    label is not valid JSON are reported and skipped.

    Args:
        csv_file (str): Path of the exported CSV file.

    Returns:
        list[dict]: ``{"video_id": int, "frames": [...]}`` entries, each frame
        being ``{"image_name", "frame", "bbox"}`` where ``bbox`` holds the four
        values returned by ``convert_percentage_to_normalized``
        (x, y, width, height).
    """
    videos = []
    current_video_id = 1
    current_frames = []

    with open(csv_file, "r") as handle:
        for row in csv.DictReader(handle):
            try:
                regions = json.loads(row["label"]) if row["label"] else []
                frame_id = int(row["image"].split("_")[-1].split(".")[0])
                image_name = row["image"].split("/")[-1]

                # Frame 0 starts a new video once the current one has content.
                if frame_id == 0 and current_frames:
                    videos.append({"video_id": current_video_id, "frames": current_frames})
                    current_video_id += 1
                    current_frames = []

                if not regions:
                    continue

                first_region = regions[0]
                original_width = first_region["original_width"]
                original_height = first_region["original_height"]

                norm_x, norm_y, norm_w, norm_h = convert_percentage_to_normalized(
                    first_region["x"],
                    first_region["y"],
                    first_region["width"],
                    first_region["height"],
                    original_width,
                    original_height,
                )

                current_frames.append(
                    {
                        "image_name": image_name,
                        "frame": frame_id,
                        "bbox": [norm_x, norm_y, norm_w, norm_h],
                    }
                )

            except json.JSONDecodeError:
                print(f"Skipping row due to JSON decode error: {row}")
                continue

    # Flush the last video, which has no following frame 0 to close it.
    if current_frames:
        videos.append({"video_id": current_video_id, "frames": current_frames})

    return videos

In [2]:
# Example usage: convert the CSV export and store the per-video structure as JSON.
# Absolute paths on the author's machine; adapt before running.
csv_file = '/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_fpp/raw_data.csv'
json_data = convert_csv_to_json(csv_file)

# This file is the input of the create_datasets call in the next section.
with open('/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_fpp/full_dataset_formatted.json', 'w') as file:
    json.dump(json_data, file, indent=4)

### Prepare training dataset for the future position prediction model

In [3]:
import json

def split_video(video, train_ratio, val_ratio, test_ratio):
    """Cut the frames of one video into contiguous train/validation/test chunks.

    Args:
        video (dict): Entry with a ``video_id`` and a ``frames`` list.
        train_ratio (float): Share of the frames, from the start, kept for training.
        val_ratio (float): Share of the frames, following the training chunk,
            kept for validation.
        test_ratio (float): Not used in the computation; the test chunk is
            whatever remains after the first two.

    Returns:
        tuple: Three ``{"video_id", "frames"}`` dicts (train, validation, test),
        all carrying the id of the input video.
    """
    frames = video['frames']
    total = len(frames)

    first_cut = int(train_ratio * total)
    second_cut = int((train_ratio + val_ratio) * total)

    chunks = (frames[:first_cut], frames[first_cut:second_cut], frames[second_cut:])
    return tuple({'video_id': video['video_id'], 'frames': chunk} for chunk in chunks)

def create_datasets(annotation_file, train_ratio=0.64, val_ratio=0.16, test_ratio=0.2, output_dir="."):
    """
    Create train.json, validation.json and test.json from a per-video annotation file.

    Every video is split on its own with ``split_video``, so each output file
    holds one entry per input video.

    Parameters:
        annotation_file (str): Path to the annotation JSON file (a list of videos)
        train_ratio (float): Ratio of the data allocated to training
        val_ratio (float): Ratio of the data allocated to validation
        test_ratio (float): Ratio of the data allocated to testing
        output_dir (str): Directory in which the resulting files are saved

    Raises:
        AssertionError: If the three ratios do not add up to 1.0 (within 1e-6)
    """
    # Load the annotation JSON file
    with open(annotation_file, "r") as f:
        annotations = json.load(f)

    # Check that the ratios add up, with a tolerance for floating-point rounding
    ratio_total = train_ratio + val_ratio + test_ratio
    assert abs(ratio_total - 1.0) < 1e-6, "Les ratios doivent correspondre à 1.0"

    # Split each video separately and collect the parts per output file
    outputs = {"train.json": [], "validation.json": [], "test.json": []}
    for video in annotations:
        train_part, val_part, test_part = split_video(video, train_ratio, val_ratio, test_ratio)
        for file_name, part in zip(outputs, (train_part, val_part, test_part)):
            outputs[file_name].append(part)

    # Save the JSON files for training, validation and testing
    for file_name, entries in outputs.items():
        with open(f"{output_dir}/{file_name}", "w") as f:
            json.dump(entries, f, indent=4)

    print("Les fichiers train.json, validation.json et test.json ont été créés et enregistrés.")

In [4]:
# Example usage: split the formatted dataset with the default ratios
# (train 0.64 / validation 0.16 / test 0.2) and write the three files next to it.
create_datasets(
    "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_fpp/full_dataset_formatted.json",
    output_dir="/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_fpp",
)

Les fichiers train.json, validation.json et test.json ont été créés et enregistrés.


## Check and update labels values

In [26]:
import os

def verify_and_replace_labels_in_directory(main_folder, new_class_id):
    """
    Rewrite the label files of every video folder so each holds one label of class ``new_class_id``.

    For each image (.jpg, .jpeg, .png) of ``<video>/images`` the matching
    ``<video>/labels/<stem>.txt`` file is read. Only its first line is kept,
    with the class number (first field) replaced; any further line is dropped.
    Empty label files are left untouched. Missing folders, missing label files
    and malformed first lines are reported with ``print``.

    Args:
    - main_folder (str): Path to the main folder containing the video sub-folders.
    - new_class_id (int): The new class number to write.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png')

    # Walk through the sub-folders (one per video)
    for video_folder in os.listdir(main_folder):
        video_path = os.path.join(main_folder, video_folder)
        if not os.path.isdir(video_path):
            continue

        image_folder = os.path.join(video_path, 'images')
        label_folder = os.path.join(video_path, 'labels')
        if not (os.path.exists(image_folder) and os.path.exists(label_folder)):
            print(f"Erreur: Dossiers 'images' ou 'labels' manquants dans {video_folder}")
            continue

        for image_file in os.listdir(image_folder):
            if not image_file.endswith(image_suffixes):
                continue

            label_file = os.path.join(label_folder, image_file.rsplit('.', 1)[0] + '.txt')
            if not os.path.exists(label_file):
                print(f"Erreur: Label manquant pour {image_file} dans {video_folder}")
                continue

            with open(label_file, 'r') as file:
                lines = file.readlines()

            if not lines:
                continue  # No label for this image

            # Whether the file has one line or several, only the first is kept.
            parts = lines[0].strip().split()
            if len(parts) <= 1:
                print(f"Erreur: Format incorrect dans {label_file}")
                continue

            # Replace the class number with new_class_id
            parts[0] = str(new_class_id)
            with open(label_file, 'w') as file:
                file.write(' '.join(parts) + '\n')

# Example usage
main_folder_path = '/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_YOLO/open/val'  # Replace with the path to the main folder (train, val, or test)
nouveau_numero_classe = 2  # Class number you want to use

# Rewrites the label files in place under main_folder_path.
verify_and_replace_labels_in_directory(main_folder_path, nouveau_numero_classe)

### Function to join all the sub-datasets into a single dataset

In [14]:
import os
import shutil


def join_subfolders(folder_path):
    """Gather the images and labels of every video folder into an ``all`` folder.

    For each directory directly under ``folder_path`` (test, val, train), an
    ``all/images`` and ``all/labels`` folder is created inside it, and the
    content of every sibling video folder's ``images`` and ``labels`` folder
    is copied there under the same file name.

    Args:
        folder_path (str): Folder containing the split directories.
    """
    for split_name in os.listdir(folder_path):  # test, val, train
        split_dir = os.path.join(folder_path, split_name)
        if not os.path.isdir(split_dir):
            continue

        # Create the "all" folder and its two sub-folders if they don't exist
        merged_root = os.path.join(split_dir, "all")
        merged_dirs = {
            kind: os.path.join(merged_root, kind) for kind in ("images", "labels")
        }
        os.makedirs(merged_root, exist_ok=True)
        for merged_dir in merged_dirs.values():
            os.makedirs(merged_dir, exist_ok=True)

        # Visit the video folders, leaving out the "all" folder itself
        for video_name in os.listdir(split_dir):
            if video_name == "all":
                continue
            video_dir = os.path.join(split_dir, video_name)
            if not os.path.isdir(video_dir):
                continue

            # Images first, then labels
            for kind, merged_dir in merged_dirs.items():
                source_dir = os.path.join(video_dir, kind)
                for file_name in os.listdir(source_dir):
                    shutil.copy2(
                        os.path.join(source_dir, file_name),
                        os.path.join(merged_dir, file_name),
                    )

    print("Subfolders joined successfully!")


# Example usage (absolute path on the author's machine; adapt before running).
# The folder is expected to hold the split directories (test, val, train).
folder_path = "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_YOLO/semiopen"
join_subfolders(folder_path)

Subfolders joined successfully!


### Function to create the final YOLO Training set

In [2]:
import os
import shutil
import random
import math


def collect_images(source_folder, split):
    """List the image files found in ``<source_folder>/<split>/all/images``.

    Args:
        source_folder (str): Root folder of one dataset.
        split (str): Name of the split folder to look into.

    Returns:
        list[str]: Paths of the ``.jpg``, ``.jpeg`` and ``.png`` files of that
        folder, or an empty list when the ``all`` or ``images`` folder is missing.
    """
    merged_dir = os.path.join(os.path.join(source_folder, split), "all")
    if not os.path.isdir(merged_dir):
        return []

    images_dir = os.path.join(merged_dir, "images")
    if not os.path.isdir(images_dir):
        return []

    wanted_suffixes = (".jpg", ".jpeg", ".png")
    found = []
    for file_name in os.listdir(images_dir):
        if file_name.endswith(wanted_suffixes):
            found.append(os.path.join(images_dir, file_name))
    return found


def split_and_balance_dataset(
    source_folders, dest_folder, train_ratio, val_ratio, test_ratio, seed=None
):
    """Build a class-balanced train/val/test dataset plus its YOLO list files.

    For every class the images returned by ``collect_images`` are gathered per
    split. Each split is truncated to the size of its smallest class, the
    classes are merged and shuffled, and the split sizes are then trimmed so
    that validation + test never exceed ``val_ratio + test_ratio`` of the
    final dataset. The selected images (and their label files when present)
    are copied to ``<dest_folder>/<split>/images|labels``; ``<split>.txt`` and
    ``data.yaml`` are written directly in ``dest_folder``.

    Args:
        source_folders: Mapping ``class name -> class root folder``.
        dest_folder: Destination folder, created if missing.
        train_ratio: Accepted for interface compatibility only; the training
            size is derived from ``val_ratio`` and ``test_ratio``.
        val_ratio: Target share of validation images.
        test_ratio: Target share of test images.
        seed: Optional seed. When given, a dedicated random generator is used
            so the selection is reproducible; when ``None`` the module-level
            ``random`` generator is used.

    Raises:
        ValueError: If ``source_folders`` is empty.
    """
    splits = ("train", "val", "test")
    if not source_folders:
        raise ValueError("source_folders must contain at least one class")

    rng = random.Random(seed) if seed is not None else random

    # Create <dest>/<split>/{images,labels}; makedirs also creates the parents.
    for split in splits:
        for subfolder in ("images", "labels"):
            os.makedirs(os.path.join(dest_folder, split, subfolder), exist_ok=True)

    # Collect the images of each class for each split.
    images_by_class = {
        class_name: {split: collect_images(class_folder, split) for split in splits}
        for class_name, class_folder in source_folders.items()
    }

    # Balance the classes: every class contributes as many images as the
    # smallest class of that split, picked at random.
    selected = {}
    for split in splits:
        smallest = min(len(per_split[split]) for per_split in images_by_class.values())
        merged = []
        for per_split in images_by_class.values():
            candidates = per_split[split]
            rng.shuffle(candidates)
            merged.extend(candidates[:smallest])
        rng.shuffle(merged)
        selected[split] = merged

    # Nominal split sizes computed from the ratios.
    total_images = sum(len(images) for images in selected.values())
    val_num_images = math.floor(total_images * val_ratio)
    test_num_images = math.floor(total_images * test_ratio)
    # The training split cannot be larger than what is available (nor negative).
    train_num_images = max(
        0,
        min(total_images - val_num_images - test_num_images, len(selected["train"])),
    )

    # When the training split had to be capped, shrink val/test until their
    # combined share of the *current* total matches the requested ratios.
    # Counts are clamped at zero: a negative count used as a slice bound would
    # keep almost the whole split instead of shrinking it.
    eval_share = val_ratio + test_ratio
    while val_num_images + test_num_images > 0:
        current_total = train_num_images + val_num_images + test_num_images
        if (val_num_images + test_num_images) / current_total <= eval_share:
            break
        val_num_images = max(val_num_images - 1, 0)
        test_num_images = max(test_num_images - 1, 0)

    limits = {
        "train": train_num_images,
        "val": val_num_images,
        "test": test_num_images,
    }
    final_sets = {split: selected[split][: limits[split]] for split in splits}

    # Copy the files to the destination in YOLO format.
    # NOTE(review): files are stored under their base name only, so two classes
    # holding an image with the same file name would overwrite each other.
    for split in splits:
        images_folder = os.path.join(dest_folder, split, "images")
        labels_folder = os.path.join(dest_folder, split, "labels")

        with open(os.path.join(dest_folder, f"{split}.txt"), "w") as list_file:
            for image_path in final_sets[split]:
                image_name = os.path.basename(image_path)
                label_name = os.path.splitext(image_name)[0] + ".txt"
                # Labels live in the "labels" folder next to the "images" folder.
                label_path = os.path.join(
                    os.path.dirname(os.path.dirname(image_path)), "labels", label_name
                )

                shutil.copy(image_path, os.path.join(images_folder, image_name))
                # The list file stores the path relative to dest_folder.
                list_file.write(f"{split}/images/{image_name}\n")

                if os.path.exists(label_path):
                    shutil.copy(label_path, os.path.join(labels_folder, label_name))
                else:
                    print(
                        f"Warning: Label missing for {image_name} in {os.path.dirname(image_path)}"
                    )

    # Write the data.yaml file describing the dataset.
    with open(os.path.join(dest_folder, "data.yaml"), "w") as yaml_file:
        yaml_file.write(f"train: {os.path.join(dest_folder, 'train.txt')}\n")
        yaml_file.write(f"val: {os.path.join(dest_folder, 'val.txt')}\n")
        yaml_file.write(f"test: {os.path.join(dest_folder, 'test.txt')}\n")
        yaml_file.write(f"nc: {len(source_folders)}\n")
        yaml_file.write(f"names: {list(source_folders.keys())}\n")

    # Print the statistics (ratio reported as 0 when nothing was selected).
    total_images = sum(len(images) for images in final_sets.values())
    print("Nombre d'images par classe dans chaque ensemble :")
    for title, split in (("Training", "train"), ("Validation", "val"), ("Test", "test")):
        count = len(final_sets[split])
        ratio = count / total_images if total_images else 0.0
        print(f"{title} set: Number of images: {count} and Ratio: {ratio:.2f}")


# Example usage: one source folder per class; the dictionary keys become the
# class names written to data.yaml.
# NOTE(review): all paths below are absolute and machine-specific.
source_folders = {
    "Close": "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_YOLO/close",
    "Open": "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_YOLO/open",
    "Semi-Open": "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_YOLO/semiopen",
}
# Target shares of the final dataset. train_ratio is passed along but the
# function body never reads it: the training size is what remains after the
# validation and test shares.
train_ratio = 0.7
val_ratio = 0.15
test_ratio = 0.15

# Folder receiving the balanced dataset, the <split>.txt lists and data.yaml.
destination_folder_path = "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_YOLO/balanced_dataset"


split_and_balance_dataset(
    source_folders, destination_folder_path, train_ratio, val_ratio, test_ratio
)

Nombre d'images par classe dans chaque ensemble :
Training set: Number of images: 3534 and Ratio: 0.70
Validation set: Number of images: 773 and Ratio: 0.15
Test set: Number of images: 773 and Ratio: 0.15


### Function to create the final FPP Training set

In [6]:
import os
import shutil
import random
import json


def collect_data(source_folder, split, exclude=("all",)):
    """Collect per-video frame annotations from ``<source_folder>/<split>``.

    Every sub-folder that contains an ``images`` directory is treated as one
    video. For each image the frame id is parsed from the file name (the
    integer after the last underscore) and the first line of the matching
    ``labels/<stem>.txt`` file is read as ``class_id x y w h`` when it has
    exactly five fields.

    Args:
        source_folder: Root folder of one class.
        split: Name of the split sub-directory to read.
        exclude: Folder names that must not be treated as videos. Defaults to
            the aggregated ``all`` folder, which the other cells of this
            notebook use to pool the frames of every video of a split;
            including it would add a fake "video" with duplicated frame ids.
            Pass ``()`` to disable the filter.

    Returns:
        A list of ``{"video_id": <folder name>, "frames": [...]}`` dicts sorted
        by folder name. Each frame is a dict with the keys ``frame_id``,
        ``image_name``, ``class_id`` and ``bbox`` (``[x, y, w, h]``);
        ``class_id`` and the bbox values are ``None`` when no usable label
        exists. Frames are sorted by ``frame_id``.

    Raises:
        FileNotFoundError: If the split folder does not exist.
        ValueError: If a frame id or a label field cannot be parsed as a number.
    """
    image_extensions = (".jpg", ".jpeg", ".png")
    split_path = os.path.join(source_folder, split)
    all_data = []

    # Sorted so that the output does not depend on the file-system order.
    for video_folder in sorted(os.listdir(split_path)):
        if video_folder in exclude:
            continue
        images_path = os.path.join(split_path, video_folder, "images")
        if not os.path.isdir(images_path):
            continue
        labels_path = os.path.join(split_path, video_folder, "labels")

        frames = []
        for image_name in os.listdir(images_path):
            # Case-insensitive so that e.g. "frame_3.JPG" is not dropped.
            if not image_name.lower().endswith(image_extensions):
                continue

            # "frame_12.jpg" -> 12
            frame_id = int(image_name.split("_")[-1].split(".")[0])

            class_id = None
            bbox = [None, None, None, None]
            label_path = os.path.join(
                labels_path, os.path.splitext(image_name)[0] + ".txt"
            )
            if os.path.exists(label_path):
                with open(label_path, "r") as label_file:
                    # Only the first annotation line is used.
                    fields = label_file.readline().split()
                if len(fields) == 5:
                    class_id = int(fields[0])
                    bbox = [float(value) for value in fields[1:]]

            frames.append(
                {
                    "frame_id": frame_id,
                    "image_name": image_name,
                    "class_id": class_id,
                    "bbox": bbox,
                }
            )

        frames.sort(key=lambda frame: frame["frame_id"])
        all_data.append({"video_id": video_folder, "frames": frames})

    return all_data


def prepare_datasets(
    source_folders, dest_folder
):
    """Merge the per-class video annotations into one JSON file per split.

    ``collect_data`` is called for every class folder and every split; the
    videos of all classes are concatenated (in the iteration order of
    ``source_folders``) and dumped to ``train.json``, ``val.json`` and
    ``test.json`` inside ``dest_folder``.

    Args:
        source_folders: Mapping ``class name -> class root folder``. Only the
            folder values are used.
        dest_folder: Folder receiving the JSON files; created if it does not
            exist yet.
    """
    # Without this, open() below fails when the destination is a fresh folder.
    os.makedirs(dest_folder, exist_ok=True)

    videos_by_split = {"train": [], "val": [], "test": []}
    for folder in source_folders.values():
        for split, videos in videos_by_split.items():
            videos.extend(collect_data(folder, split))

    for split, videos in videos_by_split.items():
        with open(os.path.join(dest_folder, f"{split}.json"), "w") as json_file:
            json.dump(videos, json_file, indent=4)

    
    


# Example usage: one source folder per class.
# NOTE(review): all paths below are absolute and machine-specific.
source_folders = {
    "Close": "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_YOLO/close",
    "Open": "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_YOLO/open",
    "Semi-Open": "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_YOLO/semiopen",
}

# Folder receiving train.json, val.json and test.json.
destination_folder_path = "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_fpp"

prepare_datasets(
    source_folders, destination_folder_path
)

# Trajectory Smoothing

In [14]:
import json
import pandas as pd
import numpy as np
from scipy.signal import savgol_filter


def smooth_trajectory(trajectory, window_length=5, polyorder=2):
    """Smooth a trajectory along its first axis with a Savitzky-Golay filter.

    Args:
        trajectory: Sequence or array with one row per time step.
        window_length: Length of the filter window.
        polyorder: Order of the polynomial fitted inside each window.

    Returns:
        The filtered array, or ``trajectory`` itself (unchanged) when it holds
        fewer than ``window_length`` rows.
    """
    has_enough_points = len(trajectory) >= window_length
    if has_enough_points:
        return savgol_filter(trajectory, window_length, polyorder, axis=0)
    # Too short to fit a single window: hand the input back untouched.
    return trajectory


def handle_null_values(trajectory):
    """Fill missing bounding-box values by linear interpolation.

    Args:
        trajectory: Sequence of ``[x_center, y_center, width, height]`` rows;
            a missing value is given as ``None`` (or NaN).

    Returns:
        A float ``numpy`` array of shape ``(len(trajectory), 4)``. Gaps are
        interpolated linearly; leading and trailing gaps are filled as well
        (``limit_direction="both"``). A column without any valid value stays
        NaN.
    """
    # dtype=float turns None into NaN up front. Without it a trajectory that
    # has no valid box at all produces object-dtype columns, on which
    # DataFrame.interpolate raises a TypeError.
    df = pd.DataFrame(
        trajectory,
        columns=['x_center', 'y_center', 'width', 'height'],
        dtype=float,
    )
    filled = df.interpolate(method='linear', limit_direction='both')
    return filled.to_numpy()


# Load the per-video annotations (a JSON list of entries, each holding a
# "frames" list) from the validation file.
# NOTE(review): absolute, machine-specific path.
file_path = "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_fpp/val.json"
with open(file_path) as f:
    data = json.load(f)

# Extract the bounding box coordinates of every entry and apply smoothing
for entry in data:
    # One bbox per frame, in the order the frames are stored in the file.
    bbox_trajectory = []
    for frame in entry["frames"]:
        # A frame without a "bbox" key contributes an empty list.
        # NOTE(review): presumably every frame carries a 4-value bbox; an empty
        # row would not match the 4 columns expected downstream - confirm.
        bbox = frame.get("bbox", [])
        bbox_trajectory.append(bbox)

    # Handle null values by interpolation
    bbox_trajectory = handle_null_values(bbox_trajectory)

    # Convert to numpy array for processing
    bbox_trajectory_np = np.array(bbox_trajectory)

    # Apply temporal smoothing to bbox trajectory (default window and
    # polynomial order of smooth_trajectory)
    smoothed_trajectory = smooth_trajectory(bbox_trajectory_np)

    # Update the bbox coordinates in the data: `data` is modified in place, so
    # the original boxes are no longer available after this loop.
    for idx, frame in enumerate(entry["frames"]):
        frame["bbox"] = smoothed_trajectory[idx].tolist()

# Save the updated data to a separate JSON file (the input file is left as is)
output_file_path = "/Users/alexis/Library/CloudStorage/OneDrive-Balayre&Co/Cranfield/Thesis/thesis-github-repository/data/frames/full_dataset_annotated_fpp/val_filter_savgol.json"
with open(output_file_path, "w") as f:
    json.dump(data, f, indent=4)