In [17]:
import os
import re
import cv2
import yaml
import shutil
import random
import torch
from tqdm import tqdm
from pathlib import Path
from ultralytics import YOLO


In [18]:
torch.cuda.set_device(0) # Set to your desired GPU number
IMG_PATH = Path(r'./data/img_dataset')
ANNOTATION_PATH = Path(r'./data/annotation')
train_val_test = (0.8,0.1,0.1)

class_dict = {category_name: idx for idx, category_name in enumerate(os.listdir(IMG_PATH))}
print(class_dict)

{'Bicycle': 0, 'Boat': 1, 'Bottle': 2, 'Bus': 3, 'Car': 4, 'Cat': 5, 'Chair': 6, 'Cup': 7, 'Dog': 8, 'Motorbike': 9, 'People': 10, 'Table': 11}


In [19]:
def copy_dataset(target_dataset):
    base_dataset = r'./data/img_dataset'
    os.makedirs(target_dataset, exist_ok=True)
    for category_name in os.listdir(base_dataset):
        os.makedirs(os.path.join(target_dataset, category_name), exist_ok=True)
        for img_name in os.listdir(os.path.join(base_dataset, category_name)):
            shutil.copy(os.path.join(base_dataset, category_name, img_name), os.path.join(target_dataset, category_name, img_name))

In [20]:
def pascal_to_yolo(image_path):
    image_name = os.path.basename(image_path)
    image_dir = os.path.dirname(image_path).replace("\\", "/")
    file_title, _ = os.path.splitext(image_name)
    yolo_anno_path = image_dir.replace("images", "labels") + f"/{file_title}.txt"

    img = cv2.imread(image_path)
    img_height, img_width = img.shape[:2]

    sub_category = image_dir.split("/")[-1]
    path_from_base = os.path.join(sub_category, image_name)
    
    original_annotation_path = os.path.join(ANNOTATION_PATH, path_from_base + ".txt")
    with open(original_annotation_path, 'r') as anno_file:
        pascal_data = anno_file.readlines()[1:]

    pascal_num = list(map(lambda s: s.strip().split(" ")[:5], pascal_data))
    
    yolo_format = []
    for indv_bounding_coordinates in pascal_num:
        indv_bounding_coordinates[0] = class_dict[indv_bounding_coordinates[0]]
        indv_bounding_coordinates[1:] = list(map(int, indv_bounding_coordinates[1:]))
        # print(indv_bounding_coordinates)

        class_category = indv_bounding_coordinates[0]
        x_ctr = min((indv_bounding_coordinates[1] + indv_bounding_coordinates[3]/2) / img_width, 1.0)
        y_ctr = min((indv_bounding_coordinates[2] + indv_bounding_coordinates[4]/2) / img_height, 1.0)
        width = min((indv_bounding_coordinates[3]) / img_width, 1.0)
        height = min((indv_bounding_coordinates[4]) / img_height, 1.0)
        yolo_format.append([class_category, x_ctr, y_ctr, width, height])
    
    os.makedirs(os.path.dirname(yolo_anno_path), exist_ok=True)
    with open(yolo_anno_path, 'w') as yolo_writer:
        for data in yolo_format:
            yolo_writer.write(f"{' '.join(list(map(str, data)))}\n")
    
    print(f"Converted to {image_path} and label {yolo_anno_path}")
    

