In [1]:
import os
import time
import math
from pathlib import Path
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
from PIL import Image
import joblib
import cv2

RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)
torch.manual_seed(RANDOM_STATE)


<torch._C.Generator at 0x7fe931b69dd0>

In [2]:
ROOT_DIR = "dataset/fruit360"
TEST_DIR = os.path.join(ROOT_DIR, "Test")
CKPT_ROOT = Path("artifacts/checkpoints")

SIZE = 32
COLOR_BINS = 16

class Fruit360FolderDataset(Dataset):
    def __init__(self, root_dir, transform=None, variety=False):
        self.root_dir = root_dir
        self.transform = transform
        self.variety = variety
        self.samples = []
        
        for class_name in sorted(os.listdir(root_dir)):
            class_dir = os.path.join(root_dir, class_name)
            if not os.path.isdir(class_dir):
                continue
            label = class_name if self.variety else class_name.split()[0]
            for img_name in os.listdir(class_dir):
                if img_name.lower().endswith(('.jpg', '.png')):
                    self.samples.append((os.path.join(class_dir, img_name), label))
        
        unique_labels = sorted({lbl for _, lbl in self.samples})
        self.label_to_idx = {lbl: i for i, lbl in enumerate(unique_labels)}
        self.idx_to_label = {i: lbl for i, lbl in self.label_to_idx.items()}
        
        print(f"{os.path.basename(root_dir)}: {len(self.samples)} images, {len(unique_labels)} classes")
    
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        img_path, label_str = self.samples[idx]
        img = Image.open(img_path).convert('RGB')
        if self.transform:
            img = self.transform(img)
        return img, label_str

transform_32 = T.Compose([
    T.Resize((SIZE, SIZE)),
    T.ToTensor(),
])

BATCH_SIZE = 100

test_dataset_32 = Fruit360FolderDataset(TEST_DIR, transform=transform_32, variety=False)
test_loader_32 = DataLoader(test_dataset_32, batch_size=BATCH_SIZE, shuffle=False)

print(f"Test loader 32x32: {len(test_dataset_32)} images, {len(test_loader_32)} batches")


Test: 43442 images, 79 classes
Test loader 32x32: 43442 images, 435 batches


In [3]:
def clamp_01(x):
    return torch.clamp(x, 0.0, 1.0)

def add_color_patches(x, num_patches, color, alpha_range=(0.4, 0.7), size_range=(0.05, 0.15)):
    _, H, W = x.shape
    out = x.clone()
    for _ in range(num_patches):
        s = np.random.uniform(size_range[0], size_range[1])
        patch_area = s * H * W / 4
        r = np.random.uniform(0.5, 1.5)
        patch_h = int(math.sqrt(patch_area / r))
        patch_w = int(math.sqrt(patch_area * r))
        patch_h = max(1, min(H, patch_h))
        patch_w = max(1, min(W, patch_w))
        top = np.random.randint(0, H - patch_h + 1)
        left = np.random.randint(0, W - patch_w + 1)
        bottom = top + patch_h
        right = left + patch_w
        alpha = np.random.uniform(alpha_range[0], alpha_range[1])
        patch = out[:, top:bottom, left:right]
        blended = alpha * color + (1 - alpha) * patch
        out[:, top:bottom, left:right] = blended
    return clamp_01(out)

def add_occlusion_patch(x, area_ratio=0.1, color=torch.tensor([0.5, 0.5, 0.5]).view(3,1,1), alpha=0.5):
    _, H, W = x.shape
    out = x.clone()
    patch_area = area_ratio * H * W
    r = np.random.uniform(0.5, 1.5)
    patch_h = int(math.sqrt(patch_area / r))
    patch_w = int(math.sqrt(patch_area * r))
    patch_h = max(1, min(H, patch_h))
    patch_w = max(1, min(W, patch_w))
    top = np.random.randint(0, H - patch_h + 1)
    left = np.random.randint(0, W - patch_w + 1)
    bottom = top + patch_h
    right = left + patch_w
    patch = out[:, top:bottom, left:right]
    blended = alpha * color + (1 - alpha) * patch
    out[:, top:bottom, left:right] = blended
    return clamp_01(out)

color_dirt = torch.tensor([0.3, 0.25, 0.2]).view(3,1,1)
color_bruise = torch.tensor([0.25, 0.2, 0.15]).view(3,1,1)

