## Quantization

In [2]:
import torch
from ultralytics import YOLO
import os
import onnx
from onnxconverter_common import float16

model = YOLO("models\\trained_yolov8m.pt") 

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}\n")

# See the model architecture
model.model.info()

# checking the models parameters and their data types
for name, param in model.named_parameters():
    print(f"{name}: {param.dtype}")

# Exporting the model to ONNX format
exported = model.export(format="onnx")
print(f"\nExported model saved to: {exported}")
size_mb = os.path.getsize(f"{exported}") / (1024 * 1024)
print(f"Model size: {size_mb:.2f} MB")

# using half precision
model_fp32 = onnx.load("models\\trained_yolov8m.onnx")
print("\nConverting to half precision...")
model_fp16 = float16.convert_float_to_float16(model_fp32)
onnx.save(model_fp16, "models\\fp16_yolov8m.onnx")
print(f"Exported model saved to: models\\fp16_yolov8m.onnx")
size_mb = os.path.getsize("models\\fp16_yolov8m.onnx") / (1024 * 1024)
print(f"Model size: {size_mb:.2f} MB")



Using device: cuda

Model summary: 169 layers, 25,857,478 parameters, 0 gradients, 79.1 GFLOPs
model.model.0.conv.weight: torch.float32
model.model.0.bn.weight: torch.float32
model.model.0.bn.bias: torch.float32
model.model.1.conv.weight: torch.float32
model.model.1.bn.weight: torch.float32
model.model.1.bn.bias: torch.float32
model.model.2.cv1.conv.weight: torch.float32
model.model.2.cv1.bn.weight: torch.float32
model.model.2.cv1.bn.bias: torch.float32
model.model.2.cv2.conv.weight: torch.float32
model.model.2.cv2.bn.weight: torch.float32
model.model.2.cv2.bn.bias: torch.float32
model.model.2.m.0.cv1.conv.weight: torch.float32
model.model.2.m.0.cv1.bn.weight: torch.float32
model.model.2.m.0.cv1.bn.bias: torch.float32
model.model.2.m.0.cv2.conv.weight: torch.float32
model.model.2.m.0.cv2.bn.weight: torch.float32
model.model.2.m.0.cv2.bn.bias: torch.float32
model.model.2.m.1.cv1.conv.weight: torch.float32
model.model.2.m.1.cv1.bn.weight: torch.float32
model.model.2.m.1.cv1.bn.bias: torc

In [3]:
size_mb = os.path.getsize("models\\trained_yolov11n.onnx") / (1024 * 1024)
print(f"FP32 model size: {size_mb:.2f} MB")

size_mb = os.path.getsize("models\\fp16_yolov11n.onnx") / (1024 * 1024)
print(f"\n FP16 model size: {size_mb:.2f} MB")

FileNotFoundError: [WinError 2] The system cannot find the file specified: 'models\\trained_yolov11n.onnx'

### Integer quantization
Following the steps of https://medium.com/@sulavstha007/quantizing-yolo-v8-models-34c39a2c10e2 static quantization

In [None]:
# TODO: Quantization currently not done

# to preprocess the model
!python -m onnxruntime.quantization.preprocess --input "models\\trained_yolov8m.onnx" --output "models\preprocessed.onnx"

In [None]:
import cv2
import numpy as np
from onnxruntime.quantization import CalibrationDataReader, quantize_static, QuantType, QuantFormat
    
# Class for Callibration Data reading
class ImageCalibrationDataReader(CalibrationDataReader):
    def __init__(self, image_paths):
        self.image_paths = image_paths
        self.idx = 0
        self.input_name = "images"

    def get_next(self):
        # method to iterate through the data set
        if self.idx >= len(self.image_paths):
            return None

        image_path = self.image_paths[self.idx]
        input_data = self.preprocess(image_path)
        self.idx += 1
        return {self.input_name: input_data}

# Assuming you have a list of image paths for calibration
calibration_image_paths = ['datasets\yolo_CropOrWeed2\images\val\ave-0035-0002.jpg',"datasets\yolo_CropOrWeed2\images\val\ave-0035-0006.jpg","datasets\yolo_CropOrWeed2\images\val\ave-0035-0007.jpg"] # you can add more of the image paths

# Create an instance of the ImageCalibrationDataReader
calibration_data_reader = ImageCalibrationDataReader(calibration_image_paths)

In [None]:
# Use the calibration_data_reader with quantize_static
# TODO which nodes to exclude?
quantize_static('preprocessed.onnx', "static_quantized.onnx",
                weight_type=QuantType.QInt8,
                activation_type=QuantType.QUInt8,
                calibration_data_reader=calibration_data_reader,
                quant_format=QuantFormat.QDQ,
                nodes_to_exclude=['/model.22/Concat_3', '/model.22/Split', '/model.22/Sigmoid'
                                 '/model.22/dfl/Reshape', '/model.22/dfl/Transpose', '/model.22/dfl/Softmax', 
                                 '/model.22/dfl/conv/Conv', '/model.22/dfl/Reshape_1', '/model.22/Slice_1',
                                 '/model.22/Slice', '/model.22/Add_1', '/model.22/Sub', '/model.22/Div_1',
                                  '/model.22/Concat_4', '/model.22/Mul_2', '/model.22/Concat_5'],
                per_channel=False,
                reduce_range=True,)