In [21]:
def generate_train_val_test(src_path, train_path, val_path, test_path, split_ratio, function=None, seed=None):
    if not os.path.exists(train_path):
        os.makedirs(train_path, exist_ok=True)
    if not os.path.exists(val_path):
        os.makedirs(val_path, exist_ok=True)
    if not os.path.exists(test_path):
        os.makedirs(test_path, exist_ok=True)

    if os.listdir(train_path) != []:
        shutil.rmtree(train_path)
        print(f"Removed existing train path: {train_path}")
    if os.listdir(val_path) != []:
        shutil.rmtree(val_path)
        print(f"Removed existing val path: {val_path}")
    if os.listdir(test_path) != []:
        shutil.rmtree(test_path)
        print(f"Removed existing test path: {test_path}")
    
    if seed != None:
        random.seed(seed)

    file_lists = []
    for (root, _, file_names) in os.walk(src_path):
        if file_names == []:
            continue

        sub_folder_name = os.path.basename(root)

        file_path_list = list(map(lambda path: os.path.join(sub_folder_name, path), file_names))

        random.shuffle(file_path_list)
        file_lists.append(file_path_list)

    for category_specific_list in file_lists:
        for file_name in category_specific_list:
            random_num = random.random()

            if random_num < split_ratio[0]:
                move_path = os.path.join(train_path, os.path.dirname(file_name))
            elif random_num < sum(split_ratio[:2]):
                move_path = os.path.join(val_path, os.path.dirname(file_name)) 
            else:
                move_path = os.path.join(test_path, os.path.dirname(file_name))

            os.makedirs(move_path, exist_ok=True)            
            original_path = os.path.join(src_path, file_name)
            new_name = os.path.join(move_path, os.path.basename(file_name))
            shutil.copy(original_path, move_path)
            if function != None:
                function(move_path)
                

            pascal_to_yolo(new_name)
            
            print(f"{original_path} --> {new_name}")


In [22]:
# check for different file names for images and labels

def check_file_names(folder): # test_data or train_val
    img_folder = os.path.join(folder, "images")
    label_folder = os.path.join(folder, "labels")
    train_img = os.path.join(img_folder, "train")
    val_img = os.path.join(img_folder, "val")
    train_label = os.path.join(label_folder, "train")
    val_label = os.path.join(label_folder, "val")

    for (root, dirname, filename) in os.walk(train_img):
        for objtype in dirname:
            obj_path = os.path.join(root, objtype)
            for (root, dirname, file_names) in os.walk(obj_path):
                for file_name in file_names:
                    file_name = os.path.splitext(file_name)[0]
                    label_name = os.path.join(train_label, objtype, file_name + ".txt")
                    if not os.path.exists(label_name):
                        print(f"Missing label file for {file_name} in {label_name}")
    
    for (root, dirname, filename) in os.walk(val_img):
        for objtype in dirname:
            obj_path = os.path.join(root, objtype)
            for (root, dirname, file_names) in os.walk(obj_path):
                for file_name in file_names:
                    file_name = os.path.splitext(file_name)[0]
                    label_name = os.path.join(val_label, objtype, file_name + ".txt")
                    if not os.path.exists(label_name):
                        print(f"Missing label file for {file_name} in {label_name}")


In [23]:
# check label files if width and height >1
def check_label_files(folder):

    label_folder = os.path.join(folder, "labels")
    print(label_folder)
    for root, dirname, files in os.walk(label_folder):
        if files == []:
            continue
        for file in files:
            if "cache" in file:
                continue
            
            label_path = os.path.join(root, file)
            with open(label_path, 'r') as f:
                lines = f.readlines()
                new_lines = []
                for line in lines:
                    values = list(map(float, line.strip().split()))
                    if values[1] > 1 or values[2] > 1 or values[3] > 1 or values[4] > 1:
                        values[1] = 1.0 if values[1] > 1 else values[1]
                        values[2] = 1.0 if values[2] > 1 else values[2]
                        values[3] = 1.0 if values[3] > 1 else values[3]
                        values[4] = 1.0 if values[4] > 1 else values[4]
                        print(f"Invalid bounding box in {label_path}: {line.strip()}")
                    if values[1] < 0 or values[2] < 0 or values[3] < 0 or values[4] < 0:
                        values[1] = 0.0 if values[1] < 0 else values[1]
                        values[2] = 0.0 if values[2] < 0 else values[2]
                        values[3] = 0.0 if values[3] < 0 else values[3]
                        values[4] = 0.0 if values[4] < 0 else values[4]
                        print(f"Invalid bounding box in {label_path}: {line.strip()}")
                    if len(values) != 5:
                        print(f"Invalid label format in {label_path}: {line.strip()}")
                    new_line = " ".join(str(v) for v in values) + "\n"
                    new_lines.append(new_line)
                # Overwrite the original file with the corrected content
                with open(label_path, 'w') as f:
                    f.writelines(new_lines)
                


In [32]:
def eval_on_test_set(model_path, data_yaml_path):
    model = YOLO('yolov8n.yaml')
    model.load(model_path)
    model.val(data=data_yaml_path, split="test")

In [24]:
model = YOLO('yolov8n.yaml')
model.load('/snapshots/yolov8n.pt')
model.to('cuda')