def noise_mild(x):
    return clamp_01(x + torch.randn_like(x) * 0.025)

def dark_mild(x):
    return clamp_01(x * 0.65)

def overexposed_mild(x):
    return clamp_01(x * 1.35)

def dirty_mild(x):
    return add_color_patches(x, num_patches=2, color=color_dirt, alpha_range=(0.5, 0.8), size_range=(0.03, 0.08))

def bruised_mild(x):
    return add_color_patches(x, num_patches=1, color=color_bruise, alpha_range=(0.4, 0.7), size_range=(0.03, 0.08))

def occlusion_small(x):
    return add_occlusion_patch(x, area_ratio=0.10, alpha=0.5)

blur_medium = T.GaussianBlur(kernel_size=5, sigma=1.0)

def scenario_A(x):
    x = blur_medium(x)
    x = noise_mild(x)
    if np.random.rand() < 0.7:
        x = dirty_mild(x)
    return x

def scenario_B(x):
    if np.random.rand() < 0.5:
        x = dark_mild(x)
    else:
        x = overexposed_mild(x)
    x = noise_mild(x)
    return x

def scenario_C(x):
    x = occlusion_small(x)
    if np.random.rand() < 0.5:
        x = bruised_mild(x)
    else:
        x = dirty_mild(x)
    return x

scenario_fns = {
    "scenario_A": scenario_A,
    "scenario_B": scenario_B,
    "scenario_C": scenario_C,
}

scenarios_mixed = {
    "clean": lambda x: x,
    "scenario_A": scenario_A,
    "scenario_B": scenario_B,
    "scenario_C": scenario_C,
}

print(list(scenarios_mixed.keys()))


['clean', 'scenario_A', 'scenario_B', 'scenario_C']


In [4]:
def color_hist_features(X_np, bins=COLOR_BINS, img_shape=(3, 32, 32)):
    n_samples = X_np.shape[0]
    feats = np.zeros((n_samples, 3 * bins), dtype=np.float32)
    bin_edges = np.linspace(0.0, 1.0, bins + 1)
    for i in range(n_samples):
        img = X_np[i].reshape(img_shape)
        img = np.transpose(img, (1, 2, 0))
        img = np.clip(img, 0.0, 1.0)
        img_hsv = (img * 255.0).astype(np.uint8)
        img_hsv = cv2.cvtColor(img_hsv, cv2.COLOR_RGB2HSV)
        h, s, v = cv2.split(img_hsv)
        hists = []
        for channel in (h, s, v):
            ch_norm = channel.astype(np.float32) / 255.0
            hist, _ = np.histogram(ch_norm.ravel(), bins=bin_edges, density=True)
            hists.append(hist)
        feats[i] = np.concatenate(hists)
    return feats

print("Color histogram feature function ready")


Color histogram feature function ready


In [5]:
# Load KNN/SVM checkpoints
ckpt_model_paths = sorted(CKPT_ROOT.rglob("model.joblib"))
if not ckpt_model_paths:
    raise FileNotFoundError(f"No checkpoints found under {CKPT_ROOT}")

def infer_model_name(run_dir: Path):
    parts = run_dir.parts
    return parts[-2] if len(parts) >= 2 else "unknown"

models = {}
for model_path in ckpt_model_paths:
    run_dir = model_path.parent
    scaler_path = run_dir / "scaler.joblib"
    model = joblib.load(model_path)
    scaler = joblib.load(scaler_path) if scaler_path.exists() else None
    model_name = infer_model_name(run_dir)
    
    if model_name not in models:
        models[model_name] = []
    
    models[model_name].append({
        "run_dir": str(run_dir),
        "model": model,
        "scaler": scaler,
    })

knn_checkpoints = models.get("knn", [])
svm_checkpoints = models.get("svm", [])
if not knn_checkpoints and not svm_checkpoints:
    raise FileNotFoundError("No KNN/SVM checkpoints found under artifacts/checkpoints")

print(f"Loaded {len(knn_checkpoints)} KNN checkpoint(s)")
print(f"Loaded {len(svm_checkpoints)} SVM checkpoint(s)")


Loaded 1 KNN checkpoint(s)
Loaded 1 SVM checkpoint(s)


In [6]:
from sklearn.metrics import classification_report

