# **Starter Code**
- Imports
- Checking GPU availability
- Getting the Kaggle set and description of the download




In [None]:

# !pip install -q ultralytics kagglehub pyyaml opencv-python efficientnet_pytorch torch torchvision
import torch
import numpy as np
import kagglehub
import pathlib
import yaml
import shutil
from ultralytics import YOLO
import pandas as pd
import matplotlib.pyplot as plt
import time
import os
from pathlib import Path
from sklearn.metrics import confusion_matrix
import seaborn as sns
import cv2
import random



In [None]:
print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():     # Check for T4 availability on Colab (DON'T CHANGE)
    print("GPU:", torch.cuda.get_device_name(0))
else:
    print("No GPU detected. Go to Runtime -> Change runtime type ->  GPU")

In [None]:

path = kagglehub.dataset_download("rupankarmajumdar/crop-pests-dataset")      # Getting the DATASET from kagglehub via CLI API (see their website - DON'T CHANGE)

local_path = pathlib.Path("/content/datasets/crop-pests")     # Saved the images to a local path to increase efficiency
local_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(path, local_path, dirs_exist_ok=True)

data_yaml_path = local_path / "data.yaml"     # YAML CONGIF (DON'T CHANGE)
data_cfg = {
    "path": str(local_path),
    "train": "train/images",
    "val":   "valid/images",
    "test":  "test/images",
    "nc": 12,
    "names":
     [
      "Ants",
      "Bees",
      "Beetles",
      "Catterpillars",     # NOTE: HAD TO CHANGE SPELLING BECAUSE DATASET IMAGES ARE SPELT LIKE THIS
      "Earthworms",
      "Earwigs",
      "Grasshoppers",
      "Moths",
      "Slugs",
      "Snails",
      "Wasps",
      "Weevils",
    ]
}

with open(data_yaml_path, "w") as f:
    yaml.safe_dump(data_cfg, f)


# CHECKS
num_train_images = len(list((local_path / "train" / "images").glob("*.jpg")))
num_val_images = len(list((local_path / "valid" / "images").glob("*.jpg")))
num_test_images = len(list((local_path / "test" / "images").glob("*.jpg")))

num_train_labels = len(list((local_path / "train" / "labels").glob("*.txt")))
num_val_labels = len(list((local_path / "valid" / "labels").glob("*.txt")))
num_test_labels = len(list((local_path / "test" / "labels").glob("*.txt")))

print(f"Number of training images: {num_train_images}")
print(f"Number of validation images: {num_val_images}")
print(f"Number of test images: {num_test_images}")
print(f"Number of training labels: {num_train_labels}")
print(f"Number of validation labels: {num_val_labels}")
print(f"Number of test labels: {num_test_labels}")


# **Our Evaluation Metrics**
- mAP@0.5 >= 0.5
- mAP@0.5:0.95 (elps you detect over/under-sized bounding boxes)
- Precision (false positive control)
- Recall (false negative control)
- F1 score
- Training and Testing times



In [None]:
def base_evaluation_metrics(val_results):
    metrics = val_results.results_dict
    precision = metrics.get('metrics/precision(B)', 0)
    recall    = metrics.get('metrics/recall(B)', 0)
    mAP50     = metrics.get('metrics/mAP50(B)', 0)
    mAP50_95  = metrics.get('metrics/mAP50-95(B)', 0)
    f1_score  = (2 * precision * recall) / (precision + recall) if (precision + recall) else 0
    print(f"\nMean Average Precision (mAP@0.5):        {mAP50:.4f}")
    print(f"Mean Average Precision (mAP@0.5:0.95):   {mAP50_95:.4f}")
    print(f"Precision: {precision:.4f}  |  Recall: {recall:.4f}  |  F1-score: {f1_score:.4f}")

In [None]:
def format_time(seconds):
    mins, secs = divmod(seconds, 60)      # Converts total seconds into minutes and format.
    return f"{int(mins):0d}m {secs:.2f}s"


# DEMO: 15 EPOCH OPTIMSIED ON IMAGE DUPS AND AUGMENTATION

In [None]:
local_path = Path("/content/datasets/crop-pests")     # Ensure local_path and names are defined
data_yaml = yaml.safe_load(open(local_path / "data.yaml"))
names = data_yaml["names"]

minority_classes = ["Slugs", "Earthworms", "Beetles", "Catterpillars", "Earwigs"]

MAX_DUPLICATES = 1000
SP_AMOUNT = 0.05
SP_SALT_VS_PEPPER = 0.5


def add_salt_and_pepper_noise(img, amount=0.05, salt_vs_pepper=0.5):
    noisy = img.copy()
    h, w = noisy.shape[:2]
    num_pixels = h * w
    num_salt = int(amount * num_pixels * salt_vs_pepper)
    num_pepper = int(amount * num_pixels * (1 - salt_vs_pepper))

    # Salt (white) noise
    ys = np.random.randint(0, h, num_salt)
    xs = np.random.randint(0, w, num_salt)
    noisy[ys, xs] = 255

    # Pepper (black) noise
    ys = np.random.randint(0, h, num_pepper)
    xs = np.random.randint(0, w, num_pepper)
    noisy[ys, xs] = 0

    return noisy