## Distillation

In [None]:
import torch
from ultralytics import YOLO
import os

model = YOLO("models\\trained_yolov8m.pt") 


# Run inference on training set to create pseudo-labels
results = model.predict(
    source="datasets\\100_images",  # path to folder with images
    save=False,                    # saves images with predictions (optional)
    save_txt=True,                # saves predictions in YOLO format (.txt)
    save_conf=True,               # includes confidence score       
    name="pseudo_labels",        # output folder: runs/detect/pseudo_labels
    exist_ok=True                 # overwrite if already exists
)


image 1/100 c:\Users\yurim\Documents\University\UM\Year_3\Bachelor_Thesis\Weed_Detection_eSys\datasets\100_images\ave-0045-0012.jpg: 384x640 14 Crops, 11 Weeds, 49.7ms
image 2/100 c:\Users\yurim\Documents\University\UM\Year_3\Bachelor_Thesis\Weed_Detection_eSys\datasets\100_images\ave-0047-0003.jpg: 384x640 15 Weeds, 10.8ms
image 3/100 c:\Users\yurim\Documents\University\UM\Year_3\Bachelor_Thesis\Weed_Detection_eSys\datasets\100_images\ave-0047-0017.jpg: 384x640 23 Weeds, 12.1ms
image 4/100 c:\Users\yurim\Documents\University\UM\Year_3\Bachelor_Thesis\Weed_Detection_eSys\datasets\100_images\ave-0083-0005.jpg: 384x640 1 Crop, 1 Weed, 10.0ms
image 5/100 c:\Users\yurim\Documents\University\UM\Year_3\Bachelor_Thesis\Weed_Detection_eSys\datasets\100_images\ave-0083-0030.jpg: 384x640 2 Crops, 2 Weeds, 10.6ms
image 6/100 c:\Users\yurim\Documents\University\UM\Year_3\Bachelor_Thesis\Weed_Detection_eSys\datasets\100_images\ave-0105-0018.jpg: 384x640 2 Crops, 16 Weeds, 11.7ms
image 7/100 c:\Use

In [None]:
import torch.nn.functional as F

def compute_detection_loss(predictions, targets):
    """
    Compute the detection loss for YOLO models.
    :param predictions: Output from the student model.
    :param targets: Ground truth labels.
    :return: Total detection loss.
    """
    # Example: Combine classification, objectness, and bounding box losses
    cls_loss = F.cross_entropy(predictions['cls_logits'], targets['cls_labels'])
    obj_loss = F.binary_cross_entropy(predictions['obj_scores'], targets['obj_scores'])
    bbox_loss = F.mse_loss(predictions['bbox_coords'], targets['bbox_coords'])

    total_loss = cls_loss + obj_loss + bbox_loss
    return total_loss

In [None]:
import torch
import torch.nn.functional as F
from ultralytics import YOLO
from Dataloader import CustomDataset
from torch.utils.data import DataLoader

# Load and extract the models
teacher_wrapper = YOLO('models\\trained_yolov8m.pt') # teacher model
teacher_model = teacher_wrapper.model
teacher_model.eval() 

student_wrapper = YOLO('models\custom_model.yaml') # student model
student_model = student_wrapper.model
student_model.train()

# Create dataset and dataloader
dataset = CustomDataset(annotations_dir='datasets\yolo_CropOrWeed2\labels\\train', img_dir='datasets\yolo_CropOrWeed2\images\\train', transform=None)
data_loader = DataLoader(dataset, batch_size=4, shuffle=True, collate_fn=CustomDataset.collate_fn)

# Hyperparameters for distillation
T = 2.0           # Temperature for softening logits
lambda_kd = 0.5   # Weight for the distillation loss

optimizer = torch.optim.Adam(student_model.parameters(), lr=1e-4)

for images, labels in data_loader:
    images = images.to('cuda') 
    labels = labels.to('cuda')
    
    # Teacher output (no gradient tracking)
    with torch.no_grad():
        teacher_outputs = teacher_model(images)
        teacher_logits = teacher_outputs['cls_logits']  # Adjust index as needed
    
    # Student output
    student_outputs = student_model(images)
    student_logits = student_outputs['cls_logits']  # Adjust based on your model structure 

    # Apply temperature scaling
    teacher_soft = F.softmax(teacher_logits / T, dim=1)
    student_log_soft = F.log_softmax(student_logits / T, dim=1)
    
    # Compute the distillation loss (KL-divergence)
    loss_kd = F.kl_div(student_log_soft, teacher_soft, reduction='batchmean') * (T * T)
    
    # Compute the standard detection loss (your custom YOLO loss function)
    # loss_detection = compute_detection_loss(student_outputs, labels) 
    loss_detection = compute_detection_loss(student_outputs, labels)  # TODO see if works

    # Combine the losses
    loss_total = loss_detection + lambda_kd * loss_kd
    
    optimizer.zero_grad()
    loss_total.backward()
    optimizer.step()
    
    print(f"Detection Loss: {loss_detection.item():.4f} | KD Loss: {loss_kd.item():.4f} | Total Loss: {loss_total.item():.4f}")