def evaluate_mixed_scenarios_checkpoint(test_loader, scenario_fns, probs, model_obj, seed=42):
    np.random.seed(seed)
    torch.manual_seed(seed)
    
    model = model_obj["model"]
    scaler = model_obj["scaler"]
    scenario_names = list(scenario_fns.keys())
    
    all_preds = []
    all_labels_idx = []
    scenario_counts = {name: 0 for name in scenario_names}
    
    start = time.time()
    
    for imgs, labels_str in test_loader:
        imgs_batch = []
        labels_idx_batch = []
        for img, lbl_str in zip(imgs, labels_str):
            r = np.random.rand()
            if r < probs[0]:
                scenario = scenario_names[0]      # 'clean'
            elif r < probs[0] + probs[1]:
                scenario = scenario_names[1]      # 'scenario_A'
            elif r < probs[0] + probs[1] + probs[2]:
                scenario = scenario_names[2]      # 'scenario_B'
            else:
                scenario = scenario_names[3]      # 'scenario_C'
            
            scenario_counts[scenario] += 1
            x = scenario_fns[scenario](img)
            imgs_batch.append(x.unsqueeze(0))
            labels_idx_batch.append(test_dataset_32.label_to_idx[lbl_str])
        
        imgs_batch = torch.cat(imgs_batch, dim=0)
        X = imgs_batch.numpy()
        feats = color_hist_features(X, bins=COLOR_BINS, img_shape=(3, SIZE, SIZE))
        if scaler is not None:
            feats = scaler.transform(feats)
        preds = model.predict(feats)
        
        all_preds.extend(preds)
        all_labels_idx.extend(labels_idx_batch)
    
    all_preds = np.array(all_preds)
    all_labels_idx = np.array(all_labels_idx)
    acc = (all_preds == all_labels_idx).mean()
    elapsed = time.time() - start
    
    return acc, elapsed, scenario_counts, all_labels_idx, all_preds

probs_distribution = [0.60, 0.15, 0.15, 0.10]

for model_name, ckpts in [("knn", knn_checkpoints), ("svm", svm_checkpoints)]:
    if not ckpts:
        continue
    for ckpt_idx, model_obj in enumerate(ckpts, 1):
        print(f"Testing {model_name.upper()} checkpoint [{ckpt_idx}/{len(ckpts)}]...")
        acc_mixed, time_mixed, counts_mixed, y_true_mixed, y_pred_mixed = evaluate_mixed_scenarios_checkpoint(
            test_loader_32,
            scenarios_mixed,
            probs_distribution,
            model_obj=model_obj,
            seed=RANDOM_STATE,
        )
        
        print(f"\nAccuracy on mixed test: {acc_mixed:.4f}")
        print(f"Time: {time_mixed:.2f}s")
        print(f"Scenario distribution (actual): {counts_mixed}")
        print(f"Checkpoint: {model_obj['run_dir']}")
        
        labels = sorted(int(i) for i in np.unique(y_true_mixed))
        target_names = [str(test_dataset_32.idx_to_label.get(i, i)) for i in labels]
        
        print("\nClassification report on mixed test (per class and averages):")
        print(classification_report(
            y_true_mixed,
            y_pred_mixed,
            labels=labels,
            target_names=target_names,
            digits=4,
            zero_division=0,
        ))


Testing KNN checkpoint [1/1]...

Accuracy on mixed test: 0.8183
Time: 25.45s
Scenario distribution (actual): {'clean': 26070, 'scenario_A': 6650, 'scenario_B': 6421, 'scenario_C': 4301}
Checkpoint: artifacts/checkpoints/fruit360/color_hist/knn/20260211-114401_efd2c1b4

Classification report on mixed test (per class and averages):
              precision    recall  f1-score   support

           0     1.0000    0.6883    0.8154        77
           1     0.8387    0.8592    0.8488      5506
           2     0.8383    0.8537    0.8459       164
           3     0.9385    0.6903    0.7955      1017
           4     0.6538    0.8434    0.7366       645
           5     0.9123    0.6753    0.7761        77
           6     0.9250    0.4933    0.6435       150
           7     1.0000    0.9172    0.9568       145
           8     0.9573    0.7100    0.8153       600
           9     1.0000    0.8182    0.9000       154
          10     0.4500    0.9375    0.6081        96
          11     0.