minority_class_ids = [names.index(c) for c in minority_classes if c in names]       # Get the class IDs for minority classes

duplicated_count = 0      # Make sure this exists even if no minority classes

if not minority_class_ids:
    print("No valid minority classes specified from the dataset names.")
else:
    print(f"Minority classes for duplication: {minority_classes}")

    train_images_dir = local_path / "train" / "images"
    train_labels_dir = local_path / "train" / "labels"

    # Create directories for duplicated images and labels
    duplicated_images_dir = local_path / "train" / "images_duplicated"
    duplicated_labels_dir = local_path / "train" / "labels_duplicated"

    # Clean up previous runs if any
    if duplicated_images_dir.exists():
        shutil.rmtree(duplicated_images_dir)
    if duplicated_labels_dir.exists():
        shutil.rmtree(duplicated_labels_dir)

    duplicated_images_dir.mkdir(parents=True, exist_ok=True)
    duplicated_labels_dir.mkdir(parents=True, exist_ok=True)

    processed_images_count = 0

    for label_file in train_labels_dir.glob("*.txt"):
        if duplicated_count >= MAX_DUPLICATES:      # Stop once we hit max duplicates
            break

        image_file = train_images_dir / f"{label_file.stem}.jpg"
        if not image_file.exists():
            continue

        has_minority_class = False
        try:
            with open(label_file, 'r') as f:
                for line in f:
                    parts = line.strip().split()
                    if parts:
                        class_id = int(parts[0])
                        if class_id in minority_class_ids:
                            has_minority_class = True
                            break     # Found a minority class
        except Exception as e:
            print(f"Error reading label file {label_file}: {e}")
            continue      # Skip this file if there's an error

        if has_minority_class:
            processed_images_count += 1

            new_image_name = f"{image_file.stem}_dup{1}{image_file.suffix}"
            new_label_name = f"{label_file.stem}_dup{1}{label_file.suffix}"

            new_image_path = duplicated_images_dir / new_image_name
            new_label_path = duplicated_labels_dir / new_label_name

            try:
                img = cv2.imread(str(image_file))
                if img is None:
                    print(f"Warning: Could not read image {image_file} for augmentation.")
                    continue

                noisy_img = add_salt_and_pepper_noise(       # 1) Add salt-and-pepper noise
                    img,
                    amount=SP_AMOUNT,
                    salt_vs_pepper=SP_SALT_VS_PEPPER
                )

                rotated_img = cv2.rotate(noisy_img, cv2.ROTATE_180)     # 2) Rotate 180 degrees


                cv2.imwrite(str(new_image_path), rotated_img)
                shutil.copy2(str(label_file), str(new_label_path))

                duplicated_count += 1

            except Exception as e:
                print(f"Error duplicating files for {image_file}: {e}")
# CHECK
print(f"Total Duplicates: {duplicated_count}")




In [None]:
# Generate Training File List Including Duplicated Images (No Copying)

original_train_images_dir = local_path / "train" / "images"
# This is for duplicates
duplicated_images_dir = local_path / "train" / "images_duplicated"

train_file_list_path = local_path / "train_images_list.txt"

all_train_image_paths = []

if original_train_images_dir.exists():
    print(f"Collecting images from: {original_train_images_dir}")
    all_train_image_paths.extend([str(p.resolve()) for p in original_train_images_dir.glob("*.jpg")])     # Collect original image paths
else:
    print(f"Warning: Original training images directory not found at {original_train_images_dir}")

if duplicated_images_dir.exists():
    print(f"Collecting images from: {duplicated_images_dir}")
    all_train_image_paths.extend([str(p.resolve()) for p in duplicated_images_dir.glob("*.jpg")])
else:
    print(f"Warning: Duplicated images directory not found at {duplicated_images_dir}")     # Collect duplicated image paths

with open(train_file_list_path, "w") as f:
    for img_path in all_train_image_paths:      # Write the file list to a text file
        f.write(f"{img_path}\n")

print(f"\nGenerated training image file list at: {train_file_list_path}")
print(f"Total images included in the file list: {len(all_train_image_paths)}")

In [None]:
# Update data.yaml to Use the Training File List

data_yaml_path = local_path / "data.yaml"

if data_yaml_path.exists():
    with open(data_yaml_path, "r") as f:      # Load the existing data.yaml content
        data_cfg = yaml.safe_load(f)
else:
    print(f"Warning: data.yaml not found at {data_yaml_path}. Creating a new one.")
    data_cfg = {
        "path": str(local_path), # Keep the root path for val/test
        "val":   "valid/images",
        "test":  "test/images",
        "nc": 12,
        "names":
         [
          "Ants",
          "Bees",
          "Beetles",
          "Catterpillars",
          "Earthworms",
          "Earwigs",
          "Grasshoppers",
          "Moths",
          "Slugs",
          "Snails",
          "Wasps",
          "Weevils",
        ]
    }