RuntimeError: Input type (torch.cuda.FloatTensor) and weight type (torch.FloatTensor) should be the same

In [None]:
import torch
import torch.nn.functional as F
from ultralytics import YOLO
from Dataloader import CustomDataset
from torch.utils.data import DataLoader

# Load teacher and student models
device = 'cuda'
teacher = YOLO('models/trained_yolov8m.pt').model.to(device).eval()
student = YOLO('models/custom_model.yaml').model.to(device).train()

# DataLoader setup
dataset = CustomDataset(
    annotations_dir='datasets\yolo_100_images\labels',
    img_dir='datasets\yolo_100_images\images',
    transform=None
)
loader = DataLoader(
    dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=CustomDataset.collate_fn
)

# Distillation hyperparameters
T = 2.0                      # Softmax temperature
lambda_kd = 0.5              # Distillation loss weight
optimizer = torch.optim.Adam(student.parameters(), lr=1e-4)

# Detection loss using Ultralytics implementation
def compute_detection_loss(model, imgs, labels):
    preds = model(imgs)
    loss, _ = model.loss(preds, labels)
    return loss

# Extract class logits from a single head and flatten to [N, C]
def extract_cls_flat(model_outputs, head_index=-1):
    # model_outputs may be (preds_list, features)
    preds = model_outputs[0] if isinstance(model_outputs, tuple) else model_outputs
    out = preds[head_index]        # e.g. tensor of shape (B, A, H, W, 5+nc)
    cls_logits = out[..., 5:]      # drop box coords & objectness → (B,A,H,W,C)
    B, A, H, W, C = cls_logits.shape
    flat = cls_logits.reshape(B * A * H * W, C)
    return flat

# Training loop
for imgs, labels in loader:
    imgs = imgs.to(device)
    labels = [lb.to(device) for lb in labels]

    # Teacher forward
    with torch.no_grad():
        teacher_out = teacher(imgs)
    # Student forward
    student_out = student(imgs)

    # Flatten class logits
    teacher_flat = extract_cls_flat(teacher_out)
    student_flat = extract_cls_flat(student_out)

    # Compute distillation loss (KL divergence)
    t = T
    log_p_s = F.log_softmax(student_flat / t, dim=1)
    p_t = F.softmax(teacher_flat / t, dim=1)
    loss_kd = F.kl_div(log_p_s, p_t, reduction='batchmean') * (t * t)

    # Compute detection loss
    loss_det = compute_detection_loss(student, imgs, labels)

    # Combine losses
    loss = loss_det + lambda_kd * loss_kd

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    print(f"Det Loss: {loss_det.item():.4f} | KD Loss: {loss_kd.item():.4f} | Total: {loss.item():.4f}")


ValueError: not enough values to unpack (expected 5, got 2)

In [None]:
import torch
import torch.nn.functional as F
from ultralytics import YOLO
from Dataloader import CustomDataset
from torch.utils.data import DataLoader

# Load teacher and student as Ultralytics YOLO wrappers
device = 'cuda'
teacher = YOLO('models/trained_yolov8m.pt').model.to(device).eval()
student = YOLO('models/custom_model.yaml').model.to(device).train()

# DataLoader
dataset = CustomDataset(
    annotations_dir='datasets\yolo_100_images\labels',
    img_dir='datasets\yolo_100_images\images',
    transform=None
)
loader = DataLoader(dataset, batch_size=4, shuffle=True, collate_fn=CustomDataset.collate_fn)

# Hyperparameters
lambda_feat = 0.5  # feature distillation weight
default_lr = 1e-4
optimizer = torch.optim.Adam(student.model.parameters(), lr=default_lr)

# Detection loss using wrapper
def compute_detection_loss(wrapper, imgs, labels):
    preds = wrapper(imgs)               # wrapper returns (preds, features)
    loss, _ = wrapper.loss(preds, labels)
    return loss

# Feature extractor layers
teach_feat_layer = teacher.model[:3]  # first modules
stud_feat_layer  = student.model[:3]

# Training loop
for imgs, labels in loader:
    imgs = imgs.to(device)
    labels = [l.to(device) for l in labels]

    # Teacher features
    with torch.no_grad():
        t_feats = teach_feat_layer(imgs)

    # Student features
    s_feats = stud_feat_layer(imgs)

    # Detection loss via wrapper
    loss_det = compute_detection_loss(student, imgs, labels)

    # Feature distillation loss
    loss_feat = F.mse_loss(s_feats, t_feats)

    # Total loss
    total_loss = loss_det + lambda_feat * loss_feat

    optimizer.zero_grad()
    total_loss.backward()
    optimizer.step()

    print(f"Det Loss: {loss_det.item():.4f} | Feat Loss: {loss_feat.item():.4f} | Total: {total_loss.item():.4f}")


