<a href="https://colab.research.google.com/github/eyadashrafkh/AlexEagles_mega_project/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Object Detection Model training using YOLO
References -
- [Documentation](https://docs.ultralytics.com/yolov5/tutorials/train_custom_data/#13-prepare-dataset-for-yolov5)
- [Testing IoU](https://stackoverflow.com/questions/77565416/how-to-test-iou-score-after-training-a-yolo-model)
- [IoU calculation](https://stackoverflow.com/questions/25349178/calculating-percentage-of-bounding-box-overlap-for-image-detector-evaluation)
- [Hungarian Algorithm to match Bounding Boxes](https://gist.github.com/AruniRC/c629c2df0e68e23aff7dcaeef87c72d4)

In [None]:
!pip install ultralytics -q
!pip install fiftyone -q
# Import necessary libraries
import numpy as np
import os, sys
import matplotlib.pyplot as plt
from ultralytics import YOLO
import fiftyone as fo
import fiftyone.zoo as foz
from fiftyone import ViewField as F
import json, shutil
from collections import defaultdict
from itertools import product
from functools import reduce
from scipy.optimize import linear_sum_assignment
import cv2

# Seed NumPy's global RNG so the np.random.choice image sampling is repeatable
np.random.seed(0)
# Use the current working directory as the base path for the dataset files
curr_dir = os.getcwd()

In [None]:
# Point the FiftyOne dataset zoo directory at the current working directory
fo.config.dataset_zoo_dir = curr_dir

# The 15 COCO class names to keep. The same list is passed to the zoo
# download (classes=...) and to the COCO export (classes=...).
selected_classes = [
    "person",
    "car",
    "motorcycle",
    "airplane",
    "bus",
    "boat",
    "stop sign",
    "snowboard",
    "umbrella",
    "sports ball",
    "baseball bat",
    "bed",
    "tennis racket",
    "suitcase",
    "skis",
]

In [None]:
# Download the COCO-2017 train split from the FiftyOne zoo.
# Only detection labels are requested, restricted to `selected_classes`;
# max_samples=None places no cap on the number of samples.
# NOTE(review): only_matching=True presumably drops labels of other classes
# on the matched images - confirm against the FiftyOne zoo documentation.
dataset = foz.load_zoo_dataset("coco-2017",
                            splits=['train'],
                            shuffle=True,
                            seed=0,
                            max_samples=None,
                            label_types=['detections'],
                            only_matching=True,
                            classes=selected_classes)

In [None]:
# Re-load the downloaded train split from disk as a COCO detection dataset.
# Both paths are relative, so this presumably relies on the working directory
# being the zoo directory configured earlier - TODO confirm.
coco_dataset = fo.Dataset.from_dir(
    dataset_type=fo.types.COCODetectionDataset,
    data_path='coco-2017/train/data',
    labels_path='coco-2017/train/labels.json',
    max_samples=None,
    include_id=True,
)

In [None]:
# The labels file loaded above may still list every COCO category - TODO confirm.
# Export the annotations in COCO detection format to coco-2017/labels.json,
# restricted to `selected_classes` (all 15 selected classes, not only people).
coco_dataset.export(
    labels_path="coco-2017/labels.json",
    dataset_type=fo.types.COCODetectionDataset,
    classes=selected_classes,
)

In [None]:
# Colab only: mount Google Drive at /content/drive so the YOLO dataset and
# model files can be written under /content/drive/MyDrive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Set up the source and destination paths for the COCO -> YOLO conversion
input_dir = curr_dir + "/coco-2017/"
output_dir = "/content/drive/MyDrive/yolo/"

images_folder = input_dir + "train/data/"

# Start every split from a clean directory tree:
#   <output_dir>/<split>/images and <output_dir>/<split>/labels
# makedirs(exist_ok=True) also creates output_dir itself (and any missing
# parents) and does not fail if rmtree could not fully remove an old folder.
for split in ['train', 'test']:
    shutil.rmtree(output_dir + split, ignore_errors=True)
    for sub_folder in ('images', 'labels'):
        os.makedirs(output_dir + split + '/' + sub_folder, exist_ok=True)

In [None]:

# Normalised ground-truth boxes per image file name, each stored as
# [x_centre, y_centre, w, h] with every value divided by the image size
ground_truths = defaultdict(list)

# Read the annotations
with open(input_dir + 'labels.json', 'r') as f:
    data = json.load(f)

# Count number of images
num_data = len(data['images'])
print(f"Total number of images are {num_data}")

# Choose 80-20 split; the test split takes whatever is left so that the two
# printed counts always add up to the total
num_train = int(np.floor(0.8*num_data))
num_test = num_data - num_train
print(num_train, num_test)

# Group annotations by the image they belong to
id_ann = defaultdict(list)
for ann in data['annotations']:
    id_ann[ann['image_id']].append(ann)

# Write one YOLO label file per image and copy the image next to it.
# The first num_train images (in file order) go to 'train', the rest to 'test'.
for count, image in enumerate(data['images']):
    width = image['width']
    height = image['height']
    file_name = image['file_name']
    # splitext only strips the final extension, so names containing extra
    # dots keep their full stem and cannot collide with each other
    stem, ext = os.path.splitext(file_name)

    split = 'train' if count < num_train else 'test'

    # The context manager closes the label file even if an annotation is malformed
    with open(f'{output_dir}{split}/labels/{stem}.txt', 'w') as label_file:
        for annotation in id_ann[image['id']]:
            # NOTE(review): assumes category ids in labels.json are 1-based and
            # contiguous so that id - 1 is the YOLO class index - TODO confirm
            current_category = annotation['category_id'] - 1
            # COCO bbox is [x_min, y_min, width, height] in pixels
            x, y, w, h = annotation['bbox']

            # Box centre, then normalise everything by the image size
            x_centre = (x + w/2) / width
            y_centre = (y + h/2) / height
            w /= width
            h /= height

            ground_truths[file_name].append([x_centre, y_centre, w, h])

            # YOLO label line: class x_centre y_centre w h (6 decimal places)
            label_file.write(
                f"{current_category} {x_centre:.6f} {y_centre:.6f} {w:.6f} {h:.6f}\n"
            )

    # Keep the original file name so that os.listdir() on the images folder
    # yields the same keys that ground_truths uses
    shutil.copy(images_folder + file_name, f'{output_dir}{split}/images/{stem}{ext}')

In [None]:
# prompt: print number of labels in each training and test directories

import os

def count_labels_in_directories(train_dir, test_dir):
    """Count the ``.txt`` label files of the training and testing splits.

    Each split directory is expected to contain a ``labels`` sub-folder;
    only entries whose name ends in ``.txt`` are counted. The two directory
    paths are printed before counting.

    Args:
        train_dir: Path to the training directory.
        test_dir: Path to the testing directory.

    Returns:
        A ``(train_count, test_count)`` tuple of label-file counts.
    """
    print(train_dir, test_dir)

    per_split = []
    for split_dir in (train_dir, test_dir):
        entries = os.listdir(os.path.join(split_dir, 'labels'))
        per_split.append(sum(1 for entry in entries if entry.endswith(".txt")))

    return tuple(per_split)

# Report how many label files were written for each split on Drive
output_dir = "/content/drive/MyDrive/yolo/"
train_labels_count, test_labels_count = count_labels_in_directories(
    os.path.join(output_dir, 'train'),
    os.path.join(output_dir, 'test'),
)

for message in (
    f"Number of labels in the training directory: {train_labels_count}",
    f"Number of labels in the testing directory: {test_labels_count}",
):
    print(message)

In [None]:
# prompt: write a code to detetrmine the size of each split in GB

import os

def get_directory_size(directory):
    """Return the combined size of all files below *directory* in GB.

    The tree is walked recursively; symbolic links are skipped. One GB is
    taken to be 1024 ** 3 bytes.
    """
    byte_total = 0
    for root, _subdirs, names in os.walk(directory):
        candidates = (os.path.join(root, name) for name in names)
        # Symbolic links are left out of the total
        byte_total += sum(
            os.path.getsize(path) for path in candidates if not os.path.islink(path)
        )

    bytes_per_gb = 1024 ** 3
    return byte_total / bytes_per_gb

# Measure how much space each split occupies on Drive
output_dir = "/content/drive/MyDrive/yolo/"
train_dir, test_dir = (
    os.path.join(output_dir, split_name) for split_name in ('train', 'test')
)

train_size_gb, test_size_gb = (
    get_directory_size(split_path) for split_path in (train_dir, test_dir)
)

print(f"Size of the training directory: {train_size_gb:.2f} GB")
print(f"Size of the testing directory: {test_size_gb:.2f} GB")

In [None]:
# Sanity-check the conversion: draw the stored ground-truth boxes on up to
# three random images from the *test* split
sample_images_dir = "/content/drive/MyDrive/yolo/test/images/"
test_image_names = os.listdir(sample_images_dir)
# replace=False so the same image is never shown twice; the sample size is
# capped so that folders with fewer than three images do not raise
random_images = np.random.choice(
    test_image_names, min(3, len(test_image_names)), replace=False
)

fig, ax = plt.subplots(1, 3, figsize=(15, 5))

for i, file in enumerate(random_images):
    img = cv2.imread(sample_images_dir + file)
    if img is None:
        # cv2.imread returns None (instead of raising) for unreadable files
        continue
    # OpenCV loads BGR; matplotlib expects RGB
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    h, w, _ = img.shape
    # Boxes are normalised [x_centre, y_centre, w, h]; convert to pixel corners
    for ann in ground_truths[file]:
        start = (int((ann[0] - ann[2]/2)*w), int((ann[1] - ann[3]/2)*h))
        end = (int((ann[0] + ann[2]/2)*w), int((ann[1] + ann[3]/2)*h))
        img = cv2.rectangle(img, start, end, (0, 255, 0), 2)
    ax[i].imshow(img)

In [None]:
# Create the YOLO model from the 'yolov8n.pt' weights file; this object is
# the one trained in the following cells
model = YOLO('yolov8n.pt')

In [None]:
from pathlib import Path
import torch

# Define paths
save_dir = Path('/content/drive/MyDrive/models')  # Save to Google Drive or another persistent location
save_dir.mkdir(parents=True, exist_ok=True)

# Check if a checkpoint exists
checkpoint_path = save_dir / 'last.pt'
resume_training = checkpoint_path.exists()  # True only when save_dir/last.pt is present

# Train the model
# NOTE(review): `model` was created from 'yolov8n.pt', not from
# checkpoint_path; resume=True may therefore not pick up save_dir/last.pt -
# confirm how Ultralytics chooses the checkpoint to resume from.
# NOTE(review): confirm that the installed Ultralytics version accepts a
# `save_dir` training argument (its documented output options are
# `project` / `name`).
# NOTE(review): paths here are Colab Drive paths while device='mps' targets
# Apple Silicon - confirm the intended runtime.
train_results = model.train(
    data='config.yaml',
    batch=16,
    epochs=5,
    plots=True,
    device='mps',  # Use 'mps' for Apple Silicon or 'cuda' for NVIDIA GPUs
    save_dir=save_dir,  # Save checkpoints to this directory
    resume=resume_training,  # Resume training if a checkpoint exists
)

# Save the final model
# Writes only the state dict (parameter tensors) to save_dir/final_model.pt
final_model_path = save_dir / 'final_model.pt'
torch.save(model.state_dict(), final_model_path)
print(f"Final model saved to {final_model_path}")

In [None]:
# Train the model on the dataset described by config.yaml:
# 5 epochs, batch size 16, with training plots enabled.
# device='mps' is for Apple Silicon.
train_results = model.train(data='config.yaml', batch=16, epochs=5, plots=True, device='mps')

In [None]:
# Save the current model to 'trained.pt' (this file is loaded again before
# the IoU evaluation)
model.save(filename='trained.pt')

In [None]:
# Rebind `model` to a YOLO object built from the 'yolov8n.yaml' architecture
# file; no weights file is named here (contrast with 'yolov8n.pt' earlier)
model = YOLO('yolov8n.yaml')

In [None]:
# Train the model with the same settings as the earlier run (config.yaml,
# batch 16, 5 epochs, plots on), but on device '0' rather than 'mps'.
# NOTE(review): '0' presumably selects the first CUDA GPU - confirm.
train_results = model.train(data='config.yaml', batch=16, epochs=5, plots=True, device='0')

In [None]:
# Save this second model under a different name ('trained2.pt') so that
# 'trained.pt' is not overwritten
model.save(filename='trained2.pt')

In [None]:
# Load the weights stored in 'trained.pt' into the current `model` object.
# NOTE(review): at this point `model` is the object built from
# 'yolov8n.yaml'; confirm that evaluating the 'trained.pt' weights (and not
# 'trained2.pt') is intended.
model.load('trained.pt')

In [None]:
def calc_iou(bb1, bb2):
    """Return the intersection-over-union of two axis-aligned boxes.

    Args:
        bb1: Box as an indexable ``[x_centre, y_centre, width, height]``.
        bb2: Box in the same format and the same units as ``bb1``.

    Returns:
        The IoU as a float in ``[0.0, 1.0]``. Boxes that do not overlap, or
        whose union has no area (e.g. two zero-sized boxes), give ``0.0``.
    """
    # Convert centre/size representation to corner coordinates
    b1_x1, b1_x2 = bb1[0] - bb1[2]/2, bb1[0] + bb1[2]/2
    b1_y1, b1_y2 = bb1[1] - bb1[3]/2, bb1[1] + bb1[3]/2

    b2_x1, b2_x2 = bb2[0] - bb2[2]/2, bb2[0] + bb2[2]/2
    b2_y1, b2_y2 = bb2[1] - bb2[3]/2, bb2[1] + bb2[3]/2

    # Corners of the intersection rectangle
    x_left = max(b1_x1, b2_x1)
    y_top = max(b1_y1, b2_y1)
    x_right = min(b1_x2, b2_x2)
    y_bottom = min(b1_y2, b2_y2)

    # No overlap, or the boxes only touch along an edge/corner
    if x_right <= x_left or y_bottom <= y_top:
        return 0.0

    # The intersection of two axis-aligned boxes is itself axis-aligned
    intersection_area = (x_right - x_left) * (y_bottom - y_top)

    bb1_area = bb1[2]*bb1[3]
    bb2_area = bb2[2]*bb2[3]
    union_area = bb1_area + bb2_area - intersection_area

    # Guard against a zero (or negative, for malformed input) union instead
    # of dividing by it
    if union_area <= 0:
        return 0.0

    # The intersection is computed from corners and the areas from sizes, so
    # round-off can push the ratio marginally outside [0, 1]; clamp it rather
    # than aborting a long evaluation run with an assertion
    iou = intersection_area / float(union_area)
    return float(min(1.0, max(0.0, iou)))

In [None]:
# Evaluate the model on the test split with a per-image mean IoU
test_images_folder = output_dir + 'test/images/'
test_labels_folder = output_dir + 'test/labels'
test_files = os.listdir(test_images_folder)
sum_iou = 0
# Maps test file name -> (mean IoU, sorted array of matched IoUs)
ious = defaultdict(float)
num_test = len(test_files)

# Create a folder to save the annotated prediction images
results_folder = output_dir + 'results/'
os.makedirs(results_folder, exist_ok=True)

# Iterate through the test files to test the performance
for test_file in test_files:
    # NOTE(review): classes=[0] keeps only class-0 predictions, while
    # ground_truths holds boxes of every selected class - confirm that this
    # filter is still intended for the 15-class dataset.
    res = model.predict(test_images_folder + test_file, classes=[0])
    res[0].save(results_folder + test_file)

    gt = ground_truths[test_file]
    # Move the boxes to host memory first: calling .numpy() directly on a
    # tensor that lives on a GPU raises
    preds = res[0].boxes.xywhn.cpu().numpy()

    # Images without ground truth or without predictions add nothing to
    # sum_iou but are still counted in num_test
    if len(gt) == 0 or len(preds) == 0:
        continue

    # Pairwise IoU between every ground-truth box (rows) and prediction (cols)
    iou_matrix = np.zeros((len(gt), len(preds)))
    for i, gt_box in enumerate(gt):
        for j, pred_box in enumerate(preds):
            iou_matrix[i, j] = calc_iou(gt_box, pred_box)

    # Hungarian matching: minimising (1 - IoU) maximises the total IoU of the
    # one-to-one assignment; it yields min(len(gt), len(preds)) pairs
    gt_idx, pred_idx = linear_sum_assignment(1 - iou_matrix)
    assigned_ious = np.sort(iou_matrix[gt_idx, pred_idx])

    # Compute mean across all matched instances in the image
    mean_iou = np.mean(assigned_ious)

    assert mean_iou <= 1.0

    sum_iou += mean_iou
    ious[test_file] = (mean_iou, assigned_ious)

In [None]:
# Average the per-image mean IoU over every file in the test images folder.
# Images that the evaluation loop skipped (no ground truth or no predictions)
# contributed 0 to sum_iou but are still counted in num_test.
print("The average IoU across all test instances is", sum_iou/num_test)

In [None]:
# Show some sample output results: ground truth and predictions drawn on
# random test images, titled with the per-image mean IoU
def _draw_boxes(img, boxes, colour):
    """Draw normalised [x_centre, y_centre, w, h] boxes on img and return it."""
    h, w, _ = img.shape
    for box in boxes:
        start = (int((box[0] - box[2]/2)*w), int((box[1] - box[3]/2)*h))
        end = (int((box[0] + box[2]/2)*w), int((box[1] + box[3]/2)*h))
        img = cv2.rectangle(img, start, end, colour, 2)
    return img


n_samples = 6
# replace=False avoids showing the same image twice; the size is capped so a
# test folder with fewer than n_samples images does not raise
random_tests = np.random.choice(
    test_files, min(n_samples, len(test_files)), replace=False
)

fig, ax = plt.subplots(2, 3, figsize=(15, 10))

for t, test_file in enumerate(random_tests):
    res = model.predict(test_images_folder + test_file, classes=[0])
    # .cpu() first: .numpy() raises for tensors that live on a GPU
    preds = res[0].boxes.xywhn.cpu().numpy()
    img = cv2.imread(test_images_folder + test_file)
    # OpenCV loads BGR; matplotlib expects RGB
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    gt = ground_truths[test_file]

    # Colours are RGB here: ground truth in green, predictions in blue
    img = _draw_boxes(img, gt, (0, 255, 0))
    img = _draw_boxes(img, preds, (0, 0, 255))

    if len(gt) == 0 or len(preds) == 0:
        # Nothing to match; report 0 instead of the NaN that the mean of an
        # empty assignment would produce
        mean_iou = 0.0
    else:
        # Pairwise IoU between ground-truth boxes (rows) and predictions (cols)
        iou_matrix = np.zeros((len(gt), len(preds)))
        for i, gt_box in enumerate(gt):
            for j, pred_box in enumerate(preds):
                iou_matrix[i, j] = calc_iou(gt_box, pred_box)

        # Hungarian matching: minimising (1 - IoU) maximises the total IoU
        gt_idx, pred_idx = linear_sum_assignment(1 - iou_matrix)

        # Compute mean across all matched instances in the image
        mean_iou = np.mean(iou_matrix[gt_idx, pred_idx])

    ax[t // 3][t % 3].imshow(img)
    ax[t // 3][t % 3].set_title("IoU Score:" + str(mean_iou))