<a href="https://colab.research.google.com/github/aidantze/pesta-la-vista/blob/ml_models/ml.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ML Models
9517 Group Assignment

Constants

In [11]:
# KNN constants
RESOLUTION = 64                  # default 512, CPU limits us to no higher than 64 without crashing
K_NEIGHBORS = 5                   # default 5

# Noise and filter constants
APPLY_CORRUPTION = False
CORRUPT_TYPE = 'gaussian_noise'   # one of: 'gaussian_noise', 'salt_pepper_noise', 'gaussian_blur'
CORRUPT_STRENGTH = 0.05           # e.g., 0.05 = 5% noise or 5x5 kernel blur

Setup (Do Not Change)

In [12]:
!pip install -q kagglehub pyyaml

import numpy as np
import cv2
import kagglehub
import pathlib
import yaml
import shutil
import os
import time

from sklearn import metrics
from sklearn.preprocessing import LabelBinarizer
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import SGDClassifier
import torch

# Check for T4 availability on Colab (DON'T CHANGE)
print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("GPU:", torch.cuda.get_device_name(0))
else:
    print("No GPU detected. Go to Runtime -> Change runtime type ->  GPU")

# Kaggle Download via CLI API (see their website - DON'T CHANGE)
path = kagglehub.dataset_download("rupankarmajumdar/crop-pests-dataset")

# Saved the images to a local path to increase efficiency
local_path = pathlib.Path("/content/datasets/crop-pests")
local_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(path, local_path, dirs_exist_ok=True)

# YAML CONGIF (DON'T CHANGE)
data_yaml_path = local_path / "data.yaml"
data_cfg = {
    "path": str(local_path),
    "train": "train/images",
    "val":   "valid/images",
    "test":  "test/images",
    "names": [
        "ant", "bee", "beetle", "caterpillar", "earthworm", "earwig",
        "grasshopper", "moth", "slug", "snail", "wasp", "weevil"
    ]
}

with open(data_yaml_path, "w") as f:
    yaml.safe_dump(data_cfg, f)


