## Extract, Transform and Load (ETL) Step

We are going to use the YOLO algorithm for object detection. Let's transform the dataset into the format expected by YOLO:

https://docs.ultralytics.com/datasets/detect/#ultralytics-yolo-format

### Setup

In [46]:
import os
import random
import time
from dataclasses import dataclass
from pathlib import Path
from enum import Enum
import shutil


# Every path is assembled with os.path.join so the separators match the host
# OS; hard-coded backslashes only resolve as directories on Windows.

# Root of the generated YOLO dataset and its two top-level sub-directories.
ROOT_DIR = os.path.join("datasets", "aircraft")
LABELS_DIR = "labels"
IMAGES_DIR = "images"

# Per-split output directories for the label (.txt) files.
TRAIN_DIR_LABELS = os.path.join(ROOT_DIR, LABELS_DIR, "train")
VAL_DIR_LABELS = os.path.join(ROOT_DIR, LABELS_DIR, "val")
TEST_DIR_LABELS = os.path.join(ROOT_DIR, LABELS_DIR, "test")

# Per-split output directories for the image (.jpg) files.
TRAIN_DIR_IMAGES = os.path.join(ROOT_DIR, IMAGES_DIR, "train")
VAL_DIR_IMAGES = os.path.join(ROOT_DIR, IMAGES_DIR, "val")
TEST_DIR_IMAGES = os.path.join(ROOT_DIR, IMAGES_DIR, "test")

# Source directory holding the raw CSV annotations and their .jpg images.
DATASET_DIR = os.path.join(os.pardir, "dataset", "dataset")

## Classes Mapping

We also need to provide a classes mapping, here is how we do that

In [50]:
# Aircraft model names in class-id order: the position of a name in this list
# is the integer id stored in CLASSES. Entries are grouped by leading letter;
# the order itself must not change.
CLASSES_RAW = [
    "a10", "a400m", "ag600", "ah64", "av8b", "an124", "an22", "an225", "an72",
    "b1", "b2", "b21", "b52", "be200",
    "c130", "c17", "c2", "c390", "c5", "ch47", "cl415",
    "e2", "e7", "ef2000",
    "f117", "f14", "f15", "f16", "f22", "f35", "f4",
    # Appears in the data as F18
    "f18",
    "h6",
    "j10", "j20", "jas39", "jf17", "jh7",
    "kc135", "kf21", "kj600", "ka27", "ka52",
    "mq9", "mi24", "mi26", "mi28", "mig29", "mig31", "mirage2000",
    "p3",
    "rq4", "rafale",
    "sr71", "su24", "su25", "su34", "su57",
    "tb001", "tb2", "tornado", "tu160", "tu22m", "tu95",
    "u2", "uh60", "us2",
    "v22", "vulcan",
    "wz7",
    "xb70",
    "y20", "yf23",
    "z19",
]

# Lookup table from model name to its zero-based class id.
CLASSES = dict(zip(CLASSES_RAW, range(len(CLASSES_RAW))))

print(CLASSES)

{'a10': 0, 'a400m': 1, 'ag600': 2, 'ah64': 3, 'av8b': 4, 'an124': 5, 'an22': 6, 'an225': 7, 'an72': 8, 'b1': 9, 'b2': 10, 'b21': 11, 'b52': 12, 'be200': 13, 'c130': 14, 'c17': 15, 'c2': 16, 'c390': 17, 'c5': 18, 'ch47': 19, 'cl415': 20, 'e2': 21, 'e7': 22, 'ef2000': 23, 'f117': 24, 'f14': 25, 'f15': 26, 'f16': 27, 'f22': 28, 'f35': 29, 'f4': 30, 'f18': 31, 'h6': 32, 'j10': 33, 'j20': 34, 'jas39': 35, 'jf17': 36, 'jh7': 37, 'kc135': 38, 'kf21': 39, 'kj600': 40, 'ka27': 41, 'ka52': 42, 'mq9': 43, 'mi24': 44, 'mi26': 45, 'mi28': 46, 'mig29': 47, 'mig31': 48, 'mirage2000': 49, 'p3': 50, 'rq4': 51, 'rafale': 52, 'sr71': 53, 'su24': 54, 'su25': 55, 'su34': 56, 'su57': 57, 'tb001': 58, 'tb2': 59, 'tornado': 60, 'tu160': 61, 'tu22m': 62, 'tu95': 63, 'u2': 64, 'uh60': 65, 'us2': 66, 'v22': 67, 'vulcan': 68, 'wz7': 69, 'xb70': 70, 'y20': 71, 'yf23': 72, 'z19': 73}


### Transforming and Normalizing Bounding Boxes

Since the dataset CSV is given using this format
`filename, width, height, class, xmin, ymin, xmax, ymax`

We need to turn it into a **normalized xywh format**, with the center point, bounding box width and height, all normalized to [0; 1]