print(model.info())

Transferred 355/355 items from pretrained weights
YOLOv8n summary: 129 layers, 3,157,200 parameters, 3,157,184 gradients, 8.9 GFLOPs
(129, 3157200, 3157184, 8.8575488)


In [25]:
# train_config = {
#     'data': 'data.yaml',
#     'epochs': 100,
#     'patience': 10,
#     'batch': 16,
#     'imgsz': 640,
#     'save': True,
#     'save_period': 1,
#     'project': r'./model_runs',
#     'name': 'default_augmentation_train',
#     'exist_ok': False,
#     'seed': 0,
#     'resume': True,
#     'lr0': 0.1, 
#     'lrf': 0.001,
#     'dropout': 0.0,
#     'val': True,
#     'plots':True,
# }

# val_config = {
#     'data': 'data.yaml',
#     'batch': 16,
#     'imgsz': 640,
#     'conf': 0.25,
#     'iou': 0.5,
#     'max_det': 100,
#     'plots': True,
#     'split': 'val',
#     'project': r'./model_runs',
#     'name': 'default_augmentation_val'
# }

In [26]:
# model.train(
#     **train_config
# )


In [27]:
# model.val(
#     **val_config
# )

In [None]:
def calc_iou(box1, box2):
    '''pascal format'''
    x1 = max(box1[0], box2[0])
    y1 = max(box1[1], box2[1])
    x2 = min(box1[2], box2[2])
    y2 = min(box1[3], box2[3])

    intersection = max(0, x2 - x1) * max(0, y2 - y1)
    area_box1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area_box2 = (box2[2] - box2[0]) * (box2[3] - box2[1])

    union = area_box1 + area_box2 - intersection
    iou = intersection / union if union > 0 else 0

    return iou

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

def parse_results(base_path):
    results_path = os.path.join(base_path, 'results.csv')
    results = pd.read_csv(results_path)
    results['total_train_loss'] = results[['train/box_loss', 'train/cls_loss', 'train/dfl_loss']].sum(axis=1)
    results['total_val_loss'] = results[['val/box_loss', 'val/cls_loss', 'val/dfl_loss']].sum(axis=1)
    return results

results = parse_results(r'./model_runs/default_augmentation_train/')
plt.plot(results['epoch'], results['total_train_loss'], label='Train Loss', color='green')
plt.plot(results['epoch'], results['total_val_loss'], label='Validation Loss', color='red')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()

In [None]:
# test_path = r'./data/test_data/images'

# for (root, dirs, file_names) in os.walk(test_path):
#     for file in file_names:
#         file_path = os.path.join(root, file)
#         if "cache" in file:
#             continue

#         model.predict(

#             file_path,
#             imgsz=640,
#             conf=0.25,
#             iou=0.5,
#             max_det=100,
#             save=True,
#             save_txt=True,
#             project=r'./model_runs/default_augmentation_test',
#             name='default_augmentation_test',
#             )

In [None]:
# import os

# predict_run_path = r'model_runs\default_augmentation_test'
# corres_test_path = r'data\test_data\labels'

# def generate_test_folder_list(test_label_folder):
#     test_list = []
#     for folder_name in os.listdir(test_label_folder):
#         for labels_name in os.listdir(os.path.join(test_label_folder, folder_name)):
#             test_list.append(os.path.join(folder_name, labels_name))
        
#     return test_list

# def calc_iou(predicted, target):
#     '''predicted is the prediction, target is actual (for error's sake)'''
#     xmin = max(0, predicted[1] - predicted[3] / 2)
#     ymin = max(0, predicted[2] - predicted[4] / 2)
#     xmax = min(1, predicted[1] + predicted[3] / 2)
#     ymax = min(1, predicted[2] + predicted[4] / 2)

#     pred_box = [xmin, ymin, xmax, ymax]
#     target_box = [target[1] - target[3] / 2, target[2] - target[4] / 2, target[1] + target[3] / 2, target[2] + target[4] / 2]

#     if pred_box[0] > pred_box[2] or pred_box[1] > pred_box[3]:
#         return 0

