# Waste Detection

## Package Requirements

First, install PyTorch before Ultralytics if you want to use CUDA.

In [None]:
# Install PyTorch (CUDA): https://pytorch.org/get-started/locally/
# The --index-url below points pip at the cu124 wheel index. Run this cell before
# the one that installs ultralytics when CUDA support is wanted.
%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu124

Then install the necessary packages:
- `numpy`
- `opencv-python`: reads images for inference and draws the bounding boxes
- `ultralytics`: loads the YOLO weights
- `matplotlib`: used here instead of OpenCV to display images
- `inflection`: a utility package used when converting the dataset
- `pyyaml`: a utility package used when constructing the dataset
- `scikit-learn`

In [None]:
# Install required packages
# (%pip, unlike !pip, installs into the environment of the running kernel)
%pip install numpy opencv-python ultralytics matplotlib inflection pyyaml scikit-learn

## Setup Dataset

The dataset used is [TACO](http://tacodataset.org/) (Trash Annotations in Context), an open image dataset of waste. The annotations are provided in COCO format, so they need to be converted to the YOLO format.

Create the directories for the YOLO dataset and a folder for the downloaded images (if they do not already exist).

In [None]:
import os
from pathlib import Path

# All dataset files live under ./dataset, relative to the directory the notebook
# is run from.
cwd = Path.cwd()

DATASET_PATH = cwd / 'dataset'

# Raw download: the images plus the COCO-style annotation file.
ORIGINAL_DATASET_PATH = DATASET_PATH / 'original'

ORIGINAL_IMAGES_PATH = ORIGINAL_DATASET_PATH / 'images'
ORIGINAL_INFO_PATH = ORIGINAL_DATASET_PATH / 'annotations.json'

# Converted dataset, laid out as images/<split> and labels/<split>.
YOLO_DATASET_PATH = DATASET_PATH / 'yolo'

TRAIN_DIR = 'train'
TRAIN_IMAGES_PATH = YOLO_DATASET_PATH / 'images' / TRAIN_DIR
TRAIN_LABELS_PATH = YOLO_DATASET_PATH / 'labels' / TRAIN_DIR

TEST_DIR = 'test'
TEST_IMAGES_PATH = YOLO_DATASET_PATH / 'images' / TEST_DIR
TEST_LABELS_PATH = YOLO_DATASET_PATH / 'labels' / TEST_DIR

VALIDATION_DIR = 'val'
VALIDATION_IMAGES_PATH = YOLO_DATASET_PATH / 'images' / VALIDATION_DIR
VALIDATION_LABELS_PATH = YOLO_DATASET_PATH / 'labels' / VALIDATION_DIR

# Create every directory up front (no error if it already exists). The loop
# variable is intentionally not named `dir`: at notebook scope that would shadow
# the builtin dir() for the rest of the kernel session.
for directory in [ORIGINAL_DATASET_PATH, ORIGINAL_IMAGES_PATH,
                  TRAIN_IMAGES_PATH, TRAIN_LABELS_PATH,
                  TEST_IMAGES_PATH, TEST_LABELS_PATH,
                  VALIDATION_IMAGES_PATH, VALIDATION_LABELS_PATH]:
    directory.mkdir(parents=True, exist_ok=True)

Download original annotations and dataset information from [TACO](https://github.com/pedropro/TACO)

In [None]:
from urllib.request import urlretrieve

# https://github.com/pedropro/TACO
DATASET_URL = 'https://raw.githubusercontent.com/pedropro/TACO/refs/heads/master/data/annotations.json'

# Skip the download when the file is already present (the image download cell
# caches the same way), so re-running the notebook does not need the network.
if ORIGINAL_INFO_PATH.exists():
    print('dataset information already present, skipping download')
else:
    # Fetch into a sibling ".part" file and rename on success, so an interrupted
    # transfer cannot leave a truncated annotations.json that later runs trust.
    partial_info_path = ORIGINAL_INFO_PATH.with_name(ORIGINAL_INFO_PATH.name + '.part')
    urlretrieve(DATASET_URL, partial_info_path)
    partial_info_path.replace(ORIGINAL_INFO_PATH)

    print('dataset information downloaded')

Parse the dataset information (JSON).

In [None]:
import json

# JSON text is UTF-8; pass the encoding explicitly so parsing does not depend on
# the platform's default locale encoding.
with open(ORIGINAL_INFO_PATH, encoding='utf-8') as json_data:
    dataset_info = json.load(json_data)

Build a record for each image, including the local path under `dataset/original/images` where it will be downloaded.

In [None]:
from os.path import splitext
from urllib.parse import urlparse

# Build one record per image listed in the annotation file. Each record starts
# with an empty 'labels' list.
images = []
for image in dataset_info['images']:
    url = image['flickr_url']
    _, file_ext = splitext(urlparse(url).path)  # get url's file extension

    # Local file is named after the image id and keeps the remote extension.
    # Double quotes are used inside the f-string because reusing the outer quote
    # character is a SyntaxError before Python 3.12.
    file_name = f'{image["id"]}{file_ext}'
    image_path = ORIGINAL_IMAGES_PATH / file_name

    images.append({
        'id': image['id'],
        'file_name': file_name,
        'file_path': image_path,
        'width': image['width'],
        'height': image['height'],
        'url': url,
        'labels': [],
    })

print('total image count:', len(images))

Download all images in parallel with exponential backoff for rate-limited cases.

In [None]:
from concurrent.futures import ThreadPoolExecutor
import random
from time import sleep
from urllib.error import HTTPError
from urllib.request import urlretrieve

MAX_BACKOFF = 3 * 60  # 3min

def download_image(image):
    """Download one image to its local path unless the file already exists.

    Args:
        image: record with the keys 'url' (source address), 'file_path'
            (pathlib.Path destination) and 'file_name' (used in log output).

    HTTP 429 (rate limited) responses are retried without limit, sleeping with
    exponential backoff plus up to one second of jitter, capped at MAX_BACKOFF
    seconds. Any other error is re-raised to the caller.
    """
    target = image['file_path']
    if target.exists():
        return

    # Download into a sibling ".part" file and rename it on success. A transfer
    # that breaks half-way therefore never leaves a truncated file at `target`,
    # which the exists() check above would otherwise accept on the next run.
    partial = target.with_name(target.name + '.part')

    retries = 0
    while True:  # keeps looping only while the server answers 429
        try:
            urlretrieve(image['url'], partial)
        except HTTPError as e:
            if e.code != 429:
                raise  # bare raise keeps the original traceback

            # exponential backoff
            delay = min((2 ** retries + random.uniform(0, 1)), MAX_BACKOFF)
            sleep(delay)

            if retries > 4:
                print(f'{image["file_path"]}: rate limited: retries={retries}')
            retries += 1
        except Exception:
            # any other failure: discard whatever was written so far
            partial.unlink(missing_ok=True)
            raise
        else:
            partial.replace(target)
            print(f'Downloaded: {image["file_name"]}')
            break

# parallel download using ThreadPoolExecutor
# Tasks are submitted one by one and their outcome inspected afterwards: the
# iterator returned by executor.map() re-raises worker exceptions only when it is
# consumed, so an unconsumed map() hides every failed download.
with ThreadPoolExecutor() as executor:
    download_futures = [(image, executor.submit(download_image, image)) for image in images]

# Leaving the `with` block waits for all tasks, so every future is finished here.
failed_downloads = []
for image, future in download_futures:
    error = future.exception()
    if error is not None:
        failed_downloads.append(image)
        print(f'Failed: {image["file_name"]}: {error!r}')

print(f'download finished: {len(images) - len(failed_downloads)} ok, {len(failed_downloads)} failed')

Prepare the data for YOLO.

1. Get super-categories from categories in the dataset information.

2. Put all super-categories into a list for later use in the YOLO dataset format.

In [None]:
from inflection import underscore

def to_snake_case(string):
    """Return `string` as a snake_case identifier.

    Spaces become underscores and '&' is spelled out as 'and' before the text is
    handed to inflection's `underscore`.
    """
    without_spaces = string.replace(' ', '_')
    spelled_out = without_spaces.replace('&', 'and')
    return underscore(spelled_out)

# Distinct super-categories (snake_case) in first-seen order: dict keys keep
# insertion order and drop repeats.
classes = list(dict.fromkeys(
    to_snake_case(category['supercategory'])
    for category in dataset_info['categories']
))

def as_class_id(category_id):
    """Map a COCO category id to its YOLO class.

    The category is found by its 'id' field rather than by its position in the
    list, so the mapping stays correct when ids are not contiguous, not 0-based
    or not in list order.

    Args:
        category_id: value of an annotation's 'category_id'.

    Returns:
        (class_id, class_name), where class_id is the index of the category's
        snake_case super-category in `classes`.

    Raises:
        KeyError: no category in the dataset information has this id.
    """
    for category in dataset_info['categories']:
        if category['id'] == category_id:
            break
    else:
        raise KeyError(f'unknown category id: {category_id}')

    class_name = to_snake_case(category['supercategory'])
    class_id = classes.index(class_name)
    return (class_id, classes[class_id])

# Summary of the class list built above.
class_count = len(classes)
class_listing = ', '.join(classes)
print('total classes:', class_count)
print('classes:', class_listing)

3. Convert COCO format into YOLO format. (`class_id x_center y_center width height`)

In [None]:
def coco_to_yolo(x, y, w, h, img_w, img_h):
    """Convert a COCO box (top-left x, y, width, height in pixels) to YOLO form.

    Returns (x_center, y_center, width, height), each divided by the image
    width or height respectively.
    """
    center_x_px = x + w / 2
    center_y_px = y + h / 2
    return (
        center_x_px / img_w,
        center_y_px / img_h,
        w / img_w,
        h / img_h,
    )


# Look records up by image id instead of by list position. images[image_id] is
# only right while the list is complete and ordered by id; once images without
# labels have been filtered out, re-running this cell would hit the wrong record
# or raise IndexError.
images_by_id = {image['id']: image for image in images}

# clear labels so that re-running the cell does not append duplicates
for image in images:
    image['labels'] = []

for annotation in dataset_info['annotations']:
    data = images_by_id.get(annotation['image_id'])
    if data is None:
        # the annotation refers to an image that is not in `images`
        continue

    img_w = data['width']
    img_h = data['height']

    class_id, _ = as_class_id(annotation['category_id'])

    # COCO bbox is [x, y, width, height] in pixels
    x, y, w, h = annotation['bbox']
    x, y, w, h = coco_to_yolo(x, y, w, h, img_w, img_h)

    # one YOLO label line: class_id x_center y_center width height
    yolo_line = f'{class_id} {x} {y} {w} {h}'

    data['labels'].append(yolo_line)


4. Keep only the images that have annotations.

In [None]:
# Drop every record whose label list is empty.
images = [record for record in images if record['labels']]
print('total valid image and annotation count:', len(images))

5. Use `train_test_split` from `scikit-learn` to split train/test/validation dataset.

In [None]:
from sklearn.model_selection import train_test_split

# Fixed seed so the train/val/test membership is the same on every run. Without
# it each re-run reshuffles, and images can move between the training and the
# evaluation splits.
SPLIT_SEED = 42

# the splits hold positions into `images`
image_ids = list(range(len(images)))

# 70% train; the remaining 30% is halved into validation and test
train_data, temp_data = train_test_split(
    image_ids, test_size=0.3, shuffle=True, random_state=SPLIT_SEED)
val_data, test_data = train_test_split(
    temp_data, test_size=0.5, shuffle=True, random_state=SPLIT_SEED)

6. Remove all existing images and labels in the YOLO dataset directory.

7. Copy images from the original directory to the YOLO directory according to the train/test/validation split.

8. Write the converted labels to the YOLO directory according to the train/test/validation split.

In [None]:
import shutil

# delete all files in the current yolo folder (sub-directories are left alone)
# The loop variable is not named `dir`, which would shadow the builtin at
# notebook scope.
for split_dir in [TRAIN_IMAGES_PATH, TRAIN_LABELS_PATH,
                  TEST_IMAGES_PATH, TEST_LABELS_PATH,
                  VALIDATION_IMAGES_PATH, VALIDATION_LABELS_PATH]:
    # snapshot the listing first, since entries are removed while looping
    for entry in list(split_dir.iterdir()):
        if entry.is_file():
            entry.unlink()

# Fill each split folder: copy the image and write its label file next to it
# (same stem, .txt extension, one label per line).
split_targets = [
    (train_data, TRAIN_IMAGES_PATH, TRAIN_LABELS_PATH),
    (test_data, TEST_IMAGES_PATH, TEST_LABELS_PATH),
    (val_data, VALIDATION_IMAGES_PATH, VALIDATION_LABELS_PATH),
]
for split_ids, images_dir, labels_dir in split_targets:
    for data_id in split_ids:
        data = images[data_id]
        source_path = data['file_path']

        # image goes to the split's image folder under its original file name
        shutil.copy(source_path, images_dir / data['file_name'])

        # labels go to the split's label folder
        label_path = labels_dir / f'{source_path.stem}.txt'
        with open(label_path, 'w') as label_file:
            label_file.write('\n'.join(data['labels']))

9. Create a YAML file for YOLO to work with, defining the dataset path, the train/test/validation paths, the class count, and the class names.

In [None]:
import yaml

DATASET_YAML = YOLO_DATASET_PATH / 'taco.yaml'

# Dataset description: root path, image folder of each split, class count and
# class names.
content = dict(
    path=str(YOLO_DATASET_PATH),
    train='images/train',
    test='images/test',
    val='images/val',
    nc=len(classes),
    names=classes,
)
with DATASET_YAML.open('w') as yaml_file:
    yaml.dump(content, yaml_file)

## Model Training

Initialize the YOLO object with the YOLO11 weights. (The object downloads the weights automatically.)

In [None]:
from ultralytics import YOLO
# Weights file the model is created from.
WEIGHTS_FILE = 'yolo11n.pt'
model = YOLO(WEIGHTS_FILE)

Train on the dataset for 30 epochs using CUDA.

In [None]:
import torch

# Use the first CUDA device when one is available and fall back to the CPU
# otherwise, so the cell does not fail on a machine without a usable GPU.
train_device = 0 if torch.cuda.is_available() else 'cpu'

train_results = model.train(
    data=str(DATASET_YAML),  # dataset description file, passed as a plain string path
    epochs=30,               # epochs
    imgsz=640,               # image size
    batch=8,                 # batch size
    device=train_device,     # device to train (cpu or gpu)
    save=True                # save the model as a weight file
)

## Model Inference

Pick 16 random images from the validation set.

In [None]:
import random

import cv2

NUM_INFERENCE_IMAGES = 16

# random.sample raises ValueError when asked for more items than are available,
# so cap the request at the size of the validation split.
sample_ids = random.sample(val_data, min(NUM_INFERENCE_IMAGES, len(val_data)))

# cv2.imread is given a plain string path. It returns None instead of raising
# when a file cannot be read, so those images are reported and left out.
inference_imgs = []
for i in sample_ids:
    img = cv2.imread(str(images[i]['file_path']))
    if img is None:
        print(f'skipped unreadable image: {images[i]["file_path"]}')
        continue
    inference_imgs.append(img)

Run inference on the images.

In [None]:
# Run the model on the sampled images; one result per input image.
results = model.predict(source=inference_imgs)

Draw classes and bounding boxes on the images.

In [None]:
import numpy as np

WIDTH = 20          # line thickness, used for both the box and the text
FONT_SCALE = 6
FONT = cv2.FONT_HERSHEY_SIMPLEX
LABEL_MARGIN = 10   # gap in pixels between the label text and the box top

bbox_imgs = []
for result in results:
    # convert to RGB, the channel order matplotlib displays
    image = cv2.cvtColor(result.orig_img, cv2.COLOR_BGR2RGB)
    for box in result.boxes:
        # get bounding box as plain Python ints (truncated like the int32 cast)
        x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(np.int32).tolist()
        cls = int(box.cls.cpu().numpy().astype(np.int32)[0])  # get class id
        cls = result.names[cls]                               # get class name from the id

        # a fresh random colour for every box
        color = (
            random.randint(0, 255),
            random.randint(0, 255),
            random.randint(0, 255)
        )

        # draw a bounding box
        cv2.rectangle(image, (x1, y1), (x2, y2), color, WIDTH)

        # write a class name to the box. The text normally sits just above the
        # box; for boxes near the top edge that position lies outside the image,
        # so the baseline is pushed down far enough for the text to be visible.
        (_, text_height), _ = cv2.getTextSize(cls, FONT, FONT_SCALE, WIDTH)
        label_y = max(y1 - LABEL_MARGIN, text_height + LABEL_MARGIN)
        cv2.putText(image, cls, (x1, label_y),
                    FONT, FONT_SCALE,
                    color, WIDTH)

    bbox_imgs.append(image)

Display the images in a 4x4 matplotlib grid.

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

fig, axes = plt.subplots(4, 4, figsize=(12, 12))
for idx, ax in enumerate(axes.ravel()):
    # With fewer images than grid cells the remaining cells stay blank instead of
    # raising IndexError.
    if idx < len(bbox_imgs):
        ax.imshow(bbox_imgs[idx])
    ax.axis('off')

plt.tight_layout()
# plt.savefig('output.png')
plt.show()

---