RuntimeError: shape '[5, 66, -1]' is invalid for input of size 5

## Pruning (graph based)

In [1]:
# imports
import logging
import os
import json
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import List, Union

import numpy as np
import torch
import torch.nn as nn
from matplotlib import pyplot as plt
from ultralytics import YOLO, __version__
from ultralytics.nn.modules import Detect, C2f, Conv, Bottleneck, C2fCIB, RepVGGDW, Concat, PSA, Attention, C3k2, C3k, CIB
from ultralytics.nn.tasks import attempt_load_one_weight
from ultralytics.engine.model import TASK2DATA
from ultralytics.engine.trainer import BaseTrainer
from ultralytics.utils import yaml_load, LOGGER, RANK, DEFAULT_CFG_DICT, DEFAULT_CFG_KEYS
from ultralytics.utils.checks import check_yaml
from ultralytics.utils.torch_utils import initialize_weights, de_parallel

import torch_pruning as tp

In [4]:
# Helper functions

def save_pruning_performance_graph(x, y1, y11, y2, y3, pruning_method="L2"):
    """
    Draw performance change graph
    Parameters
    ----------
    x : List
        Parameter numbers of all pruning steps
    y1 : List
        mAPs after fine-tuning of all pruning steps
    y11 : List
        mAP50 after fine-tuning of all pruning steps
    y2 : List
        MACs of all pruning steps
    y3 : List
        mAPs after pruning (not fine-tuned) of all pruning steps

    Returns
    -------

    """
    try:
        plt.style.use("ggplot")
    except:
        pass

    x, y1, y11, y2, y3 = np.array(x), np.array(y1), np.array(y11), np.array(y2), np.array(y3)
    y2_ratio = y2 / y2[0]

    # create the figure and the axis object
    fig, ax = plt.subplots(figsize=(12, 8))

    # plot the pruned mAP and recovered mAP
    ax.set_xlabel('Pruning Ratio')
    ax.set_ylabel('mAP')
    ax.plot(x, y1, label='recovered mAP')
    ax.scatter(x, y1)
    ax.plot(x, y11, color='tab:blue', label='recovered mAP50')
    ax.scatter(x, y11, color='tab:blue')
    ax.plot(x, y3, color='tab:gray', label='pruned mAP')
    ax.scatter(x, y3, color='tab:gray')

    # create a second axis that shares the same x-axis
    ax2 = ax.twinx()

    # plot the second set of data
    ax2.set_ylabel('MACs')
    ax2.plot(x, y2_ratio, color='tab:orange', label='MACs')
    ax2.scatter(x, y2_ratio, color='tab:orange')

    # add a legend
    lines, labels = ax.get_legend_handles_labels()
    lines2, labels2 = ax2.get_legend_handles_labels()
    ax2.legend(lines + lines2, labels + labels2, loc='best')

    ax.set_xlim(105, 20)
    ax.set_ylim(0, max(y11) + 0.05)
    ax2.set_ylim(0.05, 1.05)

    # calculate the highest and lowest points for each set of data
    max_y1_idx = np.argmax(y1)
    min_y1_idx = np.argmin(y1)
    max_y11_idx = np.argmax(y11)
    min_y11_idx = np.argmin(y11)
    max_y2_idx = np.argmax(y2)
    min_y2_idx = np.argmin(y2)
    max_y1 = y1[max_y1_idx]
    min_y1 = y1[min_y1_idx]
    max_y11 = y11[max_y11_idx]
    min_y11 = y11[min_y11_idx]
    max_y2 = y2_ratio[max_y2_idx]
    min_y2 = y2_ratio[min_y2_idx]

    # add text for the highest and lowest values near the points
    ax.text(x[max_y1_idx], max_y1 - 0.05, f'max mAP = {max_y1:.2f}', fontsize=10)
    ax.text(x[min_y1_idx], min_y1 + 0.02, f'min mAP = {min_y1:.2f}', fontsize=10)

    ax.text(x[max_y11_idx], max_y11 + 0.02, f'max mAP50 = {max_y11:.2f}', fontsize=10)
    ax.text(x[min_y11_idx], min_y11 + 0.02, f'min mAP50 = {min_y11:.2f}', fontsize=10)
    
    ax2.text(x[max_y2_idx], max_y2 - 0.05, f'max MACs = {max_y2 * y2[0] / 1e9:.2f}G', fontsize=10)
    ax2.text(x[min_y2_idx], min_y2 + 0.02, f'min MACs = {min_y2 * y2[0] / 1e9:.2f}G', fontsize=10)

    plt.title('Comparison of mAP / mAP50 and MACs with Pruning Ratio')
    plt.savefig(f'pruning_perf_change_{pruning_method}.png')

#------------------------------------------------------------------------------------------------