#     intersection = max(0, min(pred_box[2], target_box[2]) - max(pred_box[0], target_box[0])) * max(0, min(pred_box[3], target_box[3]) - max(pred_box[1], target_box[1]))
#     union = (pred_box[2] - pred_box[0]) * (pred_box[3] - pred_box[1]) + (target_box[2] - target_box[0]) * (target_box[3] - target_box[1]) - intersection

#     return intersection / union if union > 0 else 0

# def max_iou(predicted_label, target_label):
#     with open(predicted_label, 'r') as pred_file:
#         pred_lines = pred_file.readlines()
#     with open(target_label, 'r') as target_file:
#         target_lines = target_file.readlines()

#     pred_boxes = [list(map(float, line.strip().split(" "))) for line in pred_lines]
#     target_boxes = [list(map(float, line.strip().split(" "))) for line in target_lines]
#     print(pred_boxes)
#     print(target_boxes)


# def calculate_prediction_metrics(predict_run_path, corres_test_path):
#     test_list = generate_test_folder_list(corres_test_path)

#     for augmentation_test_num in os.listdir(predict_run_path):
#         test_run_path = os.path.join(predict_run_path, augmentation_test_num)
#         for item_name in os.listdir(test_run_path):
#             if os.path.isfile(os.path.join(test_run_path, item_name)):
#                 image_path = item_name
#                 continue
        
#         file_title, _ = os.path.splitext(image_path)
#         label_title = file_title + '.txt'
#         predicted_label = os.path.join(test_run_path, "labels", label_title)

#         if not os.path.exists(predicted_label):
#             print(f"Missing predicted label file for {label_title} in {predicted_label}")
#             continue

#         for test_label in test_list:
#             if label_title in test_label:
#                 target_label = os.path.join(corres_test_path, test_label)
#                 break

#         max_iou(predicted_label, target_label)
#         break
    
# calculate_prediction_metrics(predict_run_path, corres_test_path)


[[0.0, 0.671866, 0.623762, 0.371998, 0.73051]]
[[0.0, 0.631, 0.5900900900900901, 0.362, 0.8138138138138138]]


In [None]:
model = YOLO('yolov8n.yaml')
model.load('/snapshots/yolov8n.pt')
model.to('cuda')

print(model.info())

In [None]:
peopleless_class_dict = {}
ctr = 0
for category_name in os.listdir(IMG_PATH):
    if category_name == "People":
        continue
    peopleless_class_dict[category_name] = ctr
    ctr += 1
print(peopleless_class_dict)

In [None]:
def pascal_to_yolo_without_person(image_path):
    image_name = os.path.basename(image_path)
    image_dir = os.path.dirname(image_path).replace("\\", "/")
    file_title, _ = os.path.splitext(image_name)
    yolo_anno_path = image_dir.replace("images", "labels") + f"/{file_title}.txt"

    img = cv2.imread(image_path)
    img_height, img_width = img.shape[:2]

    sub_category = image_dir.split("/")[-1]
    path_from_base = os.path.join(sub_category, image_name)
    
    original_annotation_path = os.path.join(ANNOTATION_PATH, path_from_base + ".txt")
    with open(original_annotation_path, 'r') as anno_file:
        pascal_data = anno_file.readlines()[1:]

    pascal_num = list(map(lambda s: s.strip().split(" ")[:5], pascal_data))
    
    yolo_format = []
    for indv_bounding_coordinates in pascal_num:
        if indv_bounding_coordinates[0].lower() == "people":
            continue
        
        indv_bounding_coordinates[0] = peopleless_class_dict[indv_bounding_coordinates[0]]
        indv_bounding_coordinates[1:] = list(map(int, indv_bounding_coordinates[1:]))
        # print(indv_bounding_coordinates)

        class_category = indv_bounding_coordinates[0]
        x_ctr = min((indv_bounding_coordinates[1] + indv_bounding_coordinates[3]/2) / img_width, 1.0)
        y_ctr = min((indv_bounding_coordinates[2] + indv_bounding_coordinates[4]/2) / img_height, 1.0)
        width = min((indv_bounding_coordinates[3]) / img_width, 1.0)
        height = min((indv_bounding_coordinates[4]) / img_height, 1.0)
        yolo_format.append([class_category, x_ctr, y_ctr, width, height])
    
    if len(yolo_format) == 0:
        os.remove(image_path)
        print(f"Removed {image_path} because no bounding box found")
        return

    os.makedirs(os.path.dirname(yolo_anno_path), exist_ok=True)
    with open(yolo_anno_path, 'w') as yolo_writer:
        for data in yolo_format:
            yolo_writer.write(f"{' '.join(list(map(str, data)))}\n")
    
    print(f"Converted to {image_path} and label {yolo_anno_path}")
    

