In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
!pip install --quiet gdown 

import os
import gdown
import zipfile
import random
import cv2
import matplotlib.pyplot as plt

%matplotlib inline


url_prelim = "https://drive.google.com/file/d/1Yp0If_pGQyOSVaOFUS5piLypDuLiO5tK/view?usp=share_link"
url_objdet = "https://drive.google.com/file/d/1nQBjcg6-sU3yOxMzi0ybTwpQx4H5xuV0/view?usp=share_link"
url_seg    = "https://drive.google.com/file/d/1sd7diZGvNW3-BO0LnYqmTdaCdZKgToJp/view?usp=share_link"

output_prelim = "preliminary.zip"
output_objdet = "object_detection.zip"
output_seg    = "segmentation.zip"

def download_file(url, output_name):
    """
    Uses gdown to download a file from Google Drive (fuzzy=True handles
    typical Google Drive 'view' links).
    """
    print(f"Downloading {output_name} from {url} ...")
    gdown.download(url, output_name, fuzzy=True)
    print(f"Downloaded {output_name}.\n")

download_file(url_prelim, output_prelim)
download_file(url_objdet, output_objdet)
download_file(url_seg, output_seg)

def unzip_file(zip_path, extract_to):
    """
    Unzips a file to the specified directory.
    """
    print(f"Extracting {zip_path} to {extract_to} ...")
    os.makedirs(extract_to, exist_ok=True)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)
    print(f"Extraction complete for {zip_path}.\n")

unzip_file(output_prelim, "preliminary_data")
unzip_file(output_objdet, "object_detection_data")
unzip_file(output_seg,    "segmentation_data")


!git clone https://github.com/abdulwahabamin/keras-yolo3-attention.git
%cd keras-yolo3-attention

ANNOTATION_FILE = "annot_labels.txt"  
CLASSES_FILE    = "model_data/garbage_classes.txt"

if not os.path.isfile(ANNOTATION_FILE):
    print(f"Warning: {ANNOTATION_FILE} not found. Update the path to your annotation file.")
if not os.path.isfile(CLASSES_FILE):
    print(f"Warning: {CLASSES_FILE} not found. Update the path to your classes file.")

class_names = []
if os.path.isfile(CLASSES_FILE):
    with open(CLASSES_FILE, 'r') as f:
        class_names = [line.strip() for line in f.readlines()]

print("Number of classes:", len(class_names))
print("Classes:", class_names)


image_annotations = []

if os.path.isfile(ANNOTATION_FILE):
    with open(ANNOTATION_FILE, 'r') as f:
        lines = f.readlines()

    for line in lines:
        line = line.strip()
        if not line:
            continue
        parts = line.split()
        # The first part is the image path
        img_path = parts[0]
        bboxes = []
        # Each subsequent part is "x1,y1,x2,y2,class_id"
        for bbox_str in parts[1:]:
            coords = bbox_str.split(',')
            if len(coords) == 5:
                x1, y1, x2, y2, cls_id = map(int, coords)
                bboxes.append((x1, y1, x2, y2, cls_id))
        image_annotations.append({
            'img_path': img_path,
            'bboxes': bboxes
        })

print(f"Parsed {len(image_annotations)} annotated images from {ANNOTATION_FILE}.")