def infer_c3k2_shortcut(bottleneck):
    """
    Infer whether to use shortcut and large-kernel flag from a child block.
    Returns (shortcut: bool, is_cib: bool, lk: bool).
    """
    # Bottleneck case: identical logic to C2f shortcut
    if isinstance(bottleneck, Bottleneck):
        c1 = bottleneck.cv1.conv.in_channels
        c2 = bottleneck.cv2.conv.out_channels
        add_flag = getattr(bottleneck, 'add', False)
        return (c1 == c2 and add_flag), False, False
    # C3k case: preserve shortcut and detect large-kernel usage
    elif isinstance(bottleneck, C3k):
        lk = any(isinstance(mod, RepVGGDW) for mod in getattr(bottleneck, 'm', []))
        add_flag = getattr(bottleneck, 'add', False)
        return add_flag, False, lk
    # Fallback: treat as C3k-like
    else:
        lk = any(isinstance(mod, RepVGGDW) for mod in getattr(bottleneck, 'm', []))
        add_flag = getattr(bottleneck, 'add', False)
        return add_flag, False, lk

class C2f_v2(nn.Module):
    # CSP Bottleneck with 2 convolutions
    def __init__(self, c1, c2, n=1, shortcut=False, g=1, e=0.5,is_CIB = False, lk = False):  # ch_in, ch_out, number, shortcut, groups, expansion
        super().__init__()
        self.c = int(c2 * e)  # hidden channels
        self.cv0 = Conv(c1, self.c, 1, 1)
        self.cv1 = Conv(c1, self.c, 1, 1)
        self.cv2 = Conv((2 + n) * self.c, c2, 1)  # optional act=FReLU(c2)
        if not is_CIB:
            self.m = nn.ModuleList(Bottleneck(self.c, self.c, shortcut, g, k=((3, 3), (3, 3)), e=1.0) for _ in range(n))
        else:
            self.m = nn.ModuleList(CIB(self.c, self.c, shortcut, e=1.0, lk=lk) for _ in range(n))

    def forward(self, x):
        # y = list(self.cv1(x).chunk(2, 1))
        y = [self.cv0(x), self.cv1(x)]
        y.extend(m(y[-1]) for m in self.m)
        return self.cv2(torch.cat(y, 1))

class C3k2_v2(C2f_v2):
    """
    Torch-pruning-compatible version of C3k2.
    Splits the original cv1 into cv0/cv1 and rebuilds its module list.
    """
    def __init__(self, c1, c2, n=1, c3k=False, e=0.5, g=1, shortcut=True):
        # initialize using C2f_v2 scaffold
        super().__init__(c1, c2, n, shortcut, g, e, is_CIB=False, lk=False)
        # override the m list with C3k or Bottleneck blocks
        self.m = nn.ModuleList(
            C3k(self.c, self.c, 2, shortcut, g) if c3k else Bottleneck(self.c, self.c, shortcut, g)
            for _ in range(n)
        )


def transfer_weights_c3k2(src: C3k2, dst: C3k2_v2):
    """
    Transfer parameters and buffers from original C3k2 to C3k2_v2.
    """
    # reuse final conv and module list pointers
    dst.cv2 = src.cv2
    dst.m = src.m

    state_src = src.state_dict()
    state_dst = dst.state_dict()

    # split cv1 weights and BN buffers into cv0 and cv1
    w_old = state_src['cv1.conv.weight']
    half = w_old.shape[0] // 2
    state_dst['cv0.conv.weight'] = w_old[:half]
    state_dst['cv1.conv.weight'] = w_old[half:]
    for bn in ['weight', 'bias', 'running_mean', 'running_var']:
        v = state_src[f'cv1.bn.{bn}']
        state_dst[f'cv0.bn.{bn}'] = v[:half]
        state_dst[f'cv1.bn.{bn}'] = v[half:]

    # copy all other parameters and buffers
    for key, val in state_src.items():
        if not key.startswith('cv1.'):
            state_dst[key] = val

    # copy non-callable attributes
    for attr in dir(src):
        if not callable(getattr(src, attr)) and '_' not in attr:
            setattr(dst, attr, getattr(src, attr))

    dst.load_state_dict(state_dst)


def replace_c3k2_with_v2(module: nn.Module):
    """
    Recursively replace all C3k2 instances in `module` with C3k2_v2.
    """
    for name, child in module.named_children():
        if isinstance(child, C3k2):
            # infer flags from first inner block
            c3k_flag = isinstance(child.m[0], C3k)
            shortcut, is_cib, lk = infer_c3k2_shortcut(child.m[0])
            # instantiate new block
            v2 = C3k2_v2(
                child.cv1.conv.in_channels,
                child.cv2.conv.out_channels,
                n=len(child.m),
                c3k=c3k_flag,
                e=child.c / child.cv2.conv.out_channels,
                g=(child.m[0].cv2.conv.groups if not is_cib else child.cv2.conv.groups),
                shortcut=shortcut,
            )
            # transfer weights and replace
            transfer_weights_c3k2(child, v2)
            setattr(module, name, v2)
        else:
            replace_c3k2_with_v2(child)