In [None]:
def generate_train_val_test_peopleless(src_path, train_path, val_path, test_path, split_ratio, function=None, seed=None):
    if not os.path.exists(train_path):
        os.makedirs(train_path, exist_ok=True)
    if not os.path.exists(val_path):
        os.makedirs(val_path, exist_ok=True)
    if not os.path.exists(test_path):
        os.makedirs(test_path, exist_ok=True)

    if os.listdir(train_path) != []:
        shutil.rmtree(train_path)
        print(f"Removed existing train path: {train_path}")
    if os.listdir(val_path) != []:
        shutil.rmtree(val_path)
        print(f"Removed existing val path: {val_path}")
    if os.listdir(test_path) != []:
        shutil.rmtree(test_path)
        print(f"Removed existing test path: {test_path}")
    
    if seed != None:
        random.seed(seed)

    file_lists = []
    for (root, _, file_names) in os.walk(src_path):
        if file_names == []:
            continue

        sub_folder_name = os.path.basename(root)

        file_path_list = list(map(lambda path: os.path.join(sub_folder_name, path), file_names))

        random.shuffle(file_path_list)
        file_lists.append(file_path_list)

    for category_specific_list in file_lists:
        for file_name in category_specific_list:
            random_num = random.random()

            if random_num < split_ratio[0]:
                move_path = os.path.join(train_path, os.path.dirname(file_name))
            elif random_num < sum(split_ratio[:2]):
                move_path = os.path.join(val_path, os.path.dirname(file_name)) 
            else:
                move_path = os.path.join(test_path, os.path.dirname(file_name))

            os.makedirs(move_path, exist_ok=True)            
            original_path = os.path.join(src_path, file_name)
            new_name = os.path.join(move_path, os.path.basename(file_name))
            shutil.copy(original_path, move_path)
            if function != None:
                function(move_path)
                

            pascal_to_yolo_without_person(new_name)
            
            print(f"{original_path} --> {new_name}")


In [None]:
peopleless_dataset = './data/people_remove_train_val/images'

generate_train_val_test_peopleless(
    src_path='./data/img_dataset',
    train_path=os.path.join(peopleless_dataset, 'train'),
    val_path=os.path.join(peopleless_dataset, 'val'),
    test_path='./data/people_remove_test_data/images',
    split_ratio=train_val_test,
)

In [None]:
people_removed_train_config = {
    'data': 'people_balanced_data.yaml',
    'epochs': 100,
    'patience': 10,
    'batch': 16,
    'imgsz': 640,
    'save': True,
    'save_period': 1,
    'project': r'./model_runs',
    'name': 'people_removed_train',
    'exist_ok': False,
    'seed': 0,
    'resume': True,
    'lr0': 0.1, 
    'lrf': 0.001,
    'dropout': 0.0,
    'val': True,
    'plots':True,
}

people_removed_val_config = {
    'data': 'people_balanced_data.yaml',
    'batch': 16,
    'imgsz': 640,
    'conf': 0.25,
    'iou': 0.5,
    'max_det': 100,
    'plots': True,
    'split': 'val',
    'project': r'./model_runs',
    'name': 'people_removed_val'
}

In [None]:
model.train(
    **people_removed_train_config
)

In [None]:
model.val(
    **people_removed_val_config
)

In [36]:
eval_on_test_set(
    r'C:\Users\DuenoHfao\Desktop\Work\SCVU\mlgp_full\model_runs\people_removed_train\weights\best.pt'
    , 'people_balanced_data.yaml'
)

Transferred 319/355 items from pretrained weights
Ultralytics 8.3.107  Python-3.10.5 torch-2.6.0+cu126 CUDA:0 (NVIDIA GeForce RTX 3070 Ti, 8192MiB)
YOLOv8n summary (fused): 72 layers, 3,151,904 parameters, 31,920 gradients, 8.7 GFLOPs


FileNotFoundError: [34m[1mval: [0mError loading data from None
See https://docs.ultralytics.com/datasets for dataset formatting guidance.