# Dependency install

In [None]:
!pip install roboflow -q
!pip install ultralytics==8.0.196 -q
!pip install wget -q

# Dependency Import

In [60]:
#from roboflow import Roboflow
from ultralytics import YOLO
import json
import shutil
import gdown
import requests
# import yaml
import os
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import random
from io import BytesIO
from torch.utils.data import Dataset, DataLoader
import cv2
from PIL import Image
import wget
import torch
import torch.nn as nn
from concurrent.futures import ThreadPoolExecutor
# from google.colab import drive
import tensorflow as tf
import zipfile

# Dataset retrieval

There are 2 options that can be taken

1.   Download it from the TACO dataset's JSON
2.   Download from google drive (latest dataset update: 13 Feb 2023)

## Download from json

In [None]:
id = "11tzOy41twUboqYDZx0-AnPNDsGq2A3cn"
output = "unofficial.json"
gdown.download(id=id, output=output)                  # Latest dataset update: 19 Dec 2019

id = "1TzxsRbWdp3y8Mr6oiRQqynDo_MqkOaQi"
output = "official.json"
gdown.download(id=id, output=output)                  # Latest dataset update: 13 Feb 2023

In [None]:
def download_image(image):
    file_name = image['file_name'].split('/')[1].split('.jpg')[0]
    image_url = image['flickr_640_url']
    file_path = image['file_path']

    if not os.path.isfile(file_path):
      if image_url is None:
        image_url = image['flickr_url']
      response = requests.get(image_url)
      img = Image.open(BytesIO(response.content))
      if img.size != (480,640):
        img = img.resize((480,640))
      if img.mode == 'RGBA':
          img = img.convert('RGB')
      try:
          img.save(file_path, exif=img.info["exif"])
      except:
          img.save(file_path)

In [None]:
def resize_bbox(bbox, original_size, new_size):
    original_width, original_height = original_size
    x, y, width, height = bbox

    width_scale = new_size[0] / original_width
    height_scale = new_size[1] / original_height

    new_x = x * width_scale
    new_y = y * height_scale
    new_width = width * width_scale
    new_height = height * height_scale

    return [new_x, new_y, new_width, new_height]

In [None]:
resize_bbox([66.0, 112.333336, 1045.0, 770.000064], (3024,4032), (480,640))

In [None]:
unoff_dir = "/data/unofficial"
if os.path.exists(unoff_dir):
  shutil.rmtree(unoff_dir, ignore_errors=True)
os.makedirs(unoff_dir)
os.makedirs(os.path.join(unoff_dir, "images"))
os.makedirs(os.path.join(unoff_dir, "labels"))

unoff_json = "unofficial.json"

with open(unoff_json ,"r") as f:
  data = json.loads(f.read())
  nr_images = len(data['images'])
  file_names = {}
  image_sizes = {}

  for i in range(nr_images):
    image = data['images'][i]
    file_name = image['file_name'].split('/')[1].split('.')[0]
    id = image['id']
    file_names[id] = file_name
    image_sizes[id] = (image['width'], image['height'])
    data['images'][i]['file_path'] = os.path.join(unoff_dir, "images", file_name+".jpg")


  max_workers = 16
  with ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [executor.submit(download_image, image) for image in data['images']]
    for future in futures:
        future.result()

  nr_anno = len(data['annotations'])
  labels = [[] for _ in range(nr_images)]
  for i in range(nr_anno):
    annotation = data['annotations'][i]
    category_id = annotation['category_id']
    bbox = annotation['bbox']
    image_id = annotation['image_id']
    label_path = os.path.join(unoff_dir, "labels")

    new_bbox = resize_bbox(bbox, image_sizes[image_id], (480,640))
    labels[image_id].append([category_id, new_bbox[0], new_bbox[1], new_bbox[2], new_bbox[3]])

  for image_id, annotation_list in enumerate(labels):
    file_path = os.path.join(unoff_dir, "labels", f"{file_names[image_id]}.txt")
    with open(file_path, "w") as file:
      for annotation in annotation_list:
          line = " ".join(map(str, annotation)) + "\n"
          file.write(line)

In [None]:
off_dir = "/data/official"
if os.path.exists(off_dir):
  shutil.rmtree(off_dir, ignore_errors=True)
os.makedirs(off_dir)
os.makedirs(os.path.join(off_dir, "images"))
os.makedirs(os.path.join(off_dir, "labels"))

off_json = "official.json"