The normalize_bounding_box function does just that

In [48]:
def normalize_bounding_box(
    x_min: int, y_min: int, x_max: int, y_max: int, width: int, height: int
) -> tuple[float, float, float, float]:
    """Convert a pixel-space corner box into normalized YOLO xywh values.

    Args:
        x_min (int): left edge of the bounding box, in pixels
        y_min (int): top edge of the bounding box, in pixels
        x_max (int): right edge of the bounding box, in pixels
        y_max (int): bottom edge of the bounding box, in pixels
        width (int): width of the image in pixels
        height (int): height of the image in pixels

    Returns:
        x_center (float): box center x, as a fraction of the image width
        y_center (float): box center y, as a fraction of the image height
        width (float): box width, as a fraction of the image width
        height (float): box height, as a fraction of the image height
    """

    # Extent of the box along each axis, still in pixels
    span_x = x_max - x_min
    span_y = y_max - y_min

    # Midpoint of the box, still in pixels
    mid_x = (x_min + x_max) / 2
    mid_y = (y_min + y_max) / 2

    # Horizontal quantities are scaled by the image width, vertical ones by its height
    x_center = mid_x / width
    y_center = mid_y / height
    norm_width = span_x / width
    norm_height = span_y / height

    return (x_center, y_center, norm_width, norm_height)

# Quick check using one row taken from our dataset
normalize_bounding_box(852, 177, 1998, 503, 2048, 1365)

(0.69580078125, 0.2490842490842491, 0.5595703125, 0.23882783882783884)

### Extracting and Transforming the Dataset

We need to convert our dataset into the YOLO format, which requires a very specific directory structure.

### Splitting the Dataset

We need to split our dataset into three parts: train, validation and test.

We're going to use an 80% / 10% / 10% (train / validation / test) split for now.

In [None]:
class EntryType(Enum):
    """Dataset split an entry can be assigned to."""

    TRAIN = "train"
    VAL = "val"
    TEST = "test"


def split_dataset(seed: int = None) -> EntryType:
    """Randomly pick the split for one entry (80% train, 10% val, 10% test).

    Args:
        seed (int, optional): when given (including 0), the global random
            generator is reseeded with it before drawing, so the same seed
            always yields the same split. When omitted, the next value of the
            global generator is used without reseeding.

    Returns:
        EntryType: the split chosen for the entry.
    """
    # Compare against None explicitly: 0 is a legitimate seed. The generator
    # is deliberately NOT reseeded from the clock on every call, because calls
    # landing in the same clock tick would all receive the same draw.
    if seed is not None:
        random.seed(seed)

    draw = random.randint(1, 10)

    # Validation (10%)
    if draw == 9:
        return EntryType.VAL

    # Test (10%)
    if draw == 10:
        return EntryType.TEST

    # Train (80%): draws 1 through 8
    return EntryType.TRAIN


@dataclass
class CSVEntry:
    """One annotation row: an image, its pixel size and one labelled bounding box."""

    filename: str  # image name as written in the CSV
    img_width: int  # image width in pixels
    img_height: int  # image height in pixels
    airplane_class: str  # class label as written in the CSV
    x_min: int  # bounding box corners, in pixels
    y_min: int
    x_max: int
    y_max: int


def process_csv_file(filename: str) -> list[CSVEntry]:
    """Parse an annotation CSV into a list of CSVEntry rows.

    The first line is treated as the column header and skipped. Every other
    non-blank line must hold at least eight comma-separated fields in the
    order ``filename, width, height, class, xmin, ymin, xmax, ymax``; any
    additional fields are ignored.

    Args:
        filename (str): path of the CSV file to read.

    Returns:
        list[CSVEntry]: one entry per data row, in file order (empty when the
        file only contains a header or nothing at all).

    Raises:
        ValueError: if a row has fewer than eight fields, or if one of its
            numeric fields is not an integer.
    """
    result = []

    with open(filename, "r") as f:
        # Line 1 is for the column names
        next(f, None)

        for line_no, raw_line in enumerate(f, start=2):
            line = raw_line.strip()

            # Blank lines (typically a trailing newline) carry no data
            if not line:
                continue

            fields = line.split(",")
            if len(fields) < 8:
                raise ValueError(
                    f"{filename}:{line_no}: expected 8 columns, got {len(fields)}"
                )

            result.append(
                CSVEntry(
                    fields[0],
                    int(fields[1]),
                    int(fields[2]),
                    fields[3],
                    int(fields[4]),
                    int(fields[5]),
                    int(fields[6]),
                    int(fields[7]),
                )
            )

    return result