data_cfg["train"] = str(train_file_list_path.resolve())

with open(data_yaml_path, "w") as f:      # Write the updated data.yaml file
    yaml.safe_dump(data_cfg, f)


print(yaml.safe_dump(data_cfg))


In [None]:
 # OPTIMISED 15 EPOCH: CHOSEN
model = YOLO("yolov8n.pt")
start = time.time()
train_res = model.train(
    data=str(local_path/"data.yaml"),     # Use the data.yaml file which now points to the training image file list
    epochs=15,
    imgsz=512,
    mosaic=1.0,
    batch=-1,
    device=0,
    project="pests_fast",
    name="yolov8n_colab",
    verbose=True
)

end = time.time()
train_time = end - start

start = time.time()
val_results = model.val(
    data=str(local_path/"data.yaml"),
    split="test",
    batch=16,
    iou=0.5,
    device=0,
    workers=2
)

end = time.time()
val_time = end - start


In [None]:
sample_dir = local_path / "test" / "images"
test_res = model.predict(
    source=str(sample_dir),
    imgsz=512,
    conf=0.25,
    device=0,
    save=True,
    project="runs/detect",
    name="pest_predictions"
)

In [None]:
IMGSZ = 512
UPSCALE = 6
BOX_SCALE = 6
TITLE_FONTSIZE = 22
TEXT_FONTSIZE = 16

# Paths
test_img_dir = local_path / "test" / "images"
test_lbl_dir = local_path / "test" / "labels"

# Class names
with open(local_path / "data.yaml") as f:
    data_cfg = yaml.safe_load(f)
class_names = data_cfg["names"]


def draw_boxes(img, boxes, color, scale=BOX_SCALE):
    thickness = 2 * scale
    for box in boxes:
        x1, y1, x2, y2 = map(int, box)
        cv2.rectangle(img, (x1, y1), (x2, y2), color, thickness)


def load_gt_boxes(label_path, image_weight, image_height):
    gt_boxes = []
    gt_classes = []
    if not label_path.exists():
        return gt_boxes, gt_classes

    with open(label_path) as f:
        for line in f:
            cid, xc, yc, bw, bh = map(float, line.split())
            x1 = (xc - bw/2) * image_weight
            y1 = (yc - bh/2) * image_height
            x2 = (xc + bw/2) * image_weight
            y2 = (yc + bh/2) * image_height
            gt_boxes.append([x1, y1, x2, y2])
            gt_classes.append(int(cid))
    return gt_boxes, gt_classes

test_res = model.predict(
    source=str(test_img_dir),
    imgsz=IMGSZ,
    conf=0.25,
    device=0,
    save=False,
    verbose=False
)
pred_map = { Path(r.path).name: r for r in test_res }

# Show 5 per class picked randomly
for cls_id, cls_name in enumerate(class_names):
    # Find test images with this class
    candidate_images = []
    for lbl_file in test_lbl_dir.glob("*.txt"):
        with open(lbl_file) as f:
            if any(int(line.split()[0]) == cls_id for line in f):
                candidate_images.append(lbl_file.stem + ".jpg")

    if not candidate_images:
        print(f"No images for class {cls_name}")
        continue

    sample_imgs = random.sample(candidate_images, k=min(5, len(candidate_images)))

    for img_name in sample_imgs:

        # Load & upscale
        img_path = test_img_dir / img_name
        img = cv2.imread(str(img_path))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, None, fx=UPSCALE, fy=UPSCALE, interpolation=cv2.INTER_NEAREST)
        h, w = img.shape[:2]

        # Ground Truth box in red
        label_path = test_lbl_dir / (Path(img_name).stem + ".txt")
        gt_boxes, gt_classes = load_gt_boxes(label_path, w, h)
        draw_boxes(img, np.array(gt_boxes), (255, 0, 0), scale=BOX_SCALE)

        gt_text = ", ".join([class_names[c] for c in gt_classes]) or "None"

        # Prediction box in blue
        pred = pred_map.get(img_name, None)
        pred_boxes = []
        pred_text = "None"

        if pred is not None and pred.boxes is not None:
            pred_boxes = (pred.boxes.xyxy.cpu().numpy() * UPSCALE)
            pred_classes = [int(b.cls) for b in pred.boxes]
            pred_confs = [float(b.conf) for b in pred.boxes]
            pred_text = ", ".join([
                f"{class_names[c]} ({conf:.2f})"
                for c, conf in zip(pred_classes, pred_confs)
            ])

        draw_boxes(img, pred_boxes, (0, 0, 255), scale=BOX_SCALE)
        plt.figure(figsize=(7, 7))
        plt.suptitle(f"{cls_name} â€” Example (GT=Red, Pred=Blue)", fontsize=TITLE_FONTSIZE, fontweight='bold')
        plt.imshow(img)
        plt.axis("off")
        plt.figtext(      # Classification text outside image
            0.5, -0.05,
            f"GT: {gt_text}\nPredicted: {pred_text}",
            ha="center",
            fontsize=TEXT_FONTSIZE
        )

        plt.show()
        print("-----------------------------------------------------------------------")