with open(off_json ,"r") as f:
  data = json.loads(f.read())
  nr_images = len(data['images'])
  file_names = {}
  image_sizes = {}

  for i in range(nr_images):
    image = data['images'][i]
    batch_num = image['file_name'].split('/')[0]
    file_name = image['file_name'].split('/')[1].split('.')[0]
    file_name = batch_num + "--" + file_name
    id = image['id']
    file_names[id] = file_name
    image_sizes[id] = (image['width'], image['height'])
    data['images'][i]['file_path'] = os.path.join(off_dir, "images", file_name+".jpg")

  max_workers = 16
  with ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [executor.submit(download_image, image) for image in data['images']]
    for future in futures:
        future.result()

  nr_anno = len(data['annotations'])
  labels = [[] for _ in range(nr_images)]
  for i in range(nr_anno):
    annotation = data['annotations'][i]
    category_id = annotation['category_id']
    bbox = annotation['bbox']
    image_id = annotation['image_id']
    label_path = os.path.join(off_dir, "labels")

    new_bbox = resize_bbox(bbox, image_sizes[image_id], (480,640))
    labels[image_id].append([category_id, new_bbox[0], new_bbox[1], new_bbox[2], new_bbox[3]])

  for image_id, annotation_list in enumerate(labels):
    file_path = os.path.join(off_dir, "labels", f"{file_names[image_id]}.txt")
    with open(file_path, "w") as file:
      for annotation in annotation_list:
          line = " ".join(map(str, annotation)) + "\n"
          file.write(line)

In [None]:
if not os.path.exists("/drive"):
  drive.mount('/content/drive')

dataset_folder = "data" #@param{type:"string"}

dest_dir = os.path.join("/drive/MyDrive", dataset_folder)
shutil.copytree("/data", dest_dir)

## Download from drive

In [None]:
if os.path.exists("/data"):
  shutil.rmtree("/data")

if not os.path.exists("data.zip"):
  id = "1qzFvq1D9OX_4QT2q5-fN-Ct-slniUv_J"
  output = "/content/data.zip"
  gdown.download(id=id, output=output)

!unzip -q /content/data.zip -d /content/

# Global variables

In [59]:
UNSPLIT_DATASET_DIR = "data"

OFFICIAL_IMG = "official/images"
OFFICIAL_LABEL = "official/labels"
UNOFFICIAL_IMG = "unofficial/images"
UNOFFICIAL_LABEL = "unofficial/labels"

DATASET_DIR = "TACO"

TRAIN_IMG = "train/images"
TRAIN_LABEL = "train/labels"

VAL_IMG = "valid/images"
VAL_LABEL = "valid/labels"

TEST_IMG = "test/images"
TEST_LABEL = "test/labels"

TRAIN_SPLIT = 0.85
VAL_SPLIT = 0.1
TEST_SPLIT = 0.05

IMG_PATH_DICT = {"train": TRAIN_IMG, "val": VAL_IMG, "test": TEST_IMG}
LABEL_PATH_DICT = {"train": TRAIN_LABEL, "val": VAL_LABEL, "test": TEST_LABEL}

USE_SUPERCATEGORY = True

IMG_SIZE_W = 480
IMG_SIZE_H = 640

BATCH_SIZE = 8

In [49]:
official_json = "official.json"

with open(official_json ,"r") as f:
  data = json.loads(f.read())
  nr_categories = len(data['categories'])

  OLD_CLASS_LABELS = {}
  CLASS_LABELS = {}

  for i in range(nr_categories):
    category = data['categories'][i]
    if USE_SUPERCATEGORY:
      category_name = category['supercategory']
    else:
      category_name = category['name']
    category_id = category['id']
    OLD_CLASS_LABELS[category_id] = category_name
    if category_name not in CLASS_LABELS.values():
      CLASS_LABELS[len(CLASS_LABELS)] = category_name

print(OLD_CLASS_LABELS)
print(len(OLD_CLASS_LABELS))
print(CLASS_LABELS)
print(len(CLASS_LABELS))