#------------------------------------------------------------------------------------------------

def save_model_v2(self: BaseTrainer):
    """
    Disabled half precision saving. originated from ultralytics/yolo/engine/trainer.py
    """
    ckpt = {
        'epoch': self.epoch,
        'best_fitness': self.best_fitness,
        'model': deepcopy(de_parallel(self.model)),
        'ema': deepcopy(self.ema.ema),
        'updates': self.ema.updates,
        'optimizer': self.optimizer.state_dict(),
        'train_args': vars(self.args),  # save as dict
        'date': datetime.now().isoformat(),
        'version': __version__}

    # Save last, best and delete
    torch.save(ckpt, self.last)
    if self.best_fitness == self.fitness:
        torch.save(ckpt, self.best)
    if (self.epoch > 0) and (self.save_period > 0) and (self.epoch % self.save_period == 0):
        torch.save(ckpt, self.wdir / f'epoch{self.epoch}.pt')
    del ckpt


def final_eval_v2(self: BaseTrainer):
    """
    originated from ultralytics/yolo/engine/trainer.py
    """
    for f in self.last, self.best:
        if f.exists():
            strip_optimizer_v2(f)  # strip optimizers
            if f is self.best:
                LOGGER.info(f'\nValidating {f}...')
                self.metrics = self.validator(model=f)
                self.metrics.pop('fitness', None)
                self.run_callbacks('on_fit_epoch_end')


def strip_optimizer_v2(f: Union[str, Path] = 'best.pt', s: str = '') -> None:
    """
    Disabled half precision saving. originated from ultralytics/yolo/utils/torch_utils.py
    """
    x = torch.load(f, map_location=torch.device('cpu'))
    args = {**DEFAULT_CFG_DICT, **x['train_args']}  # combine model args with default args, preferring model args
    if x.get('ema'):
        x['model'] = x['ema']  # replace model with ema
    for k in 'optimizer', 'ema', 'updates':  # keys
        x[k] = None
    for p in x['model'].parameters():
        p.requires_grad = False
    x['train_args'] = {k: v for k, v in args.items() if k in DEFAULT_CFG_KEYS}  # strip non-default keys
    # x['model'].args = x['train_args']
    torch.save(x, s or f)
    mb = os.path.getsize(s or f) / 1E6  # filesize
    LOGGER.info(f"Optimizer stripped from {f},{f' saved as {s},' if s else ''} {mb:.1f}MB")


def train_v2(self: YOLO, **train_params):
    """
    Disabled loading new model when pruning flag is set. originated from ultralytics/yolo/engine/model.py
    """
    
    self._check_is_pytorch_model()
    # Override Training Parameters with provided ones 
    overrides = self.overrides.copy()
    overrides.update(train_params)
    if train_params.get('cfg'):
        overrides = yaml_load(check_yaml(train_params['cfg']))
    overrides['mode'] = 'train'
    if not overrides.get('data'):
        raise AttributeError("Dataset required but missing, i.e. pass 'data=coco128.yaml'")
    if overrides.get('resume'):
        overrides['resume'] = self.ckpt_path

    # Initialize trainer
    self.task = "detect"
    self.callbacks = []
    self.trainer = self.task_map[self.task]['trainer'](overrides=overrides, _callbacks=self.callbacks)

    self.trainer.verbose = False  

    # pruning mode
    self.trainer.pruning = True
    self.trainer.model = self.model
    
    # replace some functions to disable half precision saving
    self.trainer.save_model = save_model_v2.__get__(self.trainer)
    self.trainer.final_eval = final_eval_v2.__get__(self.trainer)

    self.trainer.train()
    # Update model and cfg after training
    if RANK in (-1, 0):
        self.model, _ = attempt_load_one_weight(str(self.trainer.best))
        self.overrides = self.model.args
        self.metrics = getattr(self.trainer.validator, 'metrics', None)