[31mERROR: Operation cancelled by user[0m[31m
[0mCUDA available: False
No GPU detected. Go to Runtime -> Change runtime type ->  GPU


In [4]:
# Data Loading Function for KNN
CLASS_NAMES = data_cfg["names"]
CLASS_MAP = {name: i for i, name in enumerate(CLASS_NAMES)}

def load_split_data(base_path, split_name, class_map, img_size):
    """
    Loads images and extracts the majority class label from the YOLO-style
    label files for KNN classification.
    """
    X_list = []
    y_list = []

    split_dir = base_path / split_name
    image_dir = split_dir / "images"
    label_dir = split_dir / "labels"

    print(f"\nProcessing images in: {image_dir}")

    if not image_dir.exists() or not label_dir.exists():
        print(f"Error: Could not find 'images' or 'labels' in {split_dir}")
        return np.array([]), np.array([])

    for img_file in image_dir.glob("*.jpg"):

        # 1. Determine the corresponding label file name (.txt)
        label_file = label_dir / (img_file.stem + ".txt")

        # 2. Check and process label file
        if not label_file.exists():
            # print(f"Warning: Missing label file for {img_file.name}") # Too verbose
            continue

        try:
            # Read all lines from the label file
            with open(label_file, 'r') as f:
                lines = f.readlines()

            if not lines:
                continue # Skip if label file is empty

            # Extract all class IDs (the first number in each row)
            class_ids = [int(line.strip().split()[0]) for line in lines]

            # Find the majority class ID (the most frequent insect)
            # bincount is efficient for finding frequencies of non-negative integers
            class_counts = np.bincount(class_ids)
            majority_class_id = np.argmax(class_counts)

        except Exception as e:
            print(f"Error reading label {label_file.name}: {e}")
            continue

        # 3. Load and Preprocess Image
        # Load as color image (3 channels) for consistent feature count
        img = cv2.imread(str(img_file), cv2.IMREAD_COLOR)

        if img is not None:
            # Resize, flatten, and normalize
            img = cv2.resize(img, (img_size, img_size))
            img_normalized = img.astype('float32') / 255.0

            X_list.append(img_normalized.flatten())
            y_list.append(majority_class_id) # Use the derived majority class

    # 4. Final array creation (handles the 2D shape requirement)
    X = np.array(X_list, dtype='float32')
    y = np.array(y_list)

    print(f"Loaded {len(X)} samples for {split_name}. Final shape: {X.shape}")
    return X, y


Noise and Filter Analysis

In [5]:
def apply_corruption_to_folder(source_dir, destination_dir, corruption_type, strength=0.01):
    """
    Copies images from source to destination and applies a specified corruption.
    (Function body is omitted here for brevity, assuming the user's provided code is used)
    """
    if destination_dir.exists():
        shutil.rmtree(destination_dir)
    shutil.copytree(source_dir, destination_dir)

    if corruption_type == 'gaussian_noise':
        sigma = int(strength * 255)
        print(f"\nApplying {corruption_type} (Sigma: {sigma}) to images in {destination_dir.name}...")
        for img_file in destination_dir.glob('*.jpg'):
            img = cv2.imread(str(img_file))
            if img is None: continue
            noise = np.random.normal(0, sigma, img.shape).astype('uint8')
            corrupted_img = cv2.add(img, noise)
            cv2.imwrite(str(img_file), corrupted_img)

    elif corruption_type == 'salt_pepper_noise':
        ratio = strength
        print(f"\nApplying {corruption_type} (Ratio: {ratio}) to images in {destination_dir.name}...")
        for img_file in destination_dir.glob('*.jpg'):
            img = cv2.imread(str(img_file))
            if img is None: continue
            corrupted_img = img.copy()
            total_pixels = img.size

            # TODO: verify the logic of this part, it seems the img coords is always pepper (0)
            # num_salt_pepper = int(ratio * total_pixels / img.shape[2])
            # coords = [np.random.randint(0, i - 1, num_salt_pepper) for i in img.shape]
            # corrupted_img[coords[0], coords[1], coords[2]] = 255
            # coords = [np.random.randint(0, i - 1, num_salt_pepper) for i in img.shape]
            # corrupted_img[coords[0], coords[1], coords[2]] = 0
            # cv2.imwrite(str(img_file), corrupted_img)


            num_corrupt_elements = int(ratio * total_pixels)
            flat_indices = np.random.choice(
                total_pixels,
                size=num_corrupt_elements,
                replace=False
            )
            num_salt = num_pepper = num_corrupt_elements // 2

            # Apply Salt (White)
            salt_indices = flat_indices[:num_salt]
            corrupted_img.flat[salt_indices] = 255

            # Apply Pepper (Black)
            pepper_indices = flat_indices[num_salt:num_salt + num_pepper]
            corrupted_img.flat[pepper_indices] = 0

    elif corruption_type == 'gaussian_blur':
        ksize = int(strength * 100) if int(strength * 100) % 2 != 0 else int(strength * 100) + 1
        print(f"\nApplying {corruption_type} (ksize: {ksize}) to images in {destination_dir.name}...")
        for img_file in destination_dir.glob('*.jpg'):
            img = cv2.imread(str(img_file))
            if img is None: continue
            corrupted_img = cv2.GaussianBlur(img, (ksize, ksize), 0)
            cv2.imwrite(str(img_file), corrupted_img)

    print("Corruption application complete.")


VAL_DIR_NAME = "valid"
if APPLY_CORRUPTION:
    original_val_path = local_path / VAL_DIR_NAME
    corrupt_val_path_name = f"{VAL_DIR_NAME}_corrupted"
    corrupt_val_path = local_path / corrupt_val_path_name

    # Create the corrupted validation set by copying and modifying the 'valid' split
    apply_corruption_to_folder(
        original_val_path,
        corrupt_val_path,
        CORRUPT_TYPE,
        CORRUPT_STRENGTH
    )
    VAL_DIR_NAME = corrupt_val_path_name # Switch the validation directory name

Load Data Splits

In [6]:
# --- 4. Load Data Splits ---
X_train, y_train = load_split_data(local_path, "train", CLASS_MAP, RESOLUTION)

# Load validation data (clean or corrupted)
X_val, y_val = load_split_data(local_path, VAL_DIR_NAME, CLASS_MAP, RESOLUTION)

# Load test data
X_test, y_test = load_split_data(local_path, "test", CLASS_MAP, RESOLUTION)


Processing images in: /content/datasets/crop-pests/train/images
Loaded 11499 samples for train. Final shape: (11499, 12288)

Processing images in: /content/datasets/crop-pests/valid/images
Loaded 1095 samples for valid. Final shape: (1095, 12288)

Processing images in: /content/datasets/crop-pests/test/images
Loaded 546 samples for test. Final shape: (546, 12288)


In [9]:
# format time for metrics in next stage
def format_time(seconds):
    """Converts total seconds into minutes and seconds format."""
    # Ensure all time variables (train_time, val_time, test_time) are available
    mins, secs = divmod(seconds, 60)
    return f"{int(mins):0d}m {secs:.2f}s"

### KNN Classifier — Train model, prediction and evaluation

In [19]:
# Training the Model
knn_classifier = KNeighborsClassifier(n_neighbors=K_NEIGHBORS)

start = time.time()
knn_classifier.fit(X_train, y_train)
end = time.time()
knn_train_time = end - start

start_kernel = time.time()
knn_val_pred = knn_classifier.predict(X_val)
end = time.time()
knn_val_time = end - start

knn_val_probas = knn_classifier.predict_proba(X_val)

start = time.time()
knn_test_pred = knn_classifier.predict(X_test)
end = time.time()
knn_test_time = end - start


In [20]:
# evaluation metrics
# We use the 'weighted' average suitable for multi-class, potentially imbalanced data.
accuracy = metrics.accuracy_score(y_val, knn_val_pred)
precision, recall, f1_score, _ = metrics.precision_recall_fscore_support(
    y_val, knn_val_pred, average='weighted', zero_division=0
)

# AUC is calculated in a One-vs-Rest fashion for multi-class problems.
# The true labels (y_val) must be binarized first.
lb = LabelBinarizer()
y_val_binarized = lb.fit_transform(y_val)

# We use the 'weighted' average to account for class support
auc_score = metrics.roc_auc_score(
    y_val_binarized,
    knn_val_probas,
    average='weighted',
    multi_class='ovr'
)

# --- 4. Output Refactored Metrics ---
print(f"Results for KNN (K={K_NEIGHBORS}) with image size {RESOLUTION} and {f"{CORRUPT_TYPE}, strength set to {CORRUPT_STRENGTH}" if APPLY_CORRUPTION else "no filters"}:\n")

# NOTE: mAP is not available for KNN (Classification)
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision (Weighted): {precision:.4f}")
print(f"Recall (Weighted): {recall:.4f}")
print(f"F1-Score (Weighted): {f1_score:.4f}")
print(f"Area Under the Curve (AUC): {auc_score:.4f}")

print(f"\nTraining Time (Total): {format_time(knn_train_time)}")
print(f"Validation Time (Prediction): {format_time(knn_val_time)}")

Results for KNN (K=5) with image size 64 and no filters:

Accuracy: 0.1653
Precision (Weighted): 0.1874
Recall (Weighted): 0.1653
F1-Score (Weighted): 0.1505
Area Under the Curve (AUC): 0.5918

Training Time (Total): 0m 0.46s
Validation Time (Prediction): 0m 15.47s


### Decision Tree Classifier — Train model, prediction and evaluation

In [24]:
dt_classifier = DecisionTreeClassifier()

start = time.time()
dt_classifier.fit(X_train, y_train)
end = time.time()
dt_train_time = end - start

start_kernel = time.time()
dt_val_pred = dt_classifier.predict(X_val)
end = time.time()
dt_val_time = end - start

dt_val_probas = dt_classifier.predict_proba(X_val)

start = time.time()
dt_test_pred = dt_classifier.predict(X_test)
end = time.time()
dt_test_time = end - start

In [25]:
# evaluation metrics
# We use the 'weighted' average suitable for multi-class, potentially imbalanced data.
accuracy = metrics.accuracy_score(y_val, dt_val_pred)
precision, recall, f1_score, _ = metrics.precision_recall_fscore_support(
    y_val, dt_val_pred, average='weighted', zero_division=0
)

# AUC is calculated in a One-vs-Rest fashion for multi-class problems.
# The true labels (y_val) must be binarized first.
lb = LabelBinarizer()
y_val_binarized = lb.fit_transform(y_val)

# We use the 'weighted' average to account for class support
auc_score = metrics.roc_auc_score(
    y_val_binarized,
    dt_val_probas,
    average='weighted',
    multi_class='ovr'
)


# --- 4. Output Refactored Metrics ---
print(f"Results for DT with image size {RESOLUTION} and {f"{CORRUPT_TYPE}, strength set to {CORRUPT_STRENGTH}" if APPLY_CORRUPTION else "no filters"}:\n")

# NOTE: mAP is not available for KNN (Classification)
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision (Weighted): {precision:.4f}")
print(f"Recall (Weighted): {recall:.4f}")
print(f"F1-Score (Weighted): {f1_score:.4f}")
print(f"Area Under the Curve (AUC): {auc_score:.4f}")

print(f"\nTraining Time (Total): {format_time(dt_train_time)}")
print(f"Validation Time (Prediction): {format_time(dt_val_time)}")

Results for DT with image size 64 and no filters:

Accuracy: 0.1260
Precision (Weighted): 0.1252
Recall (Weighted): 0.1260
F1-Score (Weighted): 0.1252
Area Under the Curve (AUC): 0.5226

Training Time (Total): 4m 56.24s
Validation Time (Prediction): 4m 56.24s


### SGD Classifier — Train model, prediction and evaluation

In [7]:
sgd_classifier = SGDClassifier(loss='log_loss', max_iter=250, random_state=42)

start = time.time()
sgd_classifier.fit(X_train, y_train)
end = time.time()
sgd_train_time = end - start

start = time.time()
sgd_val_pred = sgd_classifier.predict(X_val)
end = time.time()
sgd_val_time = end - start

sgd_val_probas = sgd_classifier.predict_proba(X_val)

start = time.time()
sgd_test_pred = sgd_classifier.predict(X_test)
end = time.time()
sgd_test_time = end - start

In [10]:
# evaluation metrics
# We use the 'weighted' average suitable for multi-class, potentially imbalanced data.
accuracy = metrics.accuracy_score(y_val, sgd_val_pred)
precision, recall, f1_score, _ = metrics.precision_recall_fscore_support(
    y_val, sgd_val_pred, average='weighted', zero_division=0
)

# AUC is calculated in a One-vs-Rest fashion for multi-class problems.
# The true labels (y_val) must be binarized first.
lb = LabelBinarizer()
y_val_binarized = lb.fit_transform(y_val)

# We use the 'weighted' average to account for class support
auc_score = metrics.roc_auc_score(
    y_val_binarized,
    sgd_val_probas,
    average='weighted',
    multi_class='ovr'
)


# --- 4. Output Refactored Metrics ---
print(f"Results for SGD with image size {RESOLUTION} and {f"{CORRUPT_TYPE}, strength set to {CORRUPT_STRENGTH}" if APPLY_CORRUPTION else "no filters"}:\n")

# NOTE: mAP is not available for KNN (Classification)
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision (Weighted): {precision:.4f}")
print(f"Recall (Weighted): {recall:.4f}")
print(f"F1-Score (Weighted): {f1_score:.4f}")
print(f"Area Under the Curve (AUC): {auc_score:.4f}")

print(f"\nTraining Time (Total): {format_time(sgd_train_time)}")
print(f"Validation Time (Prediction): {format_time(sgd_val_time)}")

Results for SGD with image size 64 and no filters:

Accuracy: 0.1562
Precision (Weighted): 0.1513
Recall (Weighted): 0.1562
F1-Score (Weighted): 0.1481
Area Under the Curve (AUC): 0.6069

Training Time (Total): 14m 57.31s
Validation Time (Prediction): 0m 0.02s