{0: 'Aluminium foil', 1: 'Battery', 2: 'Blister pack', 3: 'Blister pack', 4: 'Bottle', 5: 'Bottle', 6: 'Bottle', 7: 'Bottle cap', 8: 'Bottle cap', 9: 'Broken glass', 10: 'Can', 11: 'Can', 12: 'Can', 13: 'Carton', 14: 'Carton', 15: 'Carton', 16: 'Carton', 17: 'Carton', 18: 'Carton', 19: 'Carton', 20: 'Cup', 21: 'Cup', 22: 'Cup', 23: 'Cup', 24: 'Cup', 25: 'Food waste', 26: 'Glass jar', 27: 'Lid', 28: 'Lid', 29: 'Other plastic', 30: 'Paper', 31: 'Paper', 32: 'Paper', 33: 'Paper', 34: 'Paper bag', 35: 'Paper bag', 36: 'Plastic bag & wrapper', 37: 'Plastic bag & wrapper', 38: 'Plastic bag & wrapper', 39: 'Plastic bag & wrapper', 40: 'Plastic bag & wrapper', 41: 'Plastic bag & wrapper', 42: 'Plastic bag & wrapper', 43: 'Plastic container', 44: 'Plastic container', 45: 'Plastic container', 46: 'Plastic container', 47: 'Plastic container', 48: 'Plastic glooves', 49: 'Plastic utensils', 50: 'Pop tab', 51: 'Rope & strings', 52: 'Scrap metal', 53: 'Shoe', 54: 'Squeezable tube', 55: 'Straw', 56: '

# Relabel all annotations (do this if supercategories are used)

In [None]:
if USE_SUPERCATEGORY:
    if os.path.exists(os.path.join(UNSPLIT_DATASET_DIR, "official/new-labels")):
        shutil.rmtree(os.path.join(UNSPLIT_DATASET_DIR, "official/new-labels"))
    os.makedirs(os.path.join(UNSPLIT_DATASET_DIR, "official/new-labels"))

    if os.path.exists(os.path.join(UNSPLIT_DATASET_DIR, "unofficial/new-labels")):
        shutil.rmtree(os.path.join(UNSPLIT_DATASET_DIR, "unofficial/new-labels"))
    os.makedirs(os.path.join(UNSPLIT_DATASET_DIR, "unofficial/new-labels"))

    for file_name in os.listdir(os.path.join(UNSPLIT_DATASET_DIR, OFFICIAL_LABEL)):
        file_path = os.path.join(UNSPLIT_DATASET_DIR, OFFICIAL_LABEL, file_name)
        
        annotations = []
        new_annotations = []

        with open(file_path, "r") as f:
            for line in f:
                annotations.append(line)
        
        for i in range(len(annotations)):
            try:
                annot = annotations[i]
                class_id, bbox_x, bbox_y, bbox_h, bbox_w = annot.split(' ')
                class_name = OLD_CLASS_LABELS[int(class_id)]
                for key, value in CLASS_LABELS.items():
                    if value == class_name:
                        new_class_id = key
                        break
                new_annotations.append([new_class_id, bbox_x, bbox_y, bbox_h, bbox_w])
            except:
                continue
        
        new_label_path = os.path.join(UNSPLIT_DATASET_DIR, "official/new-labels", file_name)
        with open(new_label_path, "w") as f:
            for i in range(len(new_annotations)):
                line = " ".join(map(str, new_annotations[i]))
                f.write(line)

    for file_name in os.listdir(os.path.join(UNSPLIT_DATASET_DIR, UNOFFICIAL_LABEL)):
        file_path = os.path.join(UNSPLIT_DATASET_DIR, UNOFFICIAL_LABEL, file_name)
        
        annotations = []
        new_annotations = []

        with open(file_path, "r") as f:
            for line in f:
                annotations.append(line)

        for i in range(len(annotations)):
            try:
                annot = annotations[i]
                class_id, bbox_x, bbox_y, bbox_h, bbox_w = annot.split(' ')
                class_name = OLD_CLASS_LABELS[int(class_id)]
                for key, value in CLASS_LABELS.items():
                    if value == class_name:
                        new_class_id = key
                        break
                new_annotations.append([new_class_id, bbox_x, bbox_y, bbox_h, bbox_w])
            except:
                continue
        
        new_label_path = os.path.join(UNSPLIT_DATASET_DIR, "unofficial/new-labels", file_name)
        with open(new_label_path, "w") as f:
            for i in range(len(new_annotations)):
                line = " ".join(map(str, new_annotations[i]))
                f.write(line)

    OFFICIAL_LABEL = "official/new-labels"
    UNOFFICIAL_LABEL = "unofficial/new-labels"

            

# Split train-val-test

In [None]:
off_count = len(os.listdir(os.path.join(UNSPLIT_DATASET_DIR, OFFICIAL_IMG)))
unoff_count = len(os.listdir(os.path.join(UNSPLIT_DATASET_DIR, UNOFFICIAL_IMG)))
total_dataset_count = off_count + unoff_count

print("Number of official dataset:", off_count)
print("Number of unofficial dataset:", unoff_count)
print("Total number of dataset:", total_dataset_count)

official_train_count = int(total_dataset_count*TRAIN_SPLIT - unoff_count)
official_val_count = int(total_dataset_count*VAL_SPLIT)
official_test_count = off_count - official_train_count - official_val_count

print("")
print("Number of total data for training:", official_train_count + unoff_count)
print("Number of official data for training:", official_train_count)
print("Number of official data for validation:", official_val_count)
print("Number of official data for testing:", official_test_count)


In [None]:
if os.path.exists(DATASET_DIR):
  shutil.rmtree(DATASET_DIR, ignore_errors=True)

os.makedirs(os.path.join(DATASET_DIR, TRAIN_IMG))
os.makedirs(os.path.join(DATASET_DIR, TRAIN_LABEL))
os.makedirs(os.path.join(DATASET_DIR, VAL_IMG))
os.makedirs(os.path.join(DATASET_DIR, VAL_LABEL))
os.makedirs(os.path.join(DATASET_DIR, TEST_IMG))
os.makedirs(os.path.join(DATASET_DIR, TEST_LABEL))

print("Directory created")

for file_name in os.listdir(os.path.join(UNSPLIT_DATASET_DIR, UNOFFICIAL_IMG)):
  orig_path = os.path.join(UNSPLIT_DATASET_DIR, UNOFFICIAL_IMG, file_name)
  dest_path = os.path.join(DATASET_DIR, TRAIN_IMG, file_name)
  shutil.copy(orig_path, dest_path)

print("Unofficial images moved")

for file_name in os.listdir(os.path.join(UNSPLIT_DATASET_DIR, UNOFFICIAL_LABEL)):
  orig_path = os.path.join(UNSPLIT_DATASET_DIR, UNOFFICIAL_LABEL, file_name)
  dest_path = os.path.join(DATASET_DIR, TRAIN_LABEL, file_name)
  shutil.copy(orig_path, dest_path)

print("Unofficial labels moved")

official_files = os.listdir(os.path.join(UNSPLIT_DATASET_DIR, OFFICIAL_IMG))
random.shuffle(official_files)

train_dataset = official_files[:official_train_count]
val_dataset = official_files[official_train_count:official_train_count+official_val_count]
test_dataset = official_files[official_train_count+official_val_count:]

for file in train_dataset:
  file_name = file.split(".")[0]

  orig_img_path = os.path.join(UNSPLIT_DATASET_DIR, OFFICIAL_IMG, file)
  dest_img_path = os.path.join(DATASET_DIR, TRAIN_IMG, file)
  shutil.copy(orig_img_path, dest_img_path)

  orig_label_path = os.path.join(UNSPLIT_DATASET_DIR, OFFICIAL_LABEL, file_name+".txt")
  dest_label_path = os.path.join(DATASET_DIR, TRAIN_LABEL, file_name+".txt")
  shutil.copy(orig_label_path, dest_label_path)

print("Official train moved")

for file in val_dataset:
  file_name = file.split(".")[0]

  orig_img_path = os.path.join(UNSPLIT_DATASET_DIR, OFFICIAL_IMG, file)
  dest_img_path = os.path.join(DATASET_DIR, VAL_IMG, file)
  shutil.copy(orig_img_path, dest_img_path)

  orig_label_path = os.path.join(UNSPLIT_DATASET_DIR, OFFICIAL_LABEL, file_name+".txt")
  dest_label_path = os.path.join(DATASET_DIR, VAL_LABEL, file_name+".txt")
  shutil.copy(orig_label_path, dest_label_path)

print("Official validation moved")

for file in test_dataset:
  file_name = file.split(".")[0]

  orig_img_path = os.path.join(UNSPLIT_DATASET_DIR, OFFICIAL_IMG, file)
  dest_img_path = os.path.join(DATASET_DIR, TEST_IMG, file)
  shutil.copy(orig_img_path, dest_img_path)

  orig_label_path = os.path.join(UNSPLIT_DATASET_DIR, OFFICIAL_LABEL, file_name+".txt")
  dest_label_path = os.path.join(DATASET_DIR, TEST_LABEL, file_name+".txt")
  shutil.copy(orig_label_path, dest_label_path)

print("Official test moved")


In [None]:
print("Number of total training data:", len(os.listdir(os.path.join(DATASET_DIR, TRAIN_IMG))))
print("Number of total validation data:", len(os.listdir(os.path.join(DATASET_DIR, VAL_IMG))))
print("Number of total testing data:", len(os.listdir(os.path.join(DATASET_DIR, TEST_IMG))))

# Utility functions

In [55]:
def read_label(line):
    class_id, box_x, box_y, box_w, box_h = line.strip().split()
    return int(class_id), float(box_x), float(box_y), float(box_w), float(box_h)

In [None]:
def get_file_count_per_data_type(dataset_type):
  label_dir = os.path.join(DATASET_DIR, LABEL_PATH_DICT[dataset_type])
  label_files = [f for f in os.listdir(label_dir)]

  img_dir = os.path.join(DATASET_DIR, IMG_PATH_DICT[dataset_type])
  img_files = [f for f in os.listdir(img_dir)]

  print(f"There are {len(label_files)} files in {dataset_type} set")
  if (len(label_files) != len(img_files)):
    print(f"Warning: There are {len(label_files)} labels but also {len(img_files)} images in {dataset_type} set")

In [None]:
def get_datacount_per_class(dataset_type, class_list):
  label_dir = os.path.join(DATASET_DIR, LABEL_PATH_DICT[dataset_type])
  label_files = [f for f in os.listdir(label_dir)]
  class_counts = [0] * len(class_list)

  for file_name in label_files:
    file_path = os.path.join(label_dir, file_name)
    with open(file_path, "r") as f:
      for line in f:
        class_id, _, _, _, _ = read_label(line)
        class_counts[class_id] += 1
  
  class_names = []
  for _, value in class_list.items():
    class_names.append(value)

  sorted_counts_names = sorted(zip(class_counts, class_names), reverse=True)
  sorted_counts, sorted_names = zip(*sorted_counts_names)

  return sorted_counts, sorted_names

In [None]:
def plot_datacount_per_class(class_list):
    plt.figure(figsize=(21, 21))

    plt.subplot(3, 1, 1)
    count, names = get_datacount_per_class("train", class_list)
    plt.title("Train")
    plt.barh(names, count)

    plt.subplot(3, 1, 2)
    count, names = get_datacount_per_class("val", class_list)
    plt.title("Validation")
    plt.barh(names, count)

    plt.subplot(3, 1, 3)
    count, names = get_datacount_per_class("test", class_list)
    plt.title("Test")
    plt.barh(names, count)

    plt.show()

In [None]:
plot_datacount_per_class(CLASS_LABELS)

In [None]:
def display_images(image_list, num_images, plot_title, dataset_type):
  label_dir = os.path.join(DATASET_DIR, LABEL_PATH_DICT[dataset_type])
  img_dir = os.path.join(DATASET_DIR, IMG_PATH_DICT[dataset_type])
  plt.figure(figsize=(12, 12))
  for i, label_name in enumerate(image_list):
    image_name = label_name.replace(".txt", ".jpg")
    img_path = os.path.join(img_dir, image_name)
    label_path = os.path.join(label_dir, label_name)

    img = cv2.imread(img_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    h, w, _ = img.shape

    ax = plt.subplot(num_images, num_images, i + 1)
    plt.imshow(img)
    plt.axis("off")

    with open(label_path, "r") as f:
      for line in f:
        _, x1, y1, box_width, box_height = read_label(line)

        rect = patches.Rectangle(
            (x1, y1),
            box_width,
            box_height,
            linewidth=1,
            edgecolor="r",
            facecolor="none",
        )
        ax.add_patch(rect)
      if (i+1 == len(image_list)):
        break
  plt.tight_layout()
  plt.suptitle(
      plot_title,
      fontsize=30,
      y=1.05,
      fontweight="bold",
  )
  plt.show()

In [None]:
def display_images_of_certain_class(class_name, dataset_type, num_images, class_list):
  if class_name not in class_list.values():
    raise ValueError("Class name not found in the class list.")

  label_dir = os.path.join(DATASET_DIR, LABEL_PATH_DICT[dataset_type])
  label_files = [f for f in os.listdir(label_dir)]
  for key, value in class_list.items():
    if value == class_name:
      index = key
      break
  image_list = []

  for file_name in label_files:
    file_path = os.path.join(label_dir, file_name)
    with open(file_path, "r") as f:
      for line in f:
        class_id, _, _, _, _ = read_label(line)
        class_id = int(class_id)
        if class_id == index:
          image_list.append(file_name)
          break
  if len(image_list) > num_images * num_images:
    selected_images = random.sample(image_list, num_images * num_images)
  else:
    selected_images = image_list
  plot_title = f"Displaying images containing class: {class_name}"
  display_images(selected_images, num_images, plot_title, dataset_type)

In [None]:
display_images_of_certain_class("Bottle", "train", 3, CLASS_LABELS)

In [None]:
def get_box_count_per_image(dataset_type, box_count_treshold=30):
  label_dir = os.path.join(DATASET_DIR, LABEL_PATH_DICT[dataset_type])
  label_files = [f for f in os.listdir(label_dir)]

  average_box_count = 0
  max_box_count = 0
  high_box_count_images = []

  for file_name in label_files:
    file_path = os.path.join(label_dir, file_name)
    with open(file_path, "r") as f:
      boxes = f.readlines()
      average_box_count += len(boxes)
      max_box_count = max(max_box_count, len(boxes))
      if (len(boxes)) > box_count_treshold:
        high_box_count_images.append(file_name)

  average_box_count /= len(label_files)

  return average_box_count, max_box_count, high_box_count_images

In [None]:
def get_box_size_per_image(dataset_type, box_size_treshold=0.00015):
  label_dir = os.path.join(DATASET_DIR, LABEL_PATH_DICT[dataset_type])
  label_files = [f for f in os.listdir(label_dir)]

  avg_box_size = 0
  small_box_size_images = []

  for file_name in label_files:
    file_path = os.path.join(label_dir, file_name)
    #print(file_path)
    with open(file_path, "r") as f:
      for line in f:
        #print(line)
        _, _, _, width, height = read_label(line)
        box_size = (width * height) / (IMG_SIZE_W * IMG_SIZE_H)
        avg_box_size += box_size
        #print(width, height, avg_box_size, box_size)
        if box_size < box_size_treshold:
          small_box_size_images.append(file_name)

  avg_box_size /= len(label_files)
  return avg_box_size, small_box_size_images

In [None]:
def get_filtered_boxes_overlap(boxes, iou_treshold=0.35):
  if (len(boxes) == 1):
    return boxes

  removed_boxes = []

  for i in range(len(boxes)-1):
    for j in range(i+1, len(boxes)):
      class1, x1, y1, w1, h1 = read_label(boxes[i])
      class2, x2, y2, w2, h2 = read_label(boxes[j])

      if class1 != class2:
        continue

      x_overlap = max(0, min(x1 + w1 / 2, x2 + w2 / 2) - max(x1 - w1 / 2, x2 - w2 / 2))
      y_overlap = max(0, min(y1 + h1 / 2, y2 + h2 / 2) - max(y1 - h1 / 2, y2 - h2 / 2))

      intersect = x_overlap * y_overlap
      union = w1 * h1 + w2 * h2 - intersect
      IoU = intersect / union

      if IoU > iou_treshold:
        if w1*h1 < w2*h2:
          removed_boxes.append(i)
        else:
          removed_boxes.append(j)

  filtered_boxes = [box for i, box in enumerate(boxes) if i not in removed_boxes]

  return filtered_boxes

In [None]:
def get_number_of_overlapping_boxes(dataset_type, iou_treshold=0.35):
  label_dir = os.path.join(DATASET_DIR, LABEL_PATH_DICT[dataset_type])
  label_files = [f for f in os.listdir(label_dir)]

  total_box_count = 0
  total_high_overlap_box_count = 0
  high_box_overlap_images = []

  for file_name in label_files:
    file_path = os.path.join(label_dir, file_name)

    boxes = []
    with open(file_path, "r") as f:
      for line in f:
        boxes.append(line)

    filtered_boxes = get_filtered_boxes_overlap(boxes, iou_treshold)

    total_box_count += len(boxes)
    total_high_overlap_box_count += len(boxes) - len(filtered_boxes)
    if (len(boxes) != len(filtered_boxes)):
      high_box_overlap_images.append(file_name)
  return total_box_count, total_high_overlap_box_count, high_box_overlap_images

# Preprocessing

## Invalid box boundaries (infinite)

In [None]:
def invalid_box_boundary(dataset_type):
    dir_path = os.path.join(DATASET_DIR, LABEL_PATH_DICT[dataset_type])
    invalid_boxes = 0
    for file_name in os.listdir(dir_path):
        file_path = os.path.join(dir_path, file_name)
        
        new_annotations = []

        with open(file_path, "r") as f:
            for line in f:
                class_id, bbox_x, bbox_y, bbox_h, bbox_w = line.split(' ')
                if (bbox_x == "inf" or bbox_x == "-inf" or bbox_y == "inf" or bbox_y == "-inf" or bbox_h == "inf" or bbox_h == "-inf" or bbox_w == "inf" or bbox_w == "-inf"):
                    invalid_boxes += 1
                    continue
                new_annotations.append([class_id, bbox_x, bbox_y, bbox_h, bbox_w])
        
        with open(file_path, "w") as f:
            for i in range(len(new_annotations)):
                line = " ".join(map(str, new_annotations[i]))
                f.write(line)
    print(f"Found and removed {invalid_boxes} invalid boxes in {dataset_type} set")

In [None]:
invalid_box_boundary("train")
invalid_box_boundary("val")
invalid_box_boundary("test")

## Too many boxes in 1 image

In [None]:
def images_with_high_box_count(dataset_type, num_images, box_count_treshold=30):
  _, _, high_box_count_images = get_box_count_per_image(dataset_type, box_count_treshold = box_count_treshold)

  if len(high_box_count_images) > num_images * num_images:
    selected_images = random.sample(high_box_count_images, num_images * num_images)
  else:
    selected_images = high_box_count_images

  plot_title = f"Images with at least {box_count_treshold} boxes in {dataset_type} set"
  display_images(selected_images, num_images, plot_title, dataset_type)

In [None]:
images_with_high_box_count("train", 3)

In [None]:
def discard_images_with_high_box_count(dataset_type, box_count_treshold=30):
  _, _, high_box_count_images = get_box_count_per_image(dataset_type, box_count_treshold = box_count_treshold)

  label_dir = os.path.join(DATASET_DIR, LABEL_PATH_DICT[dataset_type])
  img_dir = os.path.join(DATASET_DIR, IMG_PATH_DICT[dataset_type])
  count = 0

  for label_name in high_box_count_images:
    img_name = label_name.replace(".txt", ".jpg")

    img_path = os.path.join(img_dir, img_name)
    label_path = os.path.join(label_dir, label_name)

    if os.path.exists(img_path):
      os.remove(img_path)
    if os.path.exists(label_path):
      os.remove(label_path)

    count += 1
  print(f"Removed {count} images from {dataset_type} set")

In [None]:
discard_images_with_high_box_count("train")
discard_images_with_high_box_count("val")
discard_images_with_high_box_count("test")

## Box size too small

In [None]:
def images_with_small_boxes(dataset_type, num_images, box_size_treshold=0.00015):
  _, small_box_size_images = get_box_size_per_image(dataset_type, box_size_treshold = box_size_treshold)

  if len(small_box_size_images) > num_images * num_images:
    selected_images = random.sample(small_box_size_images, num_images * num_images)
  else:
    selected_images = small_box_size_images

  plot_title = f"Images with box sizes below {box_size_treshold} boxes in {dataset_type} set"
  display_images(selected_images, num_images, plot_title, dataset_type)

In [None]:
images_with_small_boxes("train", 3)

In [None]:
def discard_images_with_small_boxes(dataset_type, box_size_treshold=0.00015):
  _, small_box_size_images = get_box_size_per_image(dataset_type, box_size_treshold = box_size_treshold)

  label_dir = os.path.join(DATASET_DIR, LABEL_PATH_DICT[dataset_type])
  img_dir = os.path.join(DATASET_DIR, IMG_PATH_DICT[dataset_type])
  count = 0

  for label_name in small_box_size_images:
    img_name = label_name.replace(".txt", ".jpg")

    img_path = os.path.join(img_dir, img_name)
    label_path = os.path.join(label_dir, label_name)

    if os.path.exists(img_path):
      os.remove(img_path)
    if os.path.exists(label_path):
      os.remove(label_path)

    count += 1
  print(f"Removed {count} images from {dataset_type} set")

In [None]:
discard_images_with_small_boxes("train")
discard_images_with_small_boxes("val")
discard_images_with_small_boxes("test")

## Big overlap between boxes of same class

In [None]:
def images_with_high_box_overlap(dataset_type, num_images, iou_treshold=0.35):
  _, _, high_box_overlap_images = get_number_of_overlapping_boxes(dataset_type, iou_treshold = iou_treshold)

  if len(high_box_overlap_images) > num_images * num_images:
    selected_images = random.sample(high_box_overlap_images, num_images * num_images)
  else:
    selected_images = high_box_overlap_images

  plot_title = f"Images with box overlap above {iou_treshold} boxes in {dataset_type} set"
  display_images(selected_images, num_images, plot_title, dataset_type)

In [None]:
images_with_high_box_overlap("train", 3)

In [None]:
def discard_images_with_high_box_overlap(dataset_type, iou_treshold=0.35):
  _, _, high_box_overlap_images = get_number_of_overlapping_boxes(dataset_type, iou_treshold = iou_treshold)

  label_dir = os.path.join(DATASET_DIR, LABEL_PATH_DICT[dataset_type])
  img_dir = os.path.join(DATASET_DIR, IMG_PATH_DICT[dataset_type])
  count = 0

  for label_name in high_box_overlap_images:
    img_name = label_name.replace(".txt", ".jpg")

    img_path = os.path.join(img_dir, img_name)
    label_path = os.path.join(label_dir, label_name)

    if os.path.exists(img_path):
      os.remove(img_path)
    if os.path.exists(label_path):
      os.remove(label_path)

    count += 1
  print(f"Removed {count} images from {dataset_type} set")

In [None]:
discard_images_with_high_box_overlap("train")
discard_images_with_high_box_overlap("val")
discard_images_with_high_box_overlap("test")

# Model Training

## Dataset Information

In [None]:
get_file_count_per_data_type("train")
get_file_count_per_data_type("val")
get_file_count_per_data_type("test")

In [None]:
print(len(CLASS_LABELS))

In [None]:
def get_image_resolution(image_path):
    with Image.open(image_path) as img:
        width, height = img.size
    return width, height

files = os.listdir(os.path.join(DATASET_DIR, TRAIN_IMG))
file_name = files[0]
file_path = os.path.join(DATASET_DIR, TRAIN_IMG, file_name)
width, height = get_image_resolution(file_path)
print(f"Image resolution: {width}x{height}")

## YOLO V8

### Download YOLO V8

In [10]:
url = "https://github.com/ultralytics/assets/releases/download/v8.2.0/yolov8x.pt"
save_path = "yolov8x.pt"
wget.download(url, save_path)

'yolov8x.pt'

### View architecture

In [68]:
model = YOLO('YOLOv8x.pt')
print(model)

YOLO(
  (model): DetectionModel(
    (model): Sequential(
      (0): Conv(
        (conv): Conv2d(3, 80, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(80, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (1): Conv(
        (conv): Conv2d(80, 160, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(160, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (2): C2f(
        (cv1): Conv(
          (conv): Conv2d(160, 160, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(160, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
          (act): SiLU(inplace=True)
        )
        (cv2): Conv(
          (conv): Conv2d(400, 160, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(160, eps=0.001, momentum=0.03, affine=True, track_r

### Layer Classes

In [79]:
class Conv(tf.keras.layers.Layer):
    def __init__(self, filter_size, kernel_size=3, strides=1, padding='same'):
        super(Conv, self).__init__()
        self.conv = tf.keras.layers.Conv2D(filter_size, kernel_size=kernel_size, strides=strides, padding=padding)
        self.bn = tf.keras.layers.BatchNormalization(epsilon=0.001, momentum=0.03)
        self.actv = tf.keras.layers.Activation('swish')
        
    def call(self, x):
        x = self.conv(x)
        x = self.bn(x)
        x = self.actv(x)
        return x


In [86]:
class Bottleneck(tf.keras.layers.Layer):
    def __init__(self, filter_size):
        super(Bottleneck, self).__init__()
        self.cv1 = Conv(filter_size, 3, 1, 'same')
        self.cv2 = Conv(filter_size, 3, 1, 'same')

    def call(self, x):
        x = self.cv1(x)
        x = self.cv2(x)
        return x

In [82]:
class C2f(tf.keras.layers.Layer):
    def __init__(self, filter_size, num_bottlenecks):
        super(C2f, self).__init__()
        self.cv1 = Conv(filter_size, 1, 1, 'valid')
        self.m = [Bottleneck(filter_size) for _ in range(num_bottlenecks)]
        self.cv2 = Conv(filter_size * (num_bottlenecks + 1), 1, 1, 'valid')

    def call(self, x):
        x = self.cv1(x)
        x = self.cv2(x)
        for layer in self.m:
            x = layer(x)
        return x


In [88]:
class SPPF(tf.keras.Model):
    def __init__(self, filter_size):
        super(SPPF, self).__init__()
        self.cv1 = Conv(filter_size, 1, 1, 'valid')
        self.cv2 = Conv(2 * filter_size, 1, 1, 'valid')
        self.m = tf.keras.layers.MaxPool2D(pool_size=5, strides=1, padding='same')

    def call(self, x):
        x = self.cv1(x)
        x = self.cv2(x)
        x = self.m(x)
        return x

In [73]:
class CustomLayer(tf.keras.layers.Layer):
    def __init__(self, out_channels, kernel_size=3, strides=1, padding='same'):
        super(CustomLayer, self).__init__()
        self.conv1 = tf.keras.layers.Conv2D(128, kernel_size=kernel_size, strides=strides, padding=padding)
        self.conv2 = tf.keras.layers.Conv2D(out_channels, kernel_size=kernel_size, strides=strides, padding=padding)
        self.bn = tf.keras.layers.BatchNormalization(epsilon=0.001, momentum=0.03)
        self.relu = tf.keras.layers.ReLU()

    def call(self, x):
        x = self.relu(self.bn(self.conv1(x)))
        x = self.relu(self.bn(self.conv2(x)))
        return x

### Make a new model with the YOLO layers and the new custom layer

In [77]:
test_model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(416, 416, 3)),
    tf.keras.layers.Conv2D(80, kernel_size=3, strides=1, padding='same'),
    tf.keras.layers.BatchNormalization(epsilon=0.001, momentum=0.03),
    tf.keras.layers.Activation('swish')
])
test_model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

print(test_model.summary())


None


In [89]:
new_model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(416, 416, 3)),
    Conv(80, 3, 1, 'same'),
    Conv(160, 3, 1, 'same'),
    C2f(160, 3),
    Conv(320, 3, 2, 'same'),
    C2f(320, 6),
    Conv(640, 3, 2, 'same'),
    C2f(640, 6),
    Conv(640, 3, 2, 'same'),
    C2f(640, 3),
    SPPF(320),
    tf.keras.layers.UpSampling2D(size=2, interpolation='nearest'),
    C2f(640, 3),
    tf.keras.layers.UpSampling2D(size=2, interpolation='nearest'),
    C2f(160, 3),
    Conv(320, 3, 2, 'same'),
    C2f(640, 3),
    Conv(640, 3, 2, 'same'),
    C2f(640, 3),
    CustomLayer(len(CLASS_LABELS), 3, 1, 'same')
])
new_model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

print(new_model.summary())

1. The `call()` method of your layer may be crashing. Try to `__call__()` the layer eagerly on some test input first to see if it works. E.g. `x = np.random.random((3, 4)); y = layer(x)`
2. If the `call()` method is correct, then you may need to implement the `def build(self, input_shape)` method on your layer. It should create all variables used by the layer (e.g. by calling `layer.build()` on all its children layers).
Exception encountered: ''Input 0 of layer "batch_normalization_603" is incompatible with the layer: expected axis -1 of input shape to have value 128, but received input with shape (None, 52, 52, 28)''


RuntimeError: Exception encountered when calling CustomLayer.call().

[1mCould not automatically infer the output shape / dtype of 'custom_layer_6' (of type CustomLayer). Either the `CustomLayer.call()` method is incorrect, or you need to implement the `CustomLayer.compute_output_spec() / compute_output_shape()` method. Error encountered:

Input 0 of layer "batch_normalization_603" is incompatible with the layer: expected axis -1 of input shape to have value 128, but received input with shape (None, 52, 52, 28)[0m

Arguments received by CustomLayer.call():
  • args=('<KerasTensor shape=(None, 52, 52, 640), dtype=float32, sparse=False, name=keras_tensor_263>',)
  • kwargs=<class 'inspect._empty'>

In [None]:
class CustomLayer(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(CustomLayer, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, 128, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(128)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(128, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.relu(self.bn2(self.conv2(x)))
        return x


### Custom Dataset Class

In [57]:
class CustomDataset(Dataset):
    def __init__(self, image_dir, label_dir, transform=None):
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.transform = transform

        self.image_files = os.listdir(image_dir)
        self.label_files = os.listdir(label_dir)

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_path = os.path.join(self.image_dir, self.image_files[idx])
        label_path = os.path.join(self.label_dir, self.label_files[idx])

        # Load image
        img = Image.open(img_path).convert("RGB")

        # Load label (e.g., parse YOLO format)
        label = []  # Implement label loading
        with open(label_path, "r") as f:
            with line in f:
                label.append(read_label(line))

        # Apply transformations
        if self.transform:
            img = self.transform(img)

        return img, label

### Load the dataset

In [None]:
train_set = CustomDataset(os.path.join(DATASET_DIR, IMG_PATH_DICT['train']), os.path.join(DATASET_DIR, LABEL_PATH_DICT['train']))
val_set = CustomDataset(os.path.join(DATASET_DIR, IMG_PATH_DICT['val']), os.path.join(DATASET_DIR, LABEL_PATH_DICT['val']))
test_set = CustomDataset(os.path.join(DATASET_DIR, IMG_PATH_DICT['test']), os.path.join(DATASET_DIR, LABEL_PATH_DICT['test']))

In [None]:
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_set, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=True)

### Train the model

# Inference