In [None]:
def prune_model(model, target_prune_rate=0.8, iterations=24, map_threshold=0.10, train_params="finetune.yaml"):
    """
    Apply structured channel pruning to the model.
    :param model: The YOLO model to prune.
    :param amount: The fraction of channels to prune.
    """
    # set training to train_v2 and load the training parameters
    model.__setattr__("train_v2", train_v2.__get__(model))
    train_params = yaml_load(check_yaml(train_params))
    # Set the model to training mode
    model.model.train()  

    # change c2f implementation to be compatible with the Graph pruner
    replace_c3k2_with_v2(model.model) # avoids shared references
    initialize_weights(model.model)

    # unfreeze all the layers, making them trainable
    for name, param in model.model.named_parameters():
        param.requires_grad = True # set all to True
    # dummy input to trace  the model's computation graph
    example_inputs = torch.randn(1, 3, train_params["imgsz"], train_params["imgsz"]).to(model.device)
    
    # Initialize metrics list to plot and logging purpose
    macs_list, nparams_list, map_list, map50_list, pruned_map_list = [], [], [], [], []
    
    # Get the initial number of FLOPs and parameters
    base_macs, base_nparams = tp.utils.count_ops_and_params(model.model, example_inputs)
    
    # Do validation before pruning model for baseline metrics
    train_params['name'] = "baseline_val"
    train_params['batch'] = 1
    validation_model = deepcopy(model)
    results = validation_model.val(**train_params, verbose=False)
    # Save the metrics
    init_map = results.box.map
    init_map50 = results.box.map50
    
    # add the initial metrics to the lists
    macs_list.append(base_macs)
    nparams_list.append(100) # as % of parameters
    map_list.append(init_map)
    map50_list.append(init_map50)
    pruned_map_list.append(init_map)
    print(f"Before Pruning: MACs={base_macs / 1e9: .5f} G, #Params={base_nparams / 1e6: .5f} M, mAP={init_map: .5f}")
    
    # prune same ratio of filter based on initial size
    prune_per_step = target_prune_rate / iterations

    # Iterate and prune based on the selected iterative steps
    print(f"\n -------------- MODEL PRUNING ----------------")
    for i in range(iterations):
        
        # Reset the loss function and unfreeze the layers
        model.model.criterion = None 
        model.model.train()
        for name, param in model.model.named_parameters():
            param.requires_grad = True

        # Filter out the layers that don't want to be pruned
        ignored_layers = []
        unwrapped_parameters = []
        for m in model.model.modules():
            if isinstance(m, (Detect,Attention,Concat,C3k2)):
                ignored_layers.append(m)
        
        example_inputs = example_inputs.to(model.device) # move dummy input to device
        
        # Initialize the pruner instance (structured pruning on channels or filters)
        pruner = tp.pruner.GroupNormPruner(
            model.model,
            example_inputs,
            importance=tp.importance.GroupMagnitudeImportance(p=2),  # L2 norm pruning
            iterative_steps=1,
            pruning_ratio=prune_per_step,
            ignored_layers=ignored_layers,
            unwrapped_parameters=unwrapped_parameters
        )
                
        # prune the model
        pruner.step()

        print(f"\n--------------- STEP {i + 1} OF {iterations} ----------------")
        print(f"\n---------- PRUNED RATIO : {1- (1 - prune_per_step)**(i+1):.3f} -----------")
        
        # Validation after the model been pruned - before fine-tuning
        train_params['name'] = f"step_{i}_pre_val" 
        train_params['batch'] = 1
        validation_model.model = deepcopy(model.model)
        metric = validation_model.val(**train_params, verbose=False)
        pruned_map = metric.box.map
        pruned_macs, pruned_nparams = tp.utils.count_ops_and_params(pruner.model, example_inputs.to(model.device))
        current_speed_up = float(macs_list[0]) / pruned_macs
        
        print(f"After pruning iter {i + 1}: MACs={pruned_macs / 1e9} G, #Params={pruned_nparams / 1e6} M, "
            f"mAP={pruned_map}, speed up={current_speed_up},  pruned_param_ratio={pruned_nparams / base_nparams * 100:.3f} %")
        
        # fine-tuning the pruned model
        model.model.train()
        for name, param in model.model.named_parameters():
            param.requires_grad = True
        train_params['name'] = f"step_{i}_finetune"
        train_params['batch'] = -1
        model.train_v2(**train_params)
        
        # post fine-tuning validation
        train_params['name'] = f"step_{i}_post_val"
        train_params['batch'] = 1
        validation_model = YOLO(model.trainer.best)
        metric = validation_model.val(**train_params,verbose=False)
        current_map = metric.box.map
        current_map50 = metric.box.map50
        print(f"\nAfter fine tuning mAP={current_map} - mAP50={current_map50} - pruned_param_ratio={pruned_nparams / base_nparams * 100:.3f}% \n")
        
        macs_list.append(pruned_macs)
        nparams_list.append(pruned_nparams / base_nparams * 100)
        pruned_map_list.append(pruned_map)
        map_list.append(current_map)
        map50_list.append(current_map50)
        
        # remove pruner after single iteration
        del pruner
        
        save_pruning_performance_graph(nparams_list, map_list, map50_list, macs_list, pruned_map_list, "L2 norm")
        
        # Stop if the metrics drop is greater than the max_map_drop parameter
        if init_map - current_map > map_threshold:
            print("Pruning early stop")
            break

    # --- End Iterative Pruning Loop ---
    # Combine all metrics into a dictionary
    metrics = {
        "macs_list": macs_list,
        "nparams_list": nparams_list,
        "pruned_map_list": pruned_map_list,
        "map_list": map_list,
        "map50_list": map50_list
    }
    with open("pruning_metrics.json", 'w') as f:
        json.dump(metrics, f)
    
    # Export final model
    exported = model.export(format="onnx")
    print(f"Final pruned model exported to ONNX, {exported}.")
    

In [14]:
model = YOLO("models\\trained_yolov11n.pt")
logging.getLogger('ultralytics').setLevel(logging.WARNING)

prune_model(model, target_prune_rate=0.8, iterations=5, map_threshold=0.10, train_params="finetune.yaml")