!mkdir -p /Ted/datasets/VOC_DATASET/JPEGImages
!cp /kaggle/working/object_detection_data/WaterTrash_ObjectDetection_LUMS2021_v1/JPEGImages/*.jpg /Ted/datasets/VOC_DATASET/JPEGImages

Downloading preliminary.zip from https://drive.google.com/file/d/1Yp0If_pGQyOSVaOFUS5piLypDuLiO5tK/view?usp=share_link ...


Downloading...
From (original): https://drive.google.com/uc?id=1Yp0If_pGQyOSVaOFUS5piLypDuLiO5tK
From (redirected): https://drive.google.com/uc?id=1Yp0If_pGQyOSVaOFUS5piLypDuLiO5tK&confirm=t&uuid=53d6e88f-2abd-4257-abfe-cc6a711ed8f3
To: /kaggle/working/preliminary.zip
100%|██████████| 522M/522M [00:03<00:00, 151MB/s]  


Downloaded preliminary.zip.

Downloading object_detection.zip from https://drive.google.com/file/d/1nQBjcg6-sU3yOxMzi0ybTwpQx4H5xuV0/view?usp=share_link ...


Downloading...
From (original): https://drive.google.com/uc?id=1nQBjcg6-sU3yOxMzi0ybTwpQx4H5xuV0
From (redirected): https://drive.google.com/uc?id=1nQBjcg6-sU3yOxMzi0ybTwpQx4H5xuV0&confirm=t&uuid=8478ac81-410c-4bb5-a631-f95e3319f1a0
To: /kaggle/working/object_detection.zip
100%|██████████| 2.91G/2.91G [00:15<00:00, 186MB/s] 


Downloaded object_detection.zip.

Downloading segmentation.zip from https://drive.google.com/file/d/1sd7diZGvNW3-BO0LnYqmTdaCdZKgToJp/view?usp=share_link ...


Downloading...
From (original): https://drive.google.com/uc?id=1sd7diZGvNW3-BO0LnYqmTdaCdZKgToJp
From (redirected): https://drive.google.com/uc?id=1sd7diZGvNW3-BO0LnYqmTdaCdZKgToJp&confirm=t&uuid=83904da8-3c29-4936-8ee0-b3d851e516e6
To: /kaggle/working/segmentation.zip
100%|██████████| 1.72G/1.72G [00:16<00:00, 107MB/s] 


Downloaded segmentation.zip.

Extracting preliminary.zip to preliminary_data ...
Extraction complete for preliminary.zip.

Extracting object_detection.zip to object_detection_data ...
Extraction complete for object_detection.zip.

Extracting segmentation.zip to segmentation_data ...
Extraction complete for segmentation.zip.

Cloning into 'keras-yolo3-attention'...
remote: Enumerating objects: 122, done.[K
remote: Counting objects: 100% (122/122), done.[K
remote: Compressing objects: 100% (90/90), done.[K
remote: Total 122 (delta 51), reused 100 (delta 29), pack-reused 0 (from 0)[K
Receiving objects: 100% (122/122), 9.29 MiB | 26.05 MiB/s, done.
Resolving deltas: 100% (51/51), done.
/kaggle/working/keras-yolo3-attention
Number of classes: 12
Classes: ['plastic_bag', 'plastic_wrapper', 'plastic_bottle', 'plastic_cap', 'shoes', 'decor', 'cigarette', 'paper_wrapper', 'cardboard', 'tetrapak', 'cluster', 'other']
Parsed 12650 annotated images from annot_labels.txt.


In [3]:
print("\n--- Step 4: Train/Test Split ---")
random.shuffle(image_annotations)
num_imgs = len(image_annotations)
train_size = int(0.8 * num_imgs)

train_data = image_annotations[:train_size]
test_data  = image_annotations[train_size:]

print(f"Total images: {num_imgs}")
print(f"Train set: {len(train_data)} images")
print(f"Test set:  {len(test_data)} images")


print("\nDone!")



--- Step 4: Train/Test Split ---
Total images: 12650
Train set: 10120 images
Test set:  2530 images

Done!


In [4]:
%%writefile yolo_in_notebook.py
from functools import wraps
import numpy as np
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Conv2D, Add, ZeroPadding2D, UpSampling2D, Concatenate, MaxPooling2D, Multiply, Lambda, LeakyReLU, BatchNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2
from yolo3.utils import compose

def DarknetConv2D(*args, **kwargs):
    d = {'kernel_regularizer': l2(5e-4)}
    d['padding'] = 'valid' if kwargs.get('strides')==(2,2) else 'same'
    d.update(kwargs)
    return Conv2D(*args, **d)

def DarknetConv2D_BN_Leaky(*args, **kwargs):
    b = {'use_bias': False}
    b.update(kwargs)
    return compose(
        DarknetConv2D(*args, **b),
        BatchNormalization(),
        LeakyReLU(alpha=0.1)
    )

def resblock_body(x, f, n):
    x = ZeroPadding2D(((1,0),(1,0)))(x)
    x = DarknetConv2D_BN_Leaky(f,(3,3),strides=(2,2))(x)
    for _ in range(n):
        y = compose(
            DarknetConv2D_BN_Leaky(f//2,(1,1)),
            DarknetConv2D_BN_Leaky(f,(3,3))
        )(x)
        x = Add()([x,y])
    return x

def logFunc(x):
    return K.log(K.relu(x)+1)

def attention(x,name):
    x2 = Lambda(logFunc,output_shape=lambda s:s)(x)
    return Multiply(name=name)([x,x2])

def darknet_body(x):
    x = DarknetConv2D_BN_Leaky(32,(3,3))(x)
    x = resblock_body(x,64,1); x = attention(x,'a1')
    x = resblock_body(x,128,2); x = attention(x,'a2')
    x = resblock_body(x,256,8); x = attention(x,'a3')
    x = resblock_body(x,512,8); x = attention(x,'a4')
    x = resblock_body(x,1024,4)
    return x

def make_last_layers(x,f,out):
    x = compose(
        DarknetConv2D_BN_Leaky(f,(1,1)),
        DarknetConv2D_BN_Leaky(f*2,(3,3)),
        DarknetConv2D_BN_Leaky(f,(1,1)),
        DarknetConv2D_BN_Leaky(f*2,(3,3)),
        DarknetConv2D_BN_Leaky(f,(1,1))
    )(x)
    y = compose(
        DarknetConv2D_BN_Leaky(f*2,(3,3)),
        Conv2D(out,(1,1))
    )(x)
    return x,y

def yolo_body(inputs,na,nc):
    d = Model(inputs,darknet_body(inputs))
    x,y1 = make_last_layers(d.output,512,na*(nc+5))
    x = compose(DarknetConv2D_BN_Leaky(256,(1,1)),UpSampling2D(2))(x)
    x = Concatenate()([x,d.layers[158].output])
    x,y2 = make_last_layers(x,256,na*(nc+5))
    x = compose(DarknetConv2D_BN_Leaky(128,(1,1)),UpSampling2D(2))(x)
    x = Concatenate()([x,d.layers[96].output])
    x,y3 = make_last_layers(x,128,na*(nc+5))
    return Model(inputs,[y1,y2,y3])

def tiny_yolo_body(inputs,na,nc):
    x1 = compose(
        DarknetConv2D_BN_Leaky(16,(3,3)),MaxPooling2D((2,2),(2,2),'same'),
        DarknetConv2D_BN_Leaky(32,(3,3)),MaxPooling2D((2,2),(2,2),'same'),
        DarknetConv2D_BN_Leaky(64,(3,3)),MaxPooling2D((2,2),(2,2),'same'),
        DarknetConv2D_BN_Leaky(128,(3,3)),MaxPooling2D((2,2),(2,2),'same'),
        DarknetConv2D_BN_Leaky(256,(3,3))
    )(inputs)
    x2 = compose(
        MaxPooling2D((2,2),(2,2),'same'),
        DarknetConv2D_BN_Leaky(512,(3,3)),
        MaxPooling2D((2,2),(1,1),'same'),
        DarknetConv2D_BN_Leaky(1024,(3,3)),
        DarknetConv2D_BN_Leaky(256,(1,1))
    )(x1)
    y1 = compose(DarknetConv2D_BN_Leaky(512,(3,3)),Conv2D(na*(nc+5),(1,1)))(x2)
    x2 = compose(DarknetConv2D_BN_Leaky(128,(1,1)),UpSampling2D(2))(x2)
    y2 = compose(Concatenate(),DarknetConv2D_BN_Leaky(256,(3,3)),Conv2D(na*(nc+5),(1,1)))([x2,x1])
    return Model(inputs,[y1,y2])

def yolo_head(feats,anchors,nc,input_shape,calc_loss=False):
    na = len(anchors)
    at = tf.reshape(tf.constant(anchors,dtype=feats.dtype),[1,1,1,na,2])
    gs = tf.shape(feats)[1:3]
    gh,gw = gs[0],gs[1]
    gx,gy = tf.meshgrid(tf.range(gw,dtype=feats.dtype),tf.range(gh,dtype=feats.dtype))
    grid = tf.reshape(tf.stack([gx,gy],axis=-1),[1,gh,gw,1,2])
    feats = K.reshape(feats,[-1,gh,gw,na,nc+5])
    xy = (tf.sigmoid(feats[...,0:2]) + grid) / tf.cast([gw,gh],feats.dtype)
    wh = tf.exp(feats[...,2:4]) * at / tf.cast(input_shape[::-1],feats.dtype)
    conf = tf.sigmoid(feats[...,4:5])
    prob = tf.sigmoid(feats[...,5:])
    if calc_loss: return grid,feats,xy,wh
    return xy,wh,conf,prob

def box_iou(b1,b2):
    b1 = tf.expand_dims(b1,-2); xy1=b1[...,:2]; wh1=b1[...,2:4]/2
    m1 = xy1-wh1; M1 = xy1+wh1
    b2 = tf.expand_dims(b2,0); xy2=b2[...,:2]; wh2=b2[...,2:4]/2
    m2 = xy2-wh2; M2 = xy2+wh2
    mi = tf.maximum(m1,m2); Ma = tf.minimum(M1,M2)
    iwh = tf.maximum(Ma-mi,0); inter = iwh[...,0]*iwh[...,1]
    a1 = wh1[...,0]*wh1[...,1]*4; a2=wh2[...,0]*wh2[...,1]*4
    return inter/(a1+a2-inter)

def yolo_loss(args, anchors, num_classes, ignore_thresh=0.5, print_loss=False):
    num_layers = len(anchors) // 3
    yolo_outputs = args[:num_layers]
    y_true        = args[num_layers:]
    anchor_mask   = [[6,7,8],[3,4,5],[0,1,2]] if num_layers==3 else [[3,4,5],[1,2,3]]

    input_shape  = tf.cast(tf.shape(yolo_outputs[0])[1:3] * 32, y_true[0].dtype)
    grid_shapes  = [tf.cast(tf.shape(yolo_outputs[l])[1:3], y_true[0].dtype) for l in range(num_layers)]
    m            = tf.shape(yolo_outputs[0])[0]
    mf           = tf.cast(m, yolo_outputs[0].dtype)

    loss = 0.0
    for l in range(num_layers):
        object_mask       = y_true[l][..., 4:5]
        object_mask_bool  = tf.cast(object_mask, tf.bool)
        true_class_probs  = y_true[l][..., 5:]

        grid, raw_pred, pred_xy, pred_wh = yolo_head(
            yolo_outputs[l],
            anchors[anchor_mask[l]],
            num_classes,
            input_shape,
            calc_loss=True
        )
        pred_box = tf.concat([pred_xy, pred_wh], axis=-1)

        raw_true_xy = (
            y_true[l][..., :2] * tf.reverse(grid_shapes[l], [0])
            - grid
        )
        raw_true_wh = tf.math.log(
            y_true[l][..., 2:4]
            / anchors[anchor_mask[l]]
            * tf.reverse(input_shape, [0])
        )
        raw_true_wh = tf.where(object_mask_bool, raw_true_wh, tf.zeros_like(raw_true_wh))
        box_loss_scale = 2.0 - y_true[l][...,2:3] * y_true[l][...,3:4]

        ignore_mask = tf.TensorArray(dtype=object_mask.dtype, size=0, dynamic_size=True)
        def loop_body(b, im):
            true_box = tf.boolean_mask(
                pred_box[b], tf.cast(object_mask[b,...,0], tf.bool)
            )
            iou = box_iou(pred_box[b], true_box)
            best_iou = tf.reduce_max(iou, axis=-1)
            return b+1, im.write(b, tf.cast(best_iou < ignore_thresh, object_mask.dtype))
        _, ignore_mask = tf.while_loop(lambda b,*_: b < m, loop_body, [0, ignore_mask])
        ignore_mask = ignore_mask.stack()[..., None]

        xy_loss = object_mask * box_loss_scale * tf.nn.sigmoid_cross_entropy_with_logits(
            labels=raw_true_xy, logits=raw_pred[...,0:2]
        )
        wh_loss = object_mask * box_loss_scale * 0.5 * tf.square(
            raw_true_wh - raw_pred[...,2:4]
        )
        confidence_loss = (
            object_mask * tf.nn.sigmoid_cross_entropy_with_logits(
                labels=object_mask, logits=raw_pred[...,4:5]
            )
            + (1-object_mask) * tf.nn.sigmoid_cross_entropy_with_logits(
                labels=object_mask, logits=raw_pred[...,4:5]
            ) * ignore_mask
        )
        class_loss = object_mask * tf.nn.sigmoid_cross_entropy_with_logits(
            labels=true_class_probs, logits=raw_pred[...,5:]
        )

        xy_loss         = tf.reduce_sum(xy_loss) / mf
        wh_loss         = tf.reduce_sum(wh_loss) / mf
        confidence_loss = tf.reduce_sum(confidence_loss) / mf
        class_loss      = tf.reduce_sum(class_loss) / mf

        loss += xy_loss + wh_loss + confidence_loss + class_loss

        if print_loss:
            tf.print("loss:", loss,
                     "xy:", xy_loss,
                     "wh:", wh_loss,
                     "conf:", confidence_loss,
                     "class:", class_loss)

    return loss


Writing yolo_in_notebook.py


In [5]:
import importlib
import yolo_in_notebook
importlib.reload(yolo_in_notebook)


2025-04-22 00:11:22.992593: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1745280683.229047      31 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1745280683.297018      31 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


<module 'yolo_in_notebook' from '/kaggle/working/keras-yolo3-attention/yolo_in_notebook.py'>

In [6]:
from functools import reduce
from PIL import Image
import numpy as np
from matplotlib.colors import rgb_to_hsv, hsv_to_rgb

def compose(*funcs):
    if funcs:
        return reduce(lambda f, g: lambda *a, **kw: g(f(*a, **kw)), funcs)
    else:
        raise ValueError('Composition of empty sequence not supported.')

def letterbox_image(image, size):
    iw, ih = image.size
    w, h = size
    scale = min(w/iw, h/ih)
    nw = int(iw*scale)
    nh = int(ih*scale)
    image = image.resize((nw, nh), Image.BICUBIC)
    new_image = Image.new('RGB', size, (128,128,128))
    new_image.paste(image, ((w-nw)//2, (h-nh)//2))
    return new_image

def rand(a=0, b=1):
    return np.random.rand()*(b-a) + a

def get_random_data(annotation_line, input_shape, random=True, max_boxes=20, jitter=0.3, hue=0.1, sat=1.5, val=1.5, proc_img=True):
    line = annotation_line.split()
    image = Image.open(line[0])
    iw, ih = image.size
    h, w = input_shape
    box = np.array([np.array(list(map(int, box.split(',')))) for box in line[1:]])
    if not random:
        scale = min(w/iw, h/ih)
        nw = int(iw*scale)
        nh = int(ih*scale)
        dx = (w - nw) // 2
        dy = (h - nh) // 2
        image_data = 0
        if proc_img:
            image = image.resize((nw, nh), Image.BICUBIC)
            new_image = Image.new('RGB', (w, h), (128,128,128))
            new_image.paste(image, (dx, dy))
            image_data = np.array(new_image) / 255.
        box_data = np.zeros((max_boxes, 5))
        if len(box) > 0:
            np.random.shuffle(box)
            if len(box) > max_boxes:
                box = box[:max_boxes]
            box[:, [0,2]] = box[:, [0,2]] * scale + dx
            box[:, [1,3]] = box[:, [1,3]] * scale + dy
            box_data[:len(box)] = box
        return image_data, box_data
    new_ar = w/h * rand(1-jitter, 1+jitter) / rand(1-jitter, 1+jitter)
    scale = rand(0.25, 2)
    if new_ar < 1:
        nh = int(scale*h)
        nw = int(nh*new_ar)
    else:
        nw = int(scale*w)
        nh = int(nw/new_ar)
    image = image.resize((nw, nh), Image.BICUBIC)
    dx = int(rand(0, w - nw))
    dy = int(rand(0, h - nh))
    new_image = Image.new('RGB', (w, h), (128,128,128))
    new_image.paste(image, (dx, dy))
    image = new_image
    flip = rand() < 0.5
    if flip:
        image = image.transpose(Image.FLIP_LEFT_RIGHT)
    hue = rand(-hue, hue)
    sat = rand(1, sat) if rand() < 0.5 else 1 / rand(1, sat)
    val = rand(1, val) if rand() < 0.5 else 1 / rand(1, val)
    x = rgb_to_hsv(np.array(image)/255.)
    x[..., 0] += hue
    x[..., 0][x[..., 0] > 1] -= 1
    x[..., 0][x[..., 0] < 0] += 1
    x[..., 1] *= sat
    x[..., 2] *= val
    x[x > 1] = 1
    x[x < 0] = 0
    image_data = hsv_to_rgb(x)
    box_data = np.zeros((max_boxes, 5))
    if len(box) > 0:
        np.random.shuffle(box)
        box[:, [0,2]] = box[:, [0,2]] * nw/iw + dx
        box[:, [1,3]] = box[:, [1,3]] * nh/ih + dy
        if flip:
            box[:, [0,2]] = w - box[:, [2,0]]
        box[:, 0:2][box[:, 0:2] < 0] = 0
        box[:, 2][box[:, 2] > w] = w
        box[:, 3][box[:, 3] > h] = h
        box_w = box[:, 2] - box[:, 0]
        box_h = box[:, 3] - box[:, 1]
        box = box[np.logical_and(box_w > 1, box_h > 1)]
        if len(box) > max_boxes:
            box = box[:max_boxes]
        box_data[:len(box)] = box
    return image_data, box_data

def preprocess_true_boxes(true_boxes, input_shape, anchors, num_classes):
    num_layers = len(anchors) // 3
    anchor_mask = [[6,7,8], [3,4,5], [0,1,2]] if num_layers == 3 else [[3,4,5], [1,2,3]]
    true_boxes = np.array(true_boxes, dtype='float32')
    input_shape = np.array(input_shape, dtype='int32')
    boxes_xy = (true_boxes[..., 0:2] + true_boxes[..., 2:4]) // 2
    boxes_wh = true_boxes[..., 2:4] - true_boxes[..., 0:2]
    true_boxes[..., 0:2] = boxes_xy / input_shape[::-1]
    true_boxes[..., 2:4] = boxes_wh / input_shape[::-1]
    m = true_boxes.shape[0]
    grid_shapes = [input_shape // {0:32,1:16,2:8}[l] for l in range(num_layers)]
    y_true = [np.zeros((m, grid_shapes[l][0], grid_shapes[l][1], len(anchor_mask[l]), 5 + num_classes), dtype='float32')
              for l in range(num_layers)]
    anchors = np.expand_dims(anchors, 0)
    anchor_maxes = anchors / 2.
    anchor_mins = -anchor_maxes
    valid_mask = boxes_wh[..., 0] > 0
    for b in range(m):
        wh = boxes_wh[b, valid_mask[b]]
        if len(wh) == 0:
            continue
        wh = np.expand_dims(wh, -2)
        box_maxes = wh / 2.
        box_mins = -box_maxes
        inter_mins = np.maximum(box_mins, anchor_mins)
        inter_maxes = np.minimum(box_maxes, anchor_maxes)
        inter_wh = np.maximum(inter_maxes - inter_mins, 0.)
        inter_area = inter_wh[..., 0] * inter_wh[..., 1]
        box_area = wh[..., 0] * wh[..., 1]
        anchor_area = anchors[..., 0] * anchors[..., 1]
        iou = inter_area / (box_area + anchor_area - inter_area)
        best_anchor = np.argmax(iou, axis=-1)
        for t, n in enumerate(best_anchor):
            for l in range(num_layers):
                if n in anchor_mask[l]:
                    i = int(np.floor(true_boxes[b, t, 0] * grid_shapes[l][1]))
                    j = int(np.floor(true_boxes[b, t, 1] * grid_shapes[l][0]))
                    k = anchor_mask[l].index(n)
                    c = int(true_boxes[b, t, 4])
                    y_true[l][b, j, i, k, 0:4] = true_boxes[b, t, 0:4]
                    y_true[l][b, j, i, k, 4] = 1
                    y_true[l][b, j, i, k, 5 + c] = 1
    return y_true


In [7]:
from tensorflow.keras.layers import Input
from yolo_in_notebook import yolo_body

num_classes = len(class_names)
num_anchors = 9

image_input = Input(shape=(416, 416, 3))
baseline_model = yolo_body(image_input, num_anchors // 3, num_classes)
baseline_model.summary()


I0000 00:00:1745280697.374763      31 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 15513 MB memory:  -> device: 0, name: Tesla P100-PCIE-16GB, pci bus id: 0000:00:04.0, compute capability: 6.0


In [8]:
import os
import random
import numpy as np
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Input, Lambda, Conv2D, MaxPooling2D, UpSampling2D, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import TensorBoard, ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from PIL import Image, ImageDraw
import matplotlib.pyplot as plt
import cv2
from yolo_in_notebook import yolo_body, yolo_loss



# ----- Helper functions for training and inference -----

def yolo_line_from_dict(annot):
    boxes = annot['bboxes']
    parts = []
    for (x1, y1, x2, y2, cid) in boxes:
        parts.append(f"{x1},{y1},{x2},{y2},{cid}")
    return annot['img_path'] + " " + " ".join(parts)

def create_train_test_splits(image_annotations):
    random.shuffle(image_annotations)
    n = len(image_annotations)
    t_size = int(0.8 * n)
    train_data = image_annotations[:t_size]
    test_data = image_annotations[t_size:]
    with open("train_split.txt", "w") as f:
        for ann in train_data:
            f.write(yolo_line_from_dict(ann) + "\n")
    with open("test_split.txt", "w") as f:
        for ann in test_data:
            f.write(yolo_line_from_dict(ann) + "\n")
    print("Total:", n)
    print("Train:", len(train_data))
    print("Test:", len(test_data))
    return train_data, test_data

def visualize_annotation(annot):
    img_path = annot['img_path']
    bboxes = annot['bboxes']
    img_bgr = cv2.imread(img_path)
    if img_bgr is None:
        print("Could not read:", img_path)
        return
    img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
    for (x1, y1, x2, y2, cid) in bboxes:
        cv2.rectangle(img_rgb, (x1, y1), (x2, y2), (255, 0, 0), 2)
        cv2.putText(img_rgb, str(cid), (x1, max(y1 - 5, 15)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)
    plt.figure(figsize=(6,6))
    plt.imshow(img_rgb)
    plt.axis('off')
    plt.show()

def get_classes(p):
    with open(p) as f:
        return [c.strip() for c in f.readlines()]

def get_anchors(p):
    with open(p) as f:
        a = f.readline().strip()
    a = [float(x) for x in a.split(',')]
    return np.array(a).reshape(-1, 2)

# --- raw Python generator ---
def data_generator(lines, bs, shape, anchors, num_classes):
    n = len(lines)
    i = 0
    while True:
        img_data = []
        box_data = []
        for b in range(bs):
            if i == 0:
                np.random.shuffle(lines)
            l = lines[i].strip()
            i = (i + 1) % n
            img, box = get_random_data(l, shape, random=True)
            img_data.append(img)
            box_data.append(box)
        img_data = np.array(img_data, dtype=np.float32)
        box_data = np.array(box_data, dtype=np.float32)
        y_true = preprocess_true_boxes(box_data, shape, anchors, num_classes)
        # yield a TUPLE for features, not a list
        yield (img_data, y_true[0], y_true[1], y_true[2]), np.zeros(bs, dtype=np.float32)

# --- Keras wants a “wrapper” that returns that generator ---
def data_generator_wrapper(lines, bs, shape, anchors, num_classes):
    if len(lines)==0 or bs<=0:
        return None
    return data_generator(lines, bs, shape, anchors, num_classes)

# ----- Model / Training functions -----




def create_model(
    shape, anchors, num_classes,
    weights=None, freeze_body=2, ignore_thresh=0.5
):
    K.clear_session()
    inp = Input(shape=(None, None, 3))
    na = len(anchors)

    # placeholders for y_true at 3 scales
    y_true = [
        Input(
            shape=(
                shape[0]//{0:32,1:16,2:8}[l],
                shape[1]//{0:32,1:16,2:8}[l],
                na//3, num_classes+5
            )
        )
        for l in range(3)
    ]

    body = yolo_body(inp, na//3, num_classes)
    if weights:
        body.load_weights(weights, by_name=True, skip_mismatch=True)

    # optionally freeze layers
    if freeze_body in [1,2]:
        cut = (185, len(body.layers)-3)[freeze_body-1]
        for i in range(cut):
            body.layers[i].trainable = False

    loss_out = Lambda(
        lambda args: yolo_loss(args, anchors, num_classes, ignore_thresh=ignore_thresh),
        output_shape=(1,), name='yolo_loss'
    )([*body.output, *y_true])

    return Model([body.input, *y_true], loss_out)

# --- the high‑level train function ---
def train_yolo(
    train_txt="train_split.txt", val_txt="test_split.txt",
    cls_path="model_data/garbage_classes.txt",
    anc_path="model_data/yolo_anchors.txt",
    weights=None, logs="logs/",
    shape=(416,416), freeze_body=2,
    epochs_stage1=5, epochs_stage2=15,
    batch1=8, batch2=4,
    lr1=1e-3, lr2=1e-4
):
    classes     = get_classes(cls_path)
    num_classes = len(classes)
    anchors     = get_anchors(anc_path)

    with open(train_txt) as f: train_lines = f.readlines()
    with open(val_txt)   as f: val_lines   = f.readlines()
    n_train, n_val = len(train_lines), len(val_lines)

    model = create_model(shape, anchors, num_classes, weights=weights, freeze_body=freeze_body)

    # **Stage 1 compile in eager mode**
    model.compile(
        optimizer=Adam(learning_rate=lr1),
        loss={'yolo_loss': lambda y_true, y_pred: y_pred},
        run_eagerly=True
    )
    callbacks = [
        TensorBoard(log_dir=logs),
        ModelCheckpoint(logs + 'stage1.weights.h5', monitor='val_loss',
                        save_weights_only=True, save_best_only=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=3, verbose=1),
        EarlyStopping(monitor='val_loss', patience=8, verbose=1),
    ]
    model.fit(
        data_generator_wrapper(train_lines, batch1, shape, anchors, num_classes),
        steps_per_epoch=max(1, n_train // batch1),
        validation_data=data_generator_wrapper(val_lines,  batch1, shape, anchors, num_classes),
        validation_steps=max(1, n_val   // batch1),
        epochs=epochs_stage1,
        callbacks=callbacks
    )
    model.save_weights(logs + 'trained_weights_stage1.weights.h5')

    # Unfreeze for fine‑tuning
    for l in model.layers: l.trainable = True

    # **Stage 2 compile in eager mode**
    model.compile(
        optimizer=Adam(learning_rate=lr2),
        loss={'yolo_loss': lambda y_true, y_pred: y_pred},
        run_eagerly=True
    )
    model.fit(
        data_generator_wrapper(train_lines, batch2, shape, anchors, num_classes),
        steps_per_epoch=max(1, n_train // batch2),
        validation_data=data_generator_wrapper(val_lines,  batch2, shape, anchors, num_classes),
        validation_steps=max(1, n_val   // batch2),
        epochs=epochs_stage1 + epochs_stage2,
        initial_epoch=epochs_stage1,
        callbacks=callbacks
    )
    model.save_weights(logs + 'trained_weights_final.weights.h5')

def load_inference_model(anc_path, cls_path, w_path, shape=(416,416)):
    cn = get_classes(cls_path)
    nc = len(cn)
    an = get_anchors(anc_path)
    K.clear_session()
    inp = Input(shape=(None, None, 3))
    from yolo_in_notebook import yolo_body
    body = yolo_body(inp, len(an)//3, nc)
    body.load_weights(w_path)
    return body, an, cn

def detect_image(model_body, anchors, class_names, image, shape=(416,416), thr=0.3):
    from yolo_in_notebook import yolo_eval
    iw, ih = image.size
    rsz = image.resize(shape, Image.BICUBIC)
    arr = np.array(rsz, dtype='float32')/255.
    arr = np.expand_dims(arr, 0)
    inp_shape = K.placeholder(shape=(2,))
    boxes, scores, classes = yolo_eval(model_body.output, anchors, len(class_names), inp_shape, score_threshold=thr, iou_threshold=0.45)
    sess = K.get_session()
    ob, osr, oc = sess.run([boxes, scores, classes],
                           feed_dict={model_body.input: arr, inp_shape: [ih, iw], K.learning_phase(): 0})
    return ob, osr, oc

def run_test_inference(txt_file, anc_path, cls_path, w_path, shape=(416,416), n=5):
    body, anchors, cn = load_inference_model(anc_path, cls_path, w_path, shape)
    with open(txt_file, 'r') as f:
        lines = f.readlines()
    sample_lines = random.sample(lines, min(n, len(lines)))
    for line in sample_lines:
        p = line.split()[0]
        try:
            img = Image.open(p)
        except:
            print("Could not open:", p)
            continue
        b, s, c = detect_image(body, anchors, cn, img, shape)
        d = ImageDraw.Draw(img)
        for i, cl in enumerate(c):
            box = b[i]
            sc = s[i]
            t, l, bt, rt = box
            t = max(0, int(np.floor(t + 0.5)))
            l = max(0, int(np.floor(l + 0.5)))
            bt = min(img.size[1], int(np.floor(bt + 0.5)))
            rt = min(img.size[0], int(np.floor(rt + 0.5)))
            lab = f"{cn[cl]} {sc:.2f}"
            d.rectangle([(l, t), (rt, bt)], outline=(255,0,0))
            d.text((l, t), lab, fill=(255,0,0))
        plt.figure(figsize=(6,6))
        plt.imshow(img)
        plt.axis('off')
        plt.show()


In [None]:
train_data, test_data = create_train_test_splits(image_annotations)
train_yolo(
    train_txt="train_split.txt",
    val_txt="test_split.txt",
    cls_path="model_data/garbage_classes.txt",
    anc_path="model_data/yolo_anchors.txt",
    weights=None,
    logs="logs/",
    shape=(416,416),
    freeze_body=2,
    epochs_stage1=5, epochs_stage2=15,
    batch1=8, batch2=4,
    lr1=1e-3, lr2=1e-4
)

run_test_inference(
    txt_file="test_split.txt",
    anc_path="model_data/garbage_anchors.txt",
    cls_path="model_data/garbage_classes.txt",
    w_path="logs/trained_weights_final.weights.h5",
    shape=(416,416),
    n=5
)


Total: 12650
Train: 10120
Test: 2530



  return {key: serialize_keras_object(value) for key, value in obj.items()}


Epoch 1/5


I0000 00:00:1745280703.809222      31 cuda_dnn.cc:529] Loaded cuDNN version 90300


[1m1265/1265[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1778s[0m 1s/step - loss: 5990.4976 - val_loss: 3032.1042 - learning_rate: 0.0010
Epoch 2/5
[1m1265/1265[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1614s[0m 1s/step - loss: 2523.3105 - val_loss: 1432.9281 - learning_rate: 0.0010
Epoch 3/5
[1m1265/1265[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1600s[0m 1s/step - loss: 1226.1948 - val_loss: 755.5051 - learning_rate: 0.0010
Epoch 4/5
[1m1265/1265[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1590s[0m 1s/step - loss: 656.2452 - val_loss: 424.9641 - learning_rate: 0.0010
Epoch 5/5
[1m1265/1265[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1587s[0m 1s/step - loss: 375.1765 - val_loss: 253.2376 - learning_rate: 0.0010
Epoch 6/20



  return {key: serialize_keras_object(value) for key, value in obj.items()}


[1m2530/2530[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5117s[0m 2s/step - loss: 1336.3729 - val_loss: 98.3215 - learning_rate: 1.0000e-04
Epoch 7/20
[1m2530/2530[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5064s[0m 2s/step - loss: 67.6229 - val_loss: 45.7047 - learning_rate: 1.0000e-04
Epoch 8/20
[1m2530/2530[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5052s[0m 2s/step - loss: 52.6204 - val_loss: 43.6963 - learning_rate: 1.0000e-04
Epoch 9/20
[1m2366/2530[0m [32m━━━━━━━━━━━━━━━━━━[0m[37m━━[0m [1m5:06[0m 2s/step - loss: 43.7404