def create_yolo_txt(entries: list[CSVEntry], type: "EntryType", filename: str) -> bool:
    """Write the YOLO label file for one image into the directory of its split.

    Each entry becomes one line ``class_id x_center y_center width height``,
    with the box values produced by normalize_bounding_box.

    Args:
        entries (list[CSVEntry]): annotation rows belonging to the image.
        type (EntryType): split the image belongs to. Anything other than
            EntryType.TRAIN or EntryType.VAL is written to the test directory.
        filename (str): name whose stem (name without extension) is used for
            the generated ``.txt`` file.

    Returns:
        bool: True once the file has been written.

    Raises:
        KeyError: if an entry's class (lower-cased) is not a key of CLASSES.
    """
    if type == EntryType.TRAIN:
        to_dir = TRAIN_DIR_LABELS
    elif type == EntryType.VAL:
        to_dir = VAL_DIR_LABELS
    else:
        to_dir = TEST_DIR_LABELS

    # Build every line before touching the disk, so that an unknown class or
    # a bad image size cannot leave a truncated label file behind.
    lines = []
    for entry in entries:
        class_name = entry.airplane_class.lower()
        if class_name not in CLASSES:
            raise KeyError(
                f"Unknown aircraft class {entry.airplane_class!r} in {filename}"
            )

        x_center, y_center, width, height = normalize_bounding_box(
            entry.x_min,
            entry.y_min,
            entry.x_max,
            entry.y_max,
            entry.img_width,
            entry.img_height,
        )
        lines.append(f"{CLASSES[class_name]} {x_center} {y_center} {width} {height}\n")

    os.makedirs(to_dir, exist_ok=True)

    to_create = os.path.join(to_dir, Path(filename).stem + ".txt")

    with open(to_create, "w") as f:
        f.writelines(lines)

    print(f"Wrote {to_create}")

    return True

def copy_image(type: "EntryType", filename: str, dataset_dir: str = None) -> bool:
    """Copy ``<filename>.jpg`` from the source dataset into the image directory of its split.

    Args:
        type (EntryType): split the image belongs to. Anything other than
            EntryType.TRAIN or EntryType.VAL is copied to the test directory.
        filename (str): image name without the ``.jpg`` extension.
        dataset_dir (str, optional): directory the image is read from.
            Defaults to DATASET_DIR when omitted.

    Returns:
        bool: True once the image has been copied.

    Raises:
        OSError: propagated from shutil.copyfile when the copy fails, for
            example when the source image does not exist.
    """
    # Resolved at call time rather than in the signature so that omitting the
    # argument always means "the current DATASET_DIR".
    if dataset_dir is None:
        dataset_dir = DATASET_DIR

    if type == EntryType.TRAIN:
        to_dir = TRAIN_DIR_IMAGES
    elif type == EntryType.VAL:
        to_dir = VAL_DIR_IMAGES
    else:
        to_dir = TEST_DIR_IMAGES

    os.makedirs(to_dir, exist_ok=True)

    image_name = filename + ".jpg"
    to_copy = os.path.join(dataset_dir, image_name)
    to_paste = os.path.join(to_dir, image_name)

    shutil.copyfile(to_copy, to_paste)

    return True


def extract_transform_yolo(dataset_dir: str = DATASET_DIR, seed: int = None) -> None:
    """Convert every annotation CSV found in ``dataset_dir`` to the YOLO layout.

    For each ``.csv`` file: parse it, check that the image named by its first
    row exists in ``dataset_dir``, assign it to a split, write its label file
    and copy its image.

    Args:
        dataset_dir (str): directory that is scanned for CSV files.
        seed (int, optional): makes the train/val/test assignment reproducible
            from one run to the next. When omitted the assignment is random.
    """
    # A single generator drives the whole run. split_dataset reseeds from the
    # seed it is handed, so each file gets its own seed drawn from here:
    # reproducible for a fixed `seed`, and never the same draw for every file.
    rng = random.Random(seed)

    # Sorted so that a fixed seed maps to the same files regardless of the
    # order in which the OS lists the directory.
    for filename in sorted(os.listdir(dataset_dir)):
        # Only process CSV files
        if not filename.endswith(".csv"):
            continue

        entries = process_csv_file(os.path.join(dataset_dir, filename))

        # A CSV without data rows has no image to look up
        if not entries:
            print(f"No entries found in {filename}, skipping")
            continue

        image_name = entries[0].filename

        if not os.path.exists(os.path.join(dataset_dir, image_name + ".jpg")):
            print("File associated with CSV not found!")
            continue

        print(f"Processing: {filename}")

        # Determine where to add the entry (train, validation, test).
        # The lower bound is 1 so the per-file seed is never 0.
        split = split_dataset(rng.randrange(1, 2**32))

        if not create_yolo_txt(entries, split, filename):
            print(f"Error creating {filename} to {split}!")

        # NOTE: copy_image is called without a source directory, so it reads
        # the image from the module-level DATASET_DIR rather than `dataset_dir`.
        if not copy_image(split, image_name):
            print(f"Error copying {filename} to {split}!")

    print("ETL done!")

# extract_transform_yolo()