[34m[1mval: [0mScanning C:\Users\yurim\Documents\University\UM\Year_3\Bachelor_Thesis\Weed_Detection_eSys\datasets\yolo_CropOrWeed2\labels\val.cache... 1541 images, 0 backgrounds, 0 corrupt: 100%|██████████| 1541/1541 [00:00<?, ?it/s]


RuntimeError: CUDA error: out of memory
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


# non graph based

In [None]:
import torch
from torch import nn
import torch.nn.utils.prune as prune
import torch.nn.functional as F
from ultralytics import YOLO

model = YOLO("models\\trained_yolov11n.pt")

torch_model = model.model
print(torch_model)

DetectionModel(
  (model): Sequential(
    (0): Conv(
      (conv): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (bn): BatchNorm2d(16, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
      (act): SiLU(inplace=True)
    )
    (1): Conv(
      (conv): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
      (act): SiLU(inplace=True)
    )
    (2): C3k2(
      (cv1): Conv(
        (conv): Conv2d(32, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (cv2): Conv(
        (conv): Conv2d(48, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn): BatchNorm2d(64, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
   

In [None]:
import torch
from torch import nn
import torch.nn.utils.prune as prune
import torch.nn.functional as F
from ultralytics import YOLO

def prune_model(model, amount=0.1):
    for module in model.model.modules():
            # prune Conv2d layers and BatchNorm2d layers
            if isinstance(module, nn.Conv2d):
                prune.ln_structured(module, name="weight", amount=amount, n=1, dim=0) # l1 norm for convolutional layers
                prune.remove(module, "weight")  # Remove reparam so zeros become actual weights
            '''
                next_module = next(module.model.modules()) # TODO does this really get the respective layer?
                if isinstance(next_module, nn.BatchNorm2d):
                    # Prune BatchNorm γ (scaling factor)
                    prune.ln_structured(next_module, name="weight", amount=amount, n=1, dim=0)
                    prune.remove(next_module, "weight")
                    # prune BatchNorm β (bias)
                    prune.ln_structured(next_module, name="bias", amount=amount, n=1, dim=0)
                    prune.remove(next_module, "bias")
            # prune Linear layers
            elif isinstance(module, nn.Linear):
                prune.l1_unstructured(module, name="weight", amount=amount / 3)  # l1 norm unstructured pruning TODO check amount
                prune.remove(module, "weight")
            '''
    return model

def iterative_pruning(model, target_prune_rate=0.8, iterations=24, map_threshold=0.10):
    """
    Perform iterative pruning on the model.
    :param model: The YOLO model to prune.
    :param target_prune_rate: Total fraction of weights/channels to prune.
    :param iterations: Number of pruning steps.
    :param map_threshold: Minimum acceptable accuracy (mAP50-95) to stop pruning.
    :return: The pruned model.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    torch_model = model.model

    # Evaluate before pruning
    results = model.val(data="datasets\\yolo_CropOrWeed2\\data.yaml", device=device)  
    initial_map = results.box.map
    print(f"Initial mAP50-95: {initial_map:.4f}")
    print(f"Model size before pruning: {sum(p.numel() for p in torch_model.parameters()) / 1e6:.2f}M parameters, {sum(p.element_size() * p.numel() for p in torch_model.parameters()) / (1024 * 1024):.2f} MB")
    print(torch_model)

    prune_per_step = target_prune_rate / iterations

    #TODO iterate
    print("Pruning model...")
    torch_pruned_model = prune_model(torch_model, amount=0.1)  # Prune the model
    print("Model pruned.")
    print(f"Model size after pruning: {sum(p.numel() for p in torch_pruned_model.parameters()) / 1e6:.2f}M parameters, {sum(p.element_size() * p.numel() for p in torch_pruned_model.parameters()) / (1024 * 1024):.2f} MB")

    model.model = torch_pruned_model ## Update the model in the YOLO wrapper
    # retrain and evaluate
    model.train(
        data="datasets\\yolo_CropOrWeed2\\data.yaml",
        epochs=50,
        imgsz=224,
        batch=-1,
        lr0=0.001,
        lrf=0.1
    )
    results = model.val(data="datasets\\yolo_CropOrWeed2\\data.yaml", device=device)  # Evaluate after pruning
    print(f"mAP50-95: {results.box.map}") # TODO store the results in a list and plot them later
    # TODO stop when hitting accuracy threshold

    # TODO strip out the zeroed channels save and export the pruned model


    return model

In [None]:
model = YOLO("models\\trained_yolov11n.pt")
target_prune_rate = 0.8
iterative_steps = 24
map_threshold = 0.10
model = iterative_pruning(model, target_prune_rate, iterations=iterative_steps, map_threshold=map_threshold)
print("model trained, saving...")
model.save("models\\pruned_yolov11n.pt")  # Save the pruned model after training
print("Model saved.")

# Evaluate the pruned model
results = model.val(data="datasets\\yolo_CropOrWeed2\\data.yaml", device=device)  # Evaluate after pruning
print(f"mAP50-95: {results.box.map}")