### AgroPest-12 -- Detection + Classification pipeline (traditional ML)

This object-oriented Python module implements:
- Feature extractors: HOG, SIFT (Bag-of-visual-words), LBP
- Classifier wrappers: SVM, RandomForest, KNN
- Detector: sliding-window multi-scale detector + NMS
- Training pipeline: crop annotated boxes to build training samples
- Test evaluation: compute per-class AP and mAP based on IoU
- Model persistence: saves 9 classifier models + SIFT codebook
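
The evaluation bullet above refers to per-class AP and mAP. As a minimal sketch of how AP can be computed for a single class, the hypothetical helper `average_precision` below takes the detection scores, one flag per detection saying whether it matched a previously unmatched ground-truth box (for example at IoU >= 0.5), and the number of ground-truth boxes. It uses all-point interpolation, which is an assumption; the module's own evaluation routine may differ in its details.

```python
def average_precision(scores, is_tp, num_gt):
    """All-point interpolated AP for one class (illustrative helper)."""
    if num_gt == 0 or not scores:
        return 0.0
    # Rank detections by descending confidence
    order = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    tp = fp = 0
    recalls, precisions = [], []
    for i in order:
        if is_tp[i]:
            tp += 1
        else:
            fp += 1
        recalls.append(tp / num_gt)
        precisions.append(tp / (tp + fp))
    # Precision envelope: make precision non-increasing as recall grows
    for k in range(len(precisions) - 2, -1, -1):
        precisions[k] = max(precisions[k], precisions[k + 1])
    # Sum the rectangle areas under the envelope
    ap, prev_recall = 0.0, 0.0
    for r, p in zip(recalls, precisions):
        ap += (r - prev_recall) * p
        prev_recall = r
    return ap


# Two toy classes; mAP is the mean of the per-class AP values
per_class_ap = [
    average_precision([0.9, 0.8, 0.7, 0.6], [True, False, True, False], num_gt=2),
    average_precision([0.95, 0.4], [True, True], num_gt=2),
]
mean_ap = sum(per_class_ap) / len(per_class_ap)
print(per_class_ap, mean_ap)
```

Here the first class has a false positive ranked between its two true positives, so its AP falls below 1, while the second class reaches an AP of 1.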

#### Requirements
Python >= 3.8
pip install opencv-contrib-python scikit-image scikit-learn joblib tqdm numpy pandas matplotlib seaborn pyyaml openpyxl

#### Notes
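- SIFT (cv2.SIFT_create) ships with the main OpenCV package from version 4.4.0 onward; opencv-contrib is still needed for Selective Search (cv2.ximgproc).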
- This module assumes dataset is organized as:
  dataset/
    train/
      images/
      labels/   # YOLO-like txt or custom CSV. See README below
    valid/
      images/
      labels/
    test/
      images/
      labels/

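Label format used by the pipeline: the code below reads YOLO-format .txt label files (one `class_id x_center y_center width height` row per box, normalized to [0, 1]) and converts each row to pixel coordinates, giving one record per bounding box with the fields:
image_path,x_min,y_min,x_max,y_max,class_id
The pipeline crops these boxes to build its training samples.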

This is a working, self-contained implementation. For large datasets, k-means training of the SIFT codebook can be slow, so `fit_codebook` fits on a random subsample of at most 100,000 descriptors.

In [24]:
# ================================================
# AgroPest-12 — Detection & Classification Framework
# Author: weichen wang
# Date: 2025
# Supports YOLO-format labels | automatic best-model selection | full metrics report
# ================================================
import os
import cv2
import numpy as np
from skimage.feature import hog, local_binary_pattern
from sklearn.cluster import MiniBatchKMeans
from sklearn.svm import SVC
from sklearn.svm import LinearSVC
from sklearn.calibration import CalibratedClassifierCV
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import precision_recall_curve

from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, roc_auc_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict
import pandas as pd

from joblib import dump, load
from tqdm import tqdm
import glob
import yaml
import math
import json
import time

####  Utility functions 

In [25]:
def ensure_dir(path):
    os.makedirs(path, exist_ok=True)

def iou(boxA, boxB):
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])
    interW = max(0, xB - xA + 1)
    interH = max(0, yB - yA + 1)
    interArea = interW * interH
    boxAArea = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1)
    boxBArea = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1)
    denom = boxAArea + boxBArea - interArea
    return interArea / denom if denom > 0 else 0.0

def non_max_suppression(boxes, scores, iou_thresh=0.4):
    if len(boxes) == 0:
        return []
    idxs = np.argsort(scores)[::-1]
    keep = []
    while len(idxs) > 0:
        i = idxs[0]
        keep.append(i)
        if len(idxs) == 1:
            break
        rest = idxs[1:]
        iou_scores = [iou(boxes[i], boxes[j]) for j in rest]
        idxs = np.array(rest)[np.array(iou_scores) <= iou_thresh]
    return keep
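
A small worked example may help to check the two utilities above. The `iou` function treats box coordinates as inclusive pixel indices, which is why every width and height gets `+ 1`. The sketch below re-implements the same logic in plain Python so that it runs on its own; the names `iou_inclusive` and `nms` are illustrative and not part of the module.

```python
def iou_inclusive(a, b):
    """IoU for [x1, y1, x2, y2] boxes whose coordinates are inclusive pixel indices."""
    inter_w = max(0, min(a[2], b[2]) - max(a[0], b[0]) + 1)
    inter_h = max(0, min(a[3], b[3]) - max(a[1], b[1]) + 1)
    inter = inter_w * inter_h
    area_a = (a[2] - a[0] + 1) * (a[3] - a[1] + 1)
    area_b = (b[2] - b[0] + 1) * (b[3] - b[1] + 1)
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0.0


def nms(boxes, scores, iou_thresh=0.4):
    """Greedy NMS: keep the best-scoring box, drop boxes overlapping it too much, repeat."""
    order = sorted(range(len(boxes)), key=lambda i: scores[i], reverse=True)
    keep = []
    while order:
        best = order.pop(0)
        keep.append(best)
        order = [j for j in order if iou_inclusive(boxes[best], boxes[j]) <= iou_thresh]
    return keep


boxes = [[0, 0, 9, 9], [1, 1, 10, 10], [50, 50, 59, 59]]
scores = [0.9, 0.8, 0.7]

overlap = iou_inclusive(boxes[0], boxes[1])  # 81 shared pixels / 119 pixels in the union
kept = nms(boxes, scores)                    # box 1 is suppressed by box 0
print(round(overlap, 4), kept)
```

The first two 10x10 boxes share a 9x9 region, so their IoU is 81/119 (about 0.68), above the 0.4 threshold; the third box does not overlap either and survives.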

#### Feature extractors

In [26]:
class FeatureExtractor:
    def __init__(self, size=(128, 128), mode='hog'):
        self.size = size
        self.mode = mode  # 'hog' or 'lbp'

    def extract_hog(self, img):
        if len(img.shape) == 3:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        img = cv2.resize(img, self.size)
        feat = hog(img, pixels_per_cell=(16, 16), cells_per_block=(2, 2), feature_vector=True)
        return feat

    def extract_lbp(self, img, P=8, R=1):
        if len(img.shape) == 3:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        img = cv2.resize(img, self.size)
        lbp = local_binary_pattern(img, P, R, method='uniform')
        n_bins = P + 2  # uniform LBP yields P + 2 distinct codes; a fixed bin count keeps the feature length constant
        hist, _ = np.histogram(lbp.ravel(), bins=n_bins, range=(0, n_bins), density=True)
        return hist

class SIFTBoWExtractor:
    def __init__(self, size=(128, 128), k=64):
        self.size = size
        self.k = k
        self.codebook = None
        self.sift = cv2.SIFT_create()

    def compute_descriptors(self, img):
        if len(img.shape) == 3:
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        else:
            gray = img
        gray = cv2.resize(gray, self.size)
        kp, des = self.sift.detectAndCompute(gray, None)
        return des

    def fit_codebook(self, descriptor_list, k=None, batch_size=2048, max_iter=300):
        if k is None:
            k = self.k
        all_desc = np.vstack(descriptor_list)
        n_samples = min(100000, len(all_desc))  # cap at 100k descriptors to avoid running out of memory
        idxs = np.random.choice(len(all_desc), n_samples, replace=False)
        subs = all_desc[idxs]
        kmeans = MiniBatchKMeans(n_clusters=k, batch_size=batch_size, max_iter=max_iter, random_state=42)
        kmeans.fit(subs)
        self.codebook = kmeans
        return kmeans

    def extract_bow(self, img):
        des = self.compute_descriptors(img)
        if des is None or self.codebook is None or len(des) == 0:
            return np.zeros(self.k)
        labels = self.codebook.predict(des)
        hist, _ = np.histogram(labels, bins=np.arange(self.k + 1), density=True)
        return hist
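
The feature lengths produced by the extractors above can be worked out by hand, which is useful when deciding whether a dimensionality-reduction step is needed. The sketch below is plain arithmetic and does not call scikit-image; it assumes the `hog` default of 9 orientation bins with blocks sliding one cell at a time, and the `P + 2` code count of uniform LBP. The helper names are hypothetical.

```python
def hog_length(image_size, pixels_per_cell, cells_per_block, orientations=9):
    """Length of a flattened HOG descriptor with a block stride of one cell."""
    cells_y = image_size[0] // pixels_per_cell[0]
    cells_x = image_size[1] // pixels_per_cell[1]
    blocks_y = cells_y - cells_per_block[0] + 1
    blocks_x = cells_x - cells_per_block[1] + 1
    per_block = cells_per_block[0] * cells_per_block[1] * orientations
    return blocks_y * blocks_x * per_block


def uniform_lbp_bins(P):
    """Uniform LBP maps each pixel to one of P + 2 codes (P + 1 uniform, 1 catch-all)."""
    return P + 2


hog_dims = hog_length((128, 128), (16, 16), (2, 2))  # 7 * 7 blocks * 36 values
lbp_dims = uniform_lbp_bins(8)
print(hog_dims, lbp_dims)
```

With the settings used here (128x128 crops, 16x16 cells, 2x2 blocks), HOG gives 1764 values, LBP with `P=8` gives 10, and the SIFT bag-of-words histogram has `k` values.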

####  Classifier wrappers 

In [27]:
class ClassifierWrapper:
    def __init__(self, model_type='svm'):
        self.model_type = model_type
        self.model = None
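        self.need_pca = False  # set to True when build() adds a PCA step (in practice, only for HOG)

    def build(self, X_sample=None):  # pass sample rows so build() can check the feature dimensionality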
        if self.model_type == 'svm':
            base_svc = LinearSVC(C=1.0, class_weight='balanced', max_iter=10000, dual=False)
            self.model = CalibratedClassifierCV(base_svc, method='sigmoid', cv=3, n_jobs=-1)

        elif self.model_type == 'rf':
            self.model = RandomForestClassifier(
                n_estimators=300, n_jobs=-1, class_weight='balanced',
                max_depth=20, min_samples_leaf=2, random_state=42
            )

        elif self.model_type == 'knn':
            self.model = KNeighborsClassifier(n_neighbors=7, n_jobs=-1, weights='distance')

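        # Add PCA only for high-dimensional features
        if X_sample is not None and self.model_type != 'knn':  # KNN is used without PCA
            if X_sample.shape[1] > 300:  # in this pipeline only HOG (1764 dims at 128x128) exceeds 300
                print(f"Applying PCA for high-dim feature ({X_sample.shape[1]} dims, keeping 99% of variance)")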
                pca = PCA(n_components=0.99, svd_solver='full', whiten=True, random_state=42)
                self.model = Pipeline([('pca', pca), ('clf', self.model)])
                self.need_pca = True

    def fit(self, X, y):
        if self.model is None:
            self.build(X_sample=X)
        print(f"Training {self.model_type.upper()} on {X.shape[0]} samples, {X.shape[1]} dims...", end='')
        self.model.fit(X, y)
        print("Done")

    def predict_proba(self, X):
        return self.model.predict_proba(X)

    def save(self, path):
        dump(self.model, path)

    def load(self, path):
        self.model = load(path)

####  Detectors

In [28]:
class SlidingWindowDetector:
    def __init__(self, fe, clf, window_size=(128, 128), step=24, scales=(0.5, 0.75, 1.0, 1.25, 1.5)):
        self.fe = fe
        self.clf = clf
        self.window_size = window_size
        self.step = step
        self.scales = scales

    def _extract(self, crop):
        if isinstance(self.fe, SIFTBoWExtractor):
            return self.fe.extract_bow(crop)
        elif self.fe.mode == 'hog':
            return self.fe.extract_hog(crop)
        elif self.fe.mode == 'lbp':
            return self.fe.extract_lbp(crop)

    def detect(self, image, score_thresh=0.65, nms_iou=0.4):
        boxes, scores, labels = [], [], []
        H, W = image.shape[:2]

        for scale in self.scales:
            resized = cv2.resize(image, (int(W * scale), int(H * scale)))
            h, w = resized.shape[:2]
            ws, hs = self.window_size

            for y in range(0, h - hs + 1, self.step):
                for x in range(0, w - ws + 1, self.step):
                    crop = resized[y:y+hs, x:x+ws]
                    feat = self._extract(crop)
                    if feat is None:
                        continue
                    prob = self.clf.predict_proba(feat.reshape(1, -1))[0]
                    score = prob.max()
                    if score >= score_thresh:
                        # Map the window back to original-image coordinates
                        orig_x = int(x / scale)
                        orig_y = int(y / scale)
                        orig_w = int(ws / scale)
                        orig_h = int(hs / scale)
                        boxes.append([orig_x, orig_y, orig_x + orig_w - 1, orig_y + orig_h - 1])
                        scores.append(score)
                        labels.append(int(prob.argmax()))

        if len(boxes) == 0:
            return [], [], []

        boxes = np.array(boxes)
        scores = np.array(scores)
        labels = np.array(labels)

        final_boxes, final_scores, final_labels = [], [], []
        for cls in np.unique(labels):
            idxs = np.where(labels == cls)[0]
            keep = non_max_suppression(boxes[idxs], scores[idxs], nms_iou)
            for k in keep:
                final_boxes.append(boxes[idxs[k]])
                final_scores.append(scores[idxs[k]])
                final_labels.append(cls)

        return final_boxes, final_scores, final_labels


class SelectiveSearchDetector:
    def __init__(self, fe, clf, max_proposals=1200, score_thresh=0.05, nms_iou=0.4):
        self.fe = fe
        self.clf = clf
        self.max_proposals = max_proposals      # key 1: cap the number of proposals
        self.score_thresh = score_thresh         # key 2: use a low score threshold
        self.nms_iou = nms_iou
        self.fixed_size = (224, 224)             # key 3: fixed input size

    def _extract_feat(self, crop):
        # Resize every crop to 224x224 first (the extractors then resize to their own `size`)
        if crop.shape[0] == 0 or crop.shape[1] == 0:
            return None
        crop = cv2.resize(crop, self.fixed_size, interpolation=cv2.INTER_LINEAR)

        if isinstance(self.fe, SIFTBoWExtractor):
            return self.fe.extract_bow(crop)
        elif self.fe.mode == 'hog':
            return self.fe.extract_hog(crop)
        else:
            return self.fe.extract_lbp(crop)

    def detect(self, image):
        # 1. Selective Search in fast mode
        ss = cv2.ximgproc.segmentation.createSelectiveSearchSegmentation()
        ss.setBaseImage(image)
        ss.switchToSelectiveSearchFast()          # fast mode keeps proposal generation tractable
        rects = ss.process()[:self.max_proposals] # keep only the first max_proposals rectangles

        # 2. Extract features per proposal, then classify them in one batch
        features = []
        boxes = []
        for (x, y, w, h) in rects:
            if min(w, h) < 20:   # discard proposals that are too small
                continue
            crop = image[y:y+h, x:x+w]
            feat = self._extract_feat(crop)
            if feat is not None:
                features.append(feat)
                boxes.append([int(x), int(y), int(x + w), int(y + h)])

        if len(features) == 0:
            return [], [], []

        features = np.array(features)
        probs = self.clf.predict_proba(features)
        scores = probs.max(axis=1)
        labels = probs.argmax(axis=1)

        # 3. Score thresholding + NMS
        keep = scores >= self.score_thresh
        if not keep.any():
            return [], [], []

        final_boxes = [boxes[i] for i in range(len(boxes)) if keep[i]]
        final_scores = scores[keep].tolist()
        final_labels = labels[keep].tolist()

        # NMS with OpenCV. cv2.dnn.NMSBoxes expects boxes as [x, y, w, h],
        # so convert from the [x1, y1, x2, y2] format stored above
        if len(final_boxes) > 0:
            xywh = [[int(b[0]), int(b[1]), int(b[2] - b[0]), int(b[3] - b[1])] for b in final_boxes]
            keep_idx = cv2.dnn.NMSBoxes(
                xywh, final_scores, score_threshold=0.0, nms_threshold=self.nms_iou
            )
            if len(keep_idx) > 0:
                keep_idx = np.array(keep_idx).flatten()
                final_boxes = [final_boxes[i] for i in keep_idx]
                final_scores = [final_scores[i] for i in keep_idx]
                final_labels = [final_labels[i] for i in keep_idx]

        return final_boxes, final_scores, final_labels

class MSERDetector:
    def __init__(self, fe, clf, min_area=500, max_area=50000, score_thresh=0.65, nms_iou=0.4):
        self.fe = fe
        self.clf = clf
        self.min_area = min_area
        self.max_area = max_area
        self.score_thresh = score_thresh
        self.nms_iou = nms_iou
        self.mser = cv2.MSER_create()

    def _extract(self, crop):
        if isinstance(self.fe, SIFTBoWExtractor):
            return self.fe.extract_bow(crop)
        elif self.fe.mode == 'hog':
            return self.fe.extract_hog(crop)
        else:
            return self.fe.extract_lbp(crop)

    def detect(self, image):
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        regions, _ = self.mser.detectRegions(gray)

        boxes, scores, labels = [], [], []

        for contour in regions:
            x, y, w, h = cv2.boundingRect(contour)
            if not (self.min_area <= w * h <= self.max_area):
                continue
            crop = image[y:y+h, x:x+w]
            feat = self._extract(crop)
            if feat is None:
                continue
            prob = self.clf.predict_proba(feat.reshape(1, -1))[0]
            score = prob.max()
            if score >= self.score_thresh:
                boxes.append([x, y, x+w-1, y+h-1])
                scores.append(score)
                labels.append(int(prob.argmax()))

        if len(boxes) == 0:
            return [], [], []

        boxes = np.array(boxes)
        scores = np.array(scores)
        labels = np.array(labels)

        final_boxes, final_scores, final_labels = [], [], []
        for cls in np.unique(labels):
            idxs = np.where(labels == cls)[0]
            keep = non_max_suppression(boxes[idxs], scores[idxs], self.nms_iou)
            for k in keep:
                final_boxes.append(boxes[idxs[k]])
                final_scores.append(scores[idxs[k]])
                final_labels.append(cls)

        return final_boxes, final_scores, final_labels
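
To make the cost of the sliding-window detector concrete, the sketch below enumerates the windows visited at one scale and maps each back to original-image coordinates, using the same arithmetic as `SlidingWindowDetector.detect`. It is plain Python with no image involved; the generator name `sliding_windows` and the 640x480 image size are assumptions made for illustration.

```python
def sliding_windows(image_w, image_h, window=(128, 128), step=24, scale=1.0):
    """Yield [x1, y1, x2, y2] boxes, in original-image coordinates, for one scale."""
    w, h = int(image_w * scale), int(image_h * scale)  # size of the resized image
    ws, hs = window
    for y in range(0, h - hs + 1, step):
        for x in range(0, w - ws + 1, step):
            # Dividing by the scale undoes the resize
            ox, oy = int(x / scale), int(y / scale)
            ow, oh = int(ws / scale), int(hs / scale)
            yield [ox, oy, ox + ow - 1, oy + oh - 1]


half = list(sliding_windows(640, 480, scale=0.5))  # 9 columns x 5 rows
full = list(sliding_windows(640, 480, scale=1.0))  # 22 columns x 15 rows
print(len(half), half[0], half[-1])
print(len(full))
```

At scale 0.5 a 128x128 window covers a 256x256 region of the original image, so small scales find large objects. Each window costs one feature extraction and one `predict_proba` call, so the window count per scale largely determines the runtime.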

#### Yolo Dataset 

In [29]:
class YOLODataset:
    """
    Reads the existing directory layout directly:
    AgroPest-12/
      ├── train/
      │   ├── images/
      │   └── labels/   (.txt)
      ├── valid/
      └── test/
    """
    def __init__(self, root_dir='AgroPest-12', split='train'):
        self.root_dir = root_dir
        self.split = split
        self.img_dir = os.path.join(root_dir, split, 'images')
        self.lbl_dir = os.path.join(root_dir, split, 'labels')
        
        # Read the class names and the class count
        yaml_path = os.path.join(root_dir, 'data.yaml')
        with open(yaml_path) as f:
            data = yaml.safe_load(f)
            self.names = data['names']
            self.num_classes = len(self.names)
        
        self.samples = self._load_samples()

    def _load_samples(self):
        """Return a list of tuples: (image_path, x1, y1, x2, y2, class_id)"""
        samples = []
        lbl_paths = sorted(glob.glob(os.path.join(self.lbl_dir, '*.txt')))
        
        for lbl_path in lbl_paths:
            img_name = os.path.splitext(os.path.basename(lbl_path))[0] + '.jpg'  # for .png images, use the line below instead
            # img_name = os.path.splitext(os.path.basename(lbl_path))[0] + '.png'
            img_path = os.path.join(self.img_dir, img_name)
            
            if not os.path.exists(img_path):
                continue
                
            img = cv2.imread(img_path)
            if img is None:
                continue
            h, w = img.shape[:2]
            
            with open(lbl_path) as f:
                for line in f:
                    parts = line.strip().split()
                    if len(parts) != 5:
                        continue
                    cls_id = int(parts[0])
                    x_center_norm, y_center_norm, bw_norm, bh_norm = map(float, parts[1:])
                    
                    x1 = int((x_center_norm - bw_norm / 2) * w)
                    y1 = int((y_center_norm - bh_norm / 2) * h)
                    x2 = int((x_center_norm + bw_norm / 2) * w)
                    y2 = int((y_center_norm + bh_norm / 2) * h)
                    
                    # Clamp to the image bounds
                    x1 = max(0, x1)
                    y1 = max(0, y1)
                    x2 = min(w - 1, x2)
                    y2 = min(h - 1, y2)
                    
                    samples.append((img_path, x1, y1, x2, y2, cls_id))
        return samples

    def get_image_list(self):
        return sorted(list(set([s[0] for s in self.samples])))
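
The coordinate conversion inside `_load_samples` can be tried in isolation. The sketch below applies the same arithmetic to a single label line; the function name `yolo_to_pixel_box` and the sample lines are made up for illustration.

```python
def yolo_to_pixel_box(line, img_w, img_h):
    """Convert 'class_id x_center y_center width height' (normalized) to pixel corners."""
    parts = line.strip().split()
    if len(parts) != 5:
        return None  # malformed rows are skipped, as in the loader
    cls_id = int(parts[0])
    xc, yc, bw, bh = map(float, parts[1:])
    # Center/size to corners, scaled to pixels and clamped to the image
    x1 = max(0, int((xc - bw / 2) * img_w))
    y1 = max(0, int((yc - bh / 2) * img_h))
    x2 = min(img_w - 1, int((xc + bw / 2) * img_w))
    y2 = min(img_h - 1, int((yc + bh / 2) * img_h))
    return x1, y1, x2, y2, cls_id


centered = yolo_to_pixel_box("3 0.5 0.5 0.25 0.5", 640, 480)
clamped = yolo_to_pixel_box("1 0.875 0.875 0.5 0.5", 64, 64)  # box extends past the border
print(centered, clamped)
```

The second box would extend to pixel 72 on a 64-pixel-wide image, so its right and bottom edges are clamped to 63.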

#### Pipeline

In [30]:
class MLDetectionPipeline:
    def __init__(self, num_classes=12, feature='hog', classifier='svm', sift_k=256):
        self.num_classes = num_classes
        self.feature = feature
        self.classifier_name = classifier
        self.sift_k = sift_k

        if feature in ['hog', 'lbp']:
            self.fe = FeatureExtractor(mode=feature)
        elif feature == 'sift':
            self.fe = SIFTBoWExtractor(k=sift_k)
        else:
            raise ValueError('Unknown feature')

        self.clf = ClassifierWrapper(classifier)
        self.clf.build()

    def build_training_data_from_annotations(self, data_root='AgroPest-12', split='train'):
        """
        Read training samples directly from YOLO-format labels.
        """
        ds = YOLODataset(root_dir=data_root, split=split)

        # Shared SIFT codebook (built only once)
        if self.feature == 'sift' and self.fe.codebook is None:
            codebook_path = os.path.join('models', 'sift_codebook.joblib')
            if os.path.exists(codebook_path):
                self.fe.codebook = load(codebook_path)
                print('Loaded shared SIFT codebook')
            else:
                print('Building shared SIFT codebook from training set...')
                descriptor_pool = []
                for img_path, x1, y1, x2, y2, _ in tqdm(ds.samples, desc='Collect SIFT desc'):
                    img = cv2.imread(img_path)
                    if img is None: continue
                    crop = img[y1:y2, x1:x2]
                    des = self.fe.compute_descriptors(crop)
                    if des is not None and len(des) > 0:
                        descriptor_pool.append(des)
                if descriptor_pool:
                    self.fe.fit_codebook(descriptor_pool)
                    ensure_dir(os.path.dirname(codebook_path))  # the 'models' directory may not exist yet
                    dump(self.fe.codebook, codebook_path)
                    print('Shared codebook saved')
                else:
                    print('Warning: No SIFT descriptors collected')

        # Extract features for every annotated box
        X, y = [], []
        for img_path, x1, y1, x2, y2, cls in tqdm(ds.samples, desc='Extracting features'):
            img = cv2.imread(img_path)
            if img is None: continue
            crop = img[y1:y2, x1:x2]
            if crop.size == 0: continue

            if self.feature == 'hog':
                feat = self.fe.extract_hog(crop)
            elif self.feature == 'lbp':
                feat = self.fe.extract_lbp(crop)
            elif self.feature == 'sift':
                feat = self.fe.extract_bow(crop)
            else:
                continue
            X.append(feat)
            y.append(cls)

        print(f'Extracted {len(X)} samples for {self.feature}+{self.classifier_name}')
        return np.array(X), np.array(y)

    # ==================== train(): optional validation-set evaluation ====================
    def train(self, X_train, y_train, X_val=None, y_val=None):
        """
        Train the classifier; if a validation set is given, report validation accuracy.
        """
        self.clf.build(X_sample=X_train)
        best_model = None   # defined up front so the no-validation branch cannot raise a NameError
        best_score = None

        if X_val is None or y_val is None:
            # No validation set: train directly
            print("No validation set, training on full data...")
            self.clf.fit(X_train, y_train)
        else:
            # Validation set available. Each classifier is trained only once here,
            # so the validation set is used for evaluation rather than model selection
            print("Training with validation set...")
            self.clf.fit(X_train, y_train)
            val_score = self.clf.model.score(X_val, y_val)  # classification accuracy
            best_score = val_score
            best_model = self.clf.model

            print(f"Validation accuracy: {val_score:.4f}")

        # Keep the best model
        self.clf.model = best_model if best_model is not None else self.clf.model
        if best_score is None:
            print("Training completed (no validation score).")
        else:
            print(f"Training completed. Best val acc: {best_score:.4f}")

    def save_models(self):
        ensure_dir('models_ml')  # change to your own output folder
        clf_path = os.path.join('models_ml', f'{self.feature}_{self.classifier_name}_clf.joblib')
        self.clf.save(clf_path)
        
        # Save the SIFT codebook to the same directory
        if self.feature == 'sift' and hasattr(self.fe, 'codebook') and self.fe.codebook is not None:
            codebook_path = os.path.join('models_ml', 'sift_codebook.joblib')
            dump(self.fe.codebook, codebook_path)
            print('SIFT codebook saved')

    def load_models(self):
        clf_path = os.path.join('models_ml', f'{self.feature}_{self.classifier_name}_clf.joblib')
        if os.path.exists(clf_path):
            self.clf.load(clf_path)
            print(f'Loaded classifier: {clf_path}')
        else:
            print(f'Warning: Model not found: {clf_path}')
            
        if self.feature == 'sift':
            codebook_path = os.path.join('models_ml', 'sift_codebook.joblib')
            if os.path.exists(codebook_path):
                self.fe.codebook = load(codebook_path)
                print('Loaded SIFT codebook')
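
The naming scheme in `save_models` and `load_models` means that training every feature/classifier pair leaves nine classifier files plus one shared codebook in `models_ml`. The short sketch below only builds the expected paths and does not touch the file system; the constants `FEATURES` and `CLASSIFIERS` and the helper `expected_model_paths` are illustrative names, not part of the module.

```python
import os
from itertools import product

FEATURES = ['hog', 'lbp', 'sift']
CLASSIFIERS = ['svm', 'rf', 'knn']


def expected_model_paths(model_dir='models_ml'):
    """List the files written after training all feature/classifier pairs."""
    paths = [
        os.path.join(model_dir, f'{feat}_{clf}_clf.joblib')
        for feat, clf in product(FEATURES, CLASSIFIERS)
    ]
    # The SIFT codebook is shared by all three SIFT-based classifiers
    paths.append(os.path.join(model_dir, 'sift_codebook.joblib'))
    return paths


paths = expected_model_paths()
for p in paths:
    print(p)
```

Checking these paths with `os.path.exists` before evaluation is one way to catch a missing model early, since `load_models` only prints a warning when a classifier file is absent.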

#### FullPipelineController

In [31]:
import os
import json
import yaml
import numpy as np
import pandas as pd
from tqdm import tqdm
import math

class FullPipelineController:
    def __init__(self, data_root='data/AgroPest12', model_dir='models_ml', results_dir='results_ml'):
        self.data_root = data_root
        self.model_dir = model_dir
        self.results_dir = results_dir
        ensure_dir(self.results_dir)

        yaml_path = os.path.join(data_root, 'data.yaml')
        with open(yaml_path, 'r', encoding='utf-8') as f:
            cfg = yaml.safe_load(f)
            self.num_classes = cfg['nc']
            self.class_names = cfg.get('names', [f'c{i}' for i in range(self.num_classes)])

        self.test_ds = YOLODataset(root_dir=data_root, split='test')
        self.gt = self._build_gt_dict()

    def _build_gt_dict(self):
        gt = {}
        for img_path, x1, y1, x2, y2, cls in self.test_ds.samples:
            gt.setdefault(img_path, []).append(([x1, y1, x2, y2], cls))
        return gt

    def get_pipeline(self, feature, classifier):
        p = MLDetectionPipeline(num_classes=self.num_classes, feature=feature, classifier=classifier)
        p.load_models()
        return p

    def get_detector(self, detector_name, pipeline):
        if detector_name == 'sliding':
            return SlidingWindowDetector(pipeline.fe, pipeline.clf.model)
        elif detector_name == 'selective':
            return SelectiveSearchDetector(pipeline.fe, pipeline.clf.model)
        elif detector_name == 'mser':
            return MSERDetector(pipeline.fe, pipeline.clf.model)
        else:
            raise ValueError(f'Unknown detector: {detector_name}')

    def run_detection(self, detector_name, feature, classifier):
        pipeline = self.get_pipeline(feature, classifier)
        detector = self.get_detector(detector_name, pipeline)

        detections = {}
        timings = {}
        img_list = sorted(self.gt.keys())

        for img_path in tqdm(img_list, desc=f'{feature}_{classifier}_{detector_name}', leave=False):
            t0 = time.time()
            img = cv2.imread(img_path)
            if img is None: continue
            boxes, scores, labels = detector.detect(img)
            t1 = time.time()
            detections[img_path] = [(list(map(int, b)), float(s), int(l)) for b, s, l in zip(boxes, scores, labels)]
            timings[img_path] = t1 - t0

        return detections, timings

    def save_results(self, key, detections, timings, mAP):
        # Save only mAP and timing statistics (kept minimal)
        result = {
            'mAP@0.5': round(float(mAP), 4),
            'avg_inference_time(s)': round(np.mean(list(timings.values())), 4),
            'total_images': len(timings),
            'total_time(s)': round(sum(timings.values()), 2),
            'FPS': round(1 / np.mean(list(timings.values())), 2)
        }
        # Save the compact summary
        with open(os.path.join(self.results_dir, f'{key}_result.json'), 'w') as f:
            json.dump(result, f, indent=2)
        # Also save the raw detections (optional, used later for visualization)
        with open(os.path.join(self.results_dir, f'{key}_detections.json'), 'w') as f:
            json.dump({'detections': detections, 'timings': timings}, f)

    # ==================== Main entry point: supports two modes ====================
    def run(self, run_best_only=True,
            best_feature='sift', best_classifier='rf', best_detector='selective'):
        """
        run_best_only=True  → evaluate only the best model
        run_best_only=False → evaluate all 27 combinations (for ablation experiments)
        """
        if run_best_only:
            print("Mode: evaluate the best model only")
            key = f'{best_feature}_{best_classifier}_{best_detector}'
            result_file = os.path.join(self.results_dir, f'{key}_result.json')
            if os.path.exists(result_file):
                with open(result_file) as f:
                    data = json.load(f)
                print(f"Already exists → {key} | mAP@0.5 = {data['mAP@0.5']} | FPS = {data['FPS']}")
                return

            print(f"Evaluating the best model: {key}")
            detections, timings = self.run_detection(best_detector, best_feature, best_classifier)
            result_dict = evaluate_map(detections, self.gt, self.num_classes)
            mAP = result_dict['mAP@0.5']
            self.save_results(key, detections, timings, mAP)
            print(f"Done! Best model mAP@0.5 = {mAP:.4f} | FPS = {1/np.mean(list(timings.values())):.2f}")

        else:
            print("Mode: evaluate all 27 combinations (ablation study)")
            features = ['hog', 'lbp', 'sift']
            classifiers = ['svm', 'rf', 'knn']
            detectors = ['mser', 'sliding', 'selective']
            results = []

            for det in detectors:
                for feat in features:
                    for clf in classifiers:
                        key = f'{feat}_{clf}_{det}'
                        result_file = os.path.join(self.results_dir, f'{key}_result.json')
                        if os.path.exists(result_file):
                            with open(result_file) as f:
                                data = json.load(f)
                            results.append({'model': key, 'mAP@0.5': data['mAP@0.5'], 'FPS': data['FPS']})
                            print(f"Already done: {key} → mAP@0.5 = {data['mAP@0.5']}")
                            continue

                        print(f"Running → {key}")
                        try:
                            detections, timings = self.run_detection(det, feat, clf)
                            result_dict = evaluate_map(detections, self.gt, self.num_classes)
                            mAP = result_dict['mAP@0.5']
                            self.save_results(key, detections, timings, mAP)
                            fps = 1 / np.mean(list(timings.values()))
                            results.append({'model': key, 'mAP@0.5': round(mAP, 4), 'FPS': round(fps, 2)})
                        except Exception as e:
                            print(f"Error {key}: {e}")

            # Save the summary table
            df = pd.DataFrame(results).sort_values('mAP@0.5', ascending=False)
            df.to_excel(os.path.join(self.results_dir, 'ALL_RESULTS.xlsx'), index=False)
            df.to_csv(os.path.join(self.results_dir, 'ALL_RESULTS.csv'), index=False)
            print("All done! See ALL_RESULTS.xlsx")
            
    def generate_final_visualizations(self, 
                                      feature='sift', 
                                      classifier='rf', 
                                      detector_name='selective',
                                      num_success=6, 
                                      num_failure=6):
        """
        1. Confusion matrix (saved as PNG)
        2. 6 success cases (TP)
        3. 6 failure cases (FN + FP)
        """
        import matplotlib.pyplot as plt
        import random
        import cv2
        
        key = f'{feature}_{classifier}_{detector_name}'
        det_path = os.path.join(self.results_dir, f'{key}_detections.json')
        if not os.path.exists(det_path):
            print(f"Detection results not found: {det_path}; call run() first")
            return

        with open(det_path, 'r') as f:
            data = json.load(f)
            detections = data['detections']

        # 1. Build the confusion matrix
        print("Building confusion matrix...")
        all_gt_labels = []
        all_pred_labels = []
        matched_gt = set()

        for img_path in detections:
            if img_path not in self.gt:
                continue
            pred_boxes = detections[img_path]
            gt_boxes = self.gt[img_path]

            # Simple matching: pair each prediction with its highest-IoU GT box (IoU > 0.5)
            for pred_box, score, pred_label in pred_boxes:
                best_iou = 0
                best_gt_label = -1
                best_gt_box = None
                for gt_box, gt_label in gt_boxes:
                    iou_val = self._compute_iou(pred_box, gt_box)
                    if iou_val > best_iou and iou_val > 0.5:
                        best_iou = iou_val
                        best_gt_label = gt_label
                        best_gt_box = gt_box
                if best_gt_label != -1:
                    all_gt_labels.append(best_gt_label)
                    all_pred_labels.append(pred_label)
                    # Record the box that was actually matched, not the last one iterated
                    matched_gt.add((img_path, tuple(best_gt_box)))

        if len(all_gt_labels) > 0:
            from sklearn.metrics import confusion_matrix
            cm = confusion_matrix(all_gt_labels, all_pred_labels, labels=list(range(self.num_classes)))
            plt.figure(figsize=(10, 8))
            import seaborn as sns
            sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                        xticklabels=self.class_names, yticklabels=self.class_names)
            plt.title(f'Confusion Matrix - {key}')
            plt.ylabel('Ground Truth')
            plt.xlabel('Prediction')
            plt.tight_layout()
            plt.savefig(os.path.join(self.results_dir, f'{key}_confusion_matrix.png'), dpi=300)
            plt.close()
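            print(f"Confusion matrix saved: {key}_confusion_matrix.png")
        else:
            print("No matched boxes; cannot build a confusion matrix")

        # 2. Visualize success and failure cases
        print("Selecting success and failure cases...")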
        success_cases = []
        failure_cases = []

        img_paths = list(self.gt.keys())
        random.shuffle(img_paths)

        for img_path in img_paths:
            if len(success_cases) >= num_success and len(failure_cases) >= num_failure:
                break

            img = cv2.imread(img_path)
            if img is None: continue
            h, w = img.shape[:2]

            gt_boxes = [b for b, c in self.gt[img_path]]
            gt_labels = [c for b, c in self.gt[img_path]]

            pred_items = detections.get(img_path, [])
            pred_boxes = [b for b, s, l in pred_items]
            pred_labels = [l for b, s, l in pred_items]

            # Count TP, FN, FP
            tp = 0
            matched = set()
            for i, pred_box in enumerate(pred_boxes):
                for j, gt_box in enumerate(gt_boxes):
                    if self._compute_iou(pred_box, gt_box) > 0.5 and pred_labels[i] == gt_labels[j]:
                        tp += 1
                        matched.add(j)
                        break

            fn = len(gt_boxes) - len(matched)
            fp = len(pred_boxes) - tp

            # Drawing helper: draws boxes on `canvas` in place and returns it
            def draw_boxes(canvas, boxes, labels, color):
                for box, label in zip(boxes, labels):
                    x1, y1, x2, y2 = map(int, box)
                    cv2.rectangle(canvas, (x1, y1), (x2, y2), color, 2)
                    cv2.putText(canvas, self.class_names[label], (x1, y1-10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
                return canvas

            if tp >= 2 and len(success_cases) < num_success:
                # GT in green and predictions in blue (BGR), drawn on the same copy
                vis = draw_boxes(img.copy(), gt_boxes, gt_labels, (0, 255, 0))
                vis = draw_boxes(vis, pred_boxes, pred_labels, (255, 0, 0))
                cv2.putText(vis, "SUCCESS", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 3)
                success_cases.append((vis, img_path))

            if (fn >= 1 or fp >= 2) and len(failure_cases) < num_failure:
                vis = draw_boxes(img.copy(), gt_boxes, gt_labels, (0, 255, 0))
                vis = draw_boxes(vis, pred_boxes, pred_labels, (255, 0, 0))
                cv2.putText(vis, "FAILURE", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)
                failure_cases.append((vis, img_path))

        # Save images
        vis_dir = os.path.join(self.results_dir, 'visualizations')
        ensure_dir(vis_dir)
        for i, (img_vis, path) in enumerate(success_cases):
            cv2.imwrite(os.path.join(vis_dir, f'success_{i+1}_{os.path.basename(path)}'), img_vis)
        for i, (img_vis, path) in enumerate(failure_cases):
            cv2.imwrite(os.path.join(vis_dir, f'failure_{i+1}_{os.path.basename(path)}'), img_vis)

        print(f"Visualization complete: {len(success_cases)} success cases + {len(failure_cases)} failure cases")
        print(f"All images saved to: {vis_dir}")

    def _compute_iou(self, box1, box2):
        x1, y1, x2, y2 = box1
        x1g, y1g, x2g, y2g = box2
        xi1 = max(x1, x1g)
        yi1 = max(y1, y1g)
        xi2 = min(x2, x2g)
        yi2 = min(y2, y2g)
        if xi2 <= xi1 or yi2 <= yi1:
            return 0.0
        inter_area = (xi2 - xi1) * (yi2 - yi1)
        box1_area = (x2 - x1) * (y2 - y1)
        box2_area = (x2g - x1g) * (y2g - y1g)
        union_area = box1_area + box2_area - inter_area
        return inter_area / union_area if union_area > 0 else 0.0
    
    def quick_test_accuracy(self, feature='sift', classifier='rf', detector_name='selective'):
        """Quickly report classification accuracy from an existing model's saved detections."""
        key = f'{feature}_{classifier}_{detector_name}'
        det_path = os.path.join(self.results_dir, f'{key}_detections.json')

        if not os.path.exists(det_path):
            print(f"Detection results not found; run detection first: {det_path}")
            return

        # Load saved detections
        with open(det_path, 'r') as f:
            data = json.load(f)
            detections = data['detections']

        # Compute accuracy
        accuracy_results = self.calculate_accuracy(detections)

        print(f"=== {key} accuracy analysis ===")
        print(f"Overall accuracy: {accuracy_results['overall_accuracy']:.4f}")
        print(f"Total matched detections: {accuracy_results['total_matched_detections']}")
        print(f"Correctly classified: {accuracy_results['correct_predictions']}")
        print("\nPer-class accuracy:")
        for class_name, acc in accuracy_results['class_accuracy'].items():
            print(f"  {class_name}: {acc:.4f}")

    def calculate_accuracy(self, detections, iou_threshold=0.5):
        """
        Compute classification accuracy.
        For every matched detection (IoU >= threshold), check whether the predicted class is correct.
        """
        correct_predictions = 0
        total_matched_detections = 0
        all_predictions = []
        all_true_labels = []

        for img_path in detections:
            if img_path not in self.gt:
                continue

            pred_items = detections[img_path]  # [(box, score, label), ...]
            gt_items = self.gt[img_path]       # [(box, class), ...]

            # Flags marking which ground-truth boxes are already matched
            gt_matched = [False] * len(gt_items)

            # For each predicted box, find the best-overlapping unmatched ground-truth box
            for pred_box, score, pred_label in pred_items:
                best_iou = 0
                best_gt_idx = -1

                for gt_idx, (gt_box, gt_label) in enumerate(gt_items):
                    if gt_matched[gt_idx]:
                        continue

                    iou = self._compute_iou(pred_box, gt_box)
                    if iou > best_iou:
                        best_iou = iou
                        best_gt_idx = gt_idx

                # Accept the match only if the IoU reaches the threshold
                if best_gt_idx != -1 and best_iou >= iou_threshold:
                    gt_matched[best_gt_idx] = True
                    gt_label = gt_items[best_gt_idx][1]

                    # Record predicted and true labels
                    all_predictions.append(pred_label)
                    all_true_labels.append(gt_label)

                    # Check whether the classification is correct
                    if pred_label == gt_label:
                        correct_predictions += 1
                    total_matched_detections += 1

        # Overall accuracy over matched detections
        accuracy = correct_predictions / total_matched_detections if total_matched_detections > 0 else 0

        # Per-class accuracy
        class_correct = [0] * self.num_classes
        class_total = [0] * self.num_classes
        
        for pred, true in zip(all_predictions, all_true_labels):
            class_total[true] += 1
            if pred == true:
                class_correct[true] += 1
        
        class_accuracy = {}
        for i in range(self.num_classes):
            if class_total[i] > 0:
                class_accuracy[self.class_names[i]] = class_correct[i] / class_total[i]
            else:
                class_accuracy[self.class_names[i]] = 0
        
        return {
            'overall_accuracy': round(accuracy, 4),
            'total_matched_detections': total_matched_detections,
            'correct_predictions': correct_predictions,
            'class_accuracy': class_accuracy,
            'all_predictions': all_predictions,
            'all_true_labels': all_true_labels
        }

    def save_results_with_accuracy(self, key, detections, timings, mAP, accuracy_results):
        """Save results, including accuracy information."""
        result = {
            'mAP@0.5': round(float(mAP), 4),
            'accuracy@0.5': accuracy_results['overall_accuracy'],
            'avg_inference_time(s)': round(np.mean(list(timings.values())), 4),
            'total_images': len(timings),
            'total_time(s)': round(sum(timings.values()), 2),
            'FPS': round(1 / np.mean(list(timings.values())), 2),
            'total_matched_detections': accuracy_results['total_matched_detections'],
            'correct_predictions': accuracy_results['correct_predictions'],
            'class_accuracy': accuracy_results['class_accuracy']
        }
        
        # Save the summary results
        with open(os.path.join(self.results_dir, f'{key}_result.json'), 'w') as f:
            json.dump(result, f, indent=2)
        
        # Save detailed detections (for later analysis)
        with open(os.path.join(self.results_dir, f'{key}_detections.json'), 'w') as f:
            json.dump({
                'detections': detections, 
                'timings': timings,
                'accuracy_details': {
                    'all_predictions': accuracy_results['all_predictions'],
                    'all_true_labels': accuracy_results['all_true_labels']
                }
            }, f)
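
The IoU test and the one-to-one matching used by `calculate_accuracy` can be checked on toy boxes. The following is a minimal standalone sketch, not the class's own code: the names `box_iou` and `greedy_match` and all box coordinates are made up for illustration, and boxes are assumed to be `(x_min, y_min, x_max, y_max)` tuples.

```python
def box_iou(a, b):
    """IoU of two boxes given as (x_min, y_min, x_max, y_max)."""
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    if ix2 <= ix1 or iy2 <= iy1:
        return 0.0  # no overlap
    inter = (ix2 - ix1) * (iy2 - iy1)
    union = (a[2] - a[0]) * (a[3] - a[1]) + (b[2] - b[0]) * (b[3] - b[1]) - inter
    return inter / union if union > 0 else 0.0


def greedy_match(pred_boxes, gt_boxes, iou_threshold=0.5):
    """Return (pred_idx, gt_idx) pairs; each ground-truth box is used at most once."""
    used = set()
    pairs = []
    for p_idx, p_box in enumerate(pred_boxes):
        best_iou, best_gt = 0.0, -1
        for g_idx, g_box in enumerate(gt_boxes):
            if g_idx in used:
                continue  # this ground-truth box is already taken
            value = box_iou(p_box, g_box)
            if value > best_iou:
                best_iou, best_gt = value, g_idx
        if best_gt != -1 and best_iou >= iou_threshold:
            used.add(best_gt)
            pairs.append((p_idx, best_gt))
    return pairs


# Toy data (hypothetical): two ground-truth boxes, three predictions
gt = [(0, 0, 10, 10), (20, 20, 30, 30)]
preds = [(1, 1, 10, 10), (0, 0, 9, 9), (5, 5, 15, 15)]

print(box_iou((0, 0, 10, 10), (5, 5, 15, 15)))  # 25 / 175
pairs = greedy_match(preds, gt)
print(pairs)  # only the first prediction finds a free ground-truth box
```

The second prediction overlaps the first ground-truth box well, but that box is already taken, so it stays unmatched. Predictions are visited in list order here, so the result depends on how the detections are sorted.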


#### mAP Evaluation
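
As a sanity check on the AP integration used in this section, the sketch below applies the same all-point interpolation as `compute_ap` to a three-detection toy curve. The function name `voc_ap` and the recall/precision values are assumptions made for illustration; they are not AgroPest-12 results. The toy case has two ground-truth boxes and ranked detections TP, FP, TP.

```python
import numpy as np


def voc_ap(recall, precision):
    """Area under the precision envelope of a precision-recall curve."""
    mrec = np.concatenate(([0.0], recall, [1.0]))
    mpre = np.concatenate(([0.0], precision, [0.0]))
    # Make precision non-increasing from right to left (the envelope)
    for i in range(len(mpre) - 1, 0, -1):
        mpre[i - 1] = max(mpre[i - 1], mpre[i])
    # Sum rectangle areas wherever recall changes
    idx = np.where(mrec[1:] != mrec[:-1])[0]
    return float(np.sum((mrec[idx + 1] - mrec[idx]) * mpre[idx + 1]))


# Ranked detections TP, FP, TP against 2 ground-truth boxes
recall = np.array([0.5, 0.5, 1.0])
precision = np.array([1.0, 0.5, 2.0 / 3.0])

ap = voc_ap(recall, precision)
print(f"AP = {ap:.4f}")  # 0.5 * 1.0 + 0.5 * (2/3) = 0.8333
```

The envelope step lifts the precision at recall 0.5 from 0.5 to 2/3, which is why the false positive in the middle of the ranking costs less than it would under a raw trapezoidal sum.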

In [32]:
def compute_ap(recall, precision):
    # VOC 2010+ style AP: area under the interpolated precision-recall curve
    # (all-point interpolation, not the older 11-point variant).
    # First make precision monotonically non-increasing (the precision envelope).
    mrec = np.concatenate(([0.], recall, [1.]))
    mpre = np.concatenate(([0.], precision, [0.]))
    for i in range(len(mpre)-1, 0, -1):
        mpre[i-1] = max(mpre[i-1], mpre[i])
    # area under curve
    idx = np.where(mrec[1:] != mrec[:-1])[0]
    ap = np.sum((mrec[idx+1] - mrec[idx]) * mpre[idx+1])
    return ap


import math
import pandas as pd
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
from collections import defaultdict

def evaluate_map(detections, ground_truths, num_classes, iou_thresh=0.5):
    APs = []
    for cls in range(num_classes):
        det_list = []
        gt_count = 0
        gt_by_image = {}
        for img, gts in ground_truths.items():
            gboxes = [b for b, c in gts if c == cls]
            gt_count += len(gboxes)
            gt_by_image[img] = {'boxes': gboxes, 'det': np.zeros(len(gboxes), dtype=bool)}
        
        for img, dets in detections.items():
            for box, score, c in dets:
                if c == cls:
                    det_list.append((img, score, box))

        if gt_count == 0:
            APs.append(float('nan'))
            continue

        det_list = sorted(det_list, key=lambda x: x[1], reverse=True)
        tp = np.zeros(len(det_list))
        fp = np.zeros(len(det_list))

        for i, (img, score, box) in enumerate(det_list):
            ginfo = gt_by_image.get(img, {'boxes': [], 'det': np.array([])})
            if len(ginfo['boxes']) == 0:
                fp[i] = 1
                continue
            best_iou = 0.0
            best_idx = -1
            for j, gbox in enumerate(ginfo['boxes']):
                _iou = iou(box, gbox)
                if _iou > best_iou:
                    best_iou = _iou
                    best_idx = j
            if best_iou >= iou_thresh and best_idx >= 0:
                if not ginfo['det'][best_idx]:
                    tp[i] = 1
                    ginfo['det'][best_idx] = True
                else:
                    fp[i] = 1
            else:
                fp[i] = 1

        fp_cum = np.cumsum(fp)
        tp_cum = np.cumsum(tp)
        recall = tp_cum / float(gt_count)
        precision = tp_cum / np.maximum(tp_cum + fp_cum, np.finfo(np.float64).eps)
        ap = compute_ap(recall, precision)
        APs.append(ap)

    valid_aps = [a for a in APs if not math.isnan(a)]
    mAP = np.mean(valid_aps) if valid_aps else 0.0

    return {
        'APs': APs,
        'mAP': mAP,
        'mAP@0.5': mAP,
        'mAP@0.5:0.95': mAP  # placeholder: same value as mAP, not averaged over IoU 0.5:0.95
    }

def compute_detection_metrics(detections, ground_truths, num_classes, iou_thresh=0.5):
    """
    Input: detections and ground-truth dicts.
    Output: macro-averaged metrics plus per-class TP/FP/FN counts.
    """
    y_true = []
    y_pred = []
    y_scores = []          # confidence scores, used for AUC
    all_tp = defaultdict(int)
    all_fp = defaultdict(int)
    all_fn = defaultdict(int)

    # Track, for each image, which ground-truth boxes have been matched
    gt_used = {}
    for img, gts in ground_truths.items():
        gt_used[img] = {i: False for i in range(len(gts))}

    # Collect all detections, sorted by descending confidence
    all_dets = []
    for img, dets in detections.items():
        for box, score, cls in dets:
            all_dets.append((img, score, box, cls))
    all_dets.sort(key=lambda x: x[1], reverse=True)

    for img, score, box, pred_cls in all_dets:
        if img not in ground_truths:
            y_true.append(-1)           # background
            y_pred.append(pred_cls)
            y_scores.append(score)
            all_fp[pred_cls] += 1
            continue

        best_iou = 0
        best_idx = -1
        best_gt_cls = -1
        for idx, (gt_box, gt_cls) in enumerate(ground_truths[img]):
            if gt_used[img][idx]:
                continue
            _iou = iou(box, gt_box)
            if _iou > best_iou:
                best_iou = _iou
                best_idx = idx
                best_gt_cls = gt_cls

        if best_iou >= iou_thresh and pred_cls == best_gt_cls:
            # TP
            y_true.append(pred_cls)
            y_pred.append(pred_cls)
            y_scores.append(score)
            all_tp[pred_cls] += 1
            gt_used[img][best_idx] = True
        else:
            # FP
            y_true.append(-1)
            y_pred.append(pred_cls)
            y_scores.append(score)
            all_fp[pred_cls] += 1

    # Count missed ground truths (FN)
    for img, used_dict in gt_used.items():
        for idx, used in used_dict.items():
            if not used:
                gt_cls = ground_truths[img][idx][1]
                all_fn[gt_cls] += 1
                y_true.append(gt_cls)
                y_pred.append(-1)     # nothing was predicted for this box
                y_scores.append(0.0)  # keep y_scores aligned with y_true / y_pred

    # Compute metrics, ignoring background entries (y_true == -1)
    valid = np.array(y_true) >= 0
    if valid.sum() == 0:
        return {k: 0.0 for k in ['precision','recall','f1','accuracy','auc']}, all_tp, all_fp, all_fn

    yt = np.array(y_true)[valid]
    yp = np.array(y_pred)[valid]
    ys = np.array(y_scores)[valid]

    # One confidence value per box is not a per-class probability matrix, so the
    # multiclass one-vs-rest AUC can be undefined; report NaN when sklearn rejects it.
    try:
        auc = roc_auc_score(yt, ys, average='macro', multi_class='ovr')
    except ValueError:
        auc = float('nan')

    metrics = {
        'precision': precision_score(yt, yp, average='macro', zero_division=0),
        'recall'   : recall_score(yt, yp, average='macro', zero_division=0),
        'f1'       : f1_score(yt, yp, average='macro', zero_division=0),
        'accuracy' : accuracy_score(yt, yp),
        'auc'      : auc
    }
    return metrics, all_tp, all_fp, all_fn


def save_confusion_matrix(detections, ground_truths, num_classes, class_names, save_path, prefix):
    y_true = []
    y_pred = []

    # Track which ground-truth boxes have already been matched
    matched_gt = {}
    for img in ground_truths:
        matched_gt[img] = [False] * len(ground_truths[img])

    # Visit all predictions in descending score order
    all_dets = []
    for img, dets in detections.items():
        for box, score, cls in dets:
            all_dets.append((score, img, box, cls))
    all_dets.sort(key=lambda d: d[0], reverse=True)

    for score, img, box, pred_cls in all_dets:
        if img not in ground_truths:
            continue
        best_iou = 0
        best_j = -1
        best_gt_cls = -1
        for j, (gt_box, gt_cls) in enumerate(ground_truths[img]):
            if matched_gt[img][j]:
                continue
            _iou = iou(box, gt_box)
            if _iou > best_iou:
                best_iou = _iou
                best_j = j
                best_gt_cls = gt_cls

        # Match on IoU alone so that misclassified boxes appear off the diagonal
        if best_iou >= 0.5:
            y_true.append(best_gt_cls)
            y_pred.append(pred_cls)
            matched_gt[img][best_j] = True
        # Unmatched predictions (FP) are left out of the confusion matrix

    # Missed ground truths (FN) have no predicted class, so they are left out too
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    if len(y_true) == 0:
        print(f"{prefix} has no matched samples; skipping confusion matrix")
        return

    cm = confusion_matrix(y_true, y_pred, labels=list(range(num_classes)))
    cm_df = pd.DataFrame(cm, index=class_names, columns=class_names)

    # Save as CSV
    cm_df.to_csv(f"{save_path}_{prefix}_cm.csv")

    # Plot the heatmap
    plt.figure(figsize=(12, 10))
    sns.heatmap(cm_df, annot=True, fmt='d', cmap='Blues', cbar=True,
                xticklabels=class_names, yticklabels=class_names, linewidths=.5)
    plt.title(f'Confusion Matrix - {prefix}\n(IoU≥0.5 matched only, {len(y_true)} samples)')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.xticks(rotation=45, ha='right')
    plt.yticks(rotation=0)
    plt.tight_layout()
    plt.savefig(f"{save_path}_{prefix}_cm.png", dpi=300)
    plt.close()
    print(f"{prefix} confusion matrix saved ({len(y_true)} matched samples)")
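
The `mAP@0.5:0.95` entry returned by `evaluate_map` is a placeholder that repeats the single-threshold mAP. One possible way to fill it in, sketched below under stated assumptions, is to call an evaluation function at the ten IoU thresholds 0.50, 0.55, ..., 0.95 and average the results. The helper name `map_over_iou_thresholds` is hypothetical, and `eval_fn` is assumed to accept an `iou_thresh` keyword and return a dict with an `'mAP'` key, as `evaluate_map` does. The stub `fake_eval` only stands in for a real evaluator so the example runs on its own.

```python
def map_over_iou_thresholds(eval_fn, detections, ground_truths, num_classes):
    """Average eval_fn(...)['mAP'] over IoU thresholds 0.50, 0.55, ..., 0.95."""
    thresholds = [round(0.5 + 0.05 * i, 2) for i in range(10)]
    per_threshold = {
        t: float(eval_fn(detections, ground_truths, num_classes, iou_thresh=t)['mAP'])
        for t in thresholds
    }
    mean_map = sum(per_threshold.values()) / len(per_threshold)
    return mean_map, per_threshold


# Stub evaluator (hypothetical): pretends mAP falls linearly as the IoU threshold rises
def fake_eval(detections, ground_truths, num_classes, iou_thresh=0.5):
    return {'mAP': 1.0 - iou_thresh}


mean_map, per_threshold = map_over_iou_thresholds(fake_eval, {}, {}, num_classes=12)
print(f"mAP@0.5:0.95 = {mean_map:.3f}")  # mean of 0.50, 0.45, ..., 0.05 = 0.275
```

In the notebook this would be called with `evaluate_map` in place of `fake_eval`. Each threshold re-runs the full matching, so the cost is roughly ten times that of a single evaluation.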

#### Main

In [33]:
DATA_ROOT = 'data/AgroPest12' 
ensure_dir('models_ml')
ensure_dir('results_ml')


In [34]:
print("Training 7 models (using the valid split to select the best model)...")

# All feature + classifier combinations
for feat in ['hog', 'lbp', 'sift']:
    for clf in ['svm', 'rf', 'knn']:
        # SIFT is slow, so it is only paired with the strongest classifier (rf); adjust as needed
        if feat == 'sift' and clf != 'rf':
            continue

        print(f'\nTraining {feat}+{clf}')
        pipe = MLDetectionPipeline(num_classes=12, feature=feat, classifier=clf, sift_k=256)

        # 1. Extract training-set features
        X_train, y_train = pipe.build_training_data_from_annotations(
            data_root=DATA_ROOT, split='train')
        if len(X_train) == 0:
            print('No train samples!')
            continue

        # 2. Extract validation-set features (used for model selection)
        print(f"Extracting validation features for {feat}+{clf}...")
        X_val, y_val = pipe.build_training_data_from_annotations(
            data_root=DATA_ROOT, split='valid')  # important: use the valid split

        if len(X_val) == 0:
            print("No validation samples, training without validation...")
            pipe.train(X_train, y_train)
        else:
            # 3. Train and select the best model on the validation set
            pipe.train(X_train, y_train, X_val=X_val, y_val=y_val)

        pipe.save_models()
        print(f"{feat}+{clf} trained and saved!")

Training 7 models (using the valid split to select the best model)...

Training hog+svm


Extracting features: 100%|██████████| 15282/15282 [01:47<00:00, 141.81it/s]


Extracted 15282 samples for hog+svm
Extracting validation features for hog+svm...


Extracting features: 100%|██████████| 1341/1341 [00:09<00:00, 143.15it/s]


Extracted 1341 samples for hog+svm
Applying PCA for high-dim feature (1764 → ~150 dims)
Training with validation set selection...
Training SVM on 15282 samples, 1764 dims...Done
Validation accuracy: 0.3087
Training completed. Best val acc: 0.3087
hog+svm trained and saved!

Training hog+rf


Extracting features: 100%|██████████| 15282/15282 [01:35<00:00, 159.61it/s]


Extracted 15282 samples for hog+rf
Extracting validation features for hog+rf...


Extracting features: 100%|██████████| 1341/1341 [00:08<00:00, 156.19it/s]


Extracted 1341 samples for hog+rf
Applying PCA for high-dim feature (1764 → ~150 dims)
Training with validation set selection...
Training RF on 15282 samples, 1764 dims...Done
Validation accuracy: 0.2796
Training completed. Best val acc: 0.2796
hog+rf trained and saved!

Training hog+knn


Extracting features: 100%|██████████| 15282/15282 [01:39<00:00, 154.12it/s]


Extracted 15282 samples for hog+knn
Extracting validation features for hog+knn...


Extracting features: 100%|██████████| 1341/1341 [00:08<00:00, 153.58it/s]


Extracted 1341 samples for hog+knn
Training with validation set selection...
Training KNN on 15282 samples, 1764 dims...Done


Found Intel OpenMP ('libiomp') and LLVM OpenMP ('libomp') loaded at
the same time. Both libraries are known to be incompatible and this
can cause random crashes or deadlocks on Linux when loaded in the
same Python program.
Using threadpoolctl may cause crashes or deadlocks. For more
information and possible workarounds, please see
    https://github.com/joblib/threadpoolctl/blob/master/multiple_openmp.md



Validation accuracy: 0.1984
Training completed. Best val acc: 0.1984
hog+knn trained and saved!

Training lbp+svm


Extracting features: 100%|██████████| 15282/15282 [01:36<00:00, 158.84it/s]


Extracted 15282 samples for lbp+svm
Extracting validation features for lbp+svm...


Extracting features: 100%|██████████| 1341/1341 [00:06<00:00, 194.14it/s]


Extracted 1341 samples for lbp+svm
Training with validation set selection...
Training SVM on 15282 samples, 10 dims...Done
Validation accuracy: 0.2319
Training completed. Best val acc: 0.2319
lbp+svm trained and saved!

Training lbp+rf


Extracting features: 100%|██████████| 15282/15282 [01:45<00:00, 144.24it/s]


Extracted 15282 samples for lbp+rf
Extracting validation features for lbp+rf...


Extracting features: 100%|██████████| 1341/1341 [00:09<00:00, 141.70it/s]


Extracted 1341 samples for lbp+rf
Training with validation set selection...
Training RF on 15282 samples, 10 dims...Done
Validation accuracy: 0.2610
Training completed. Best val acc: 0.2610
lbp+rf trained and saved!

Training lbp+knn


Extracting features: 100%|██████████| 15282/15282 [01:53<00:00, 134.54it/s]


Extracted 15282 samples for lbp+knn
Extracting validation features for lbp+knn...


Extracting features: 100%|██████████| 1341/1341 [00:09<00:00, 142.14it/s]


Extracted 1341 samples for lbp+knn
Training with validation set selection...
Training KNN on 15282 samples, 10 dims...Done
Validation accuracy: 0.2185
Training completed. Best val acc: 0.2185
lbp+knn trained and saved!

Training sift+rf
Loaded shared SIFT codebook


Extracting features: 100%|██████████| 15282/15282 [02:04<00:00, 122.57it/s]


Extracted 15282 samples for sift+rf
Extracting validation features for sift+rf...


Extracting features: 100%|██████████| 1341/1341 [00:11<00:00, 118.56it/s]


Extracted 1341 samples for sift+rf
Training with validation set selection...
Training RF on 15282 samples, 256 dims...Done
Validation accuracy: 0.3274
Training completed. Best val acc: 0.3274
SIFT codebook saved
sift+rf trained and saved!


In [35]:
def report_classification_performance_on_valid():
    from sklearn.metrics import accuracy_score, precision_recall_fscore_support, roc_auc_score
    
    results = []
    for feat in ['hog', 'lbp', 'sift']:
        for clf in ['svm', 'rf', 'knn']:
            model_path = f'models_ml/{feat}_{clf}_clf.joblib'
            if not os.path.exists(model_path):
                continue
                
            pipe = MLDetectionPipeline(num_classes=12, feature=feat, classifier=clf)
            pipe.load_models()
            
            X_val, y_val = pipe.build_training_data_from_annotations(
                data_root='data/AgroPest12', split='valid')
            if len(X_val) == 0: continue
            
            y_pred = pipe.clf.model.predict(X_val)
            acc = accuracy_score(y_val, y_pred)
            p, r, f1, _ = precision_recall_fscore_support(y_val, y_pred, average='macro', zero_division=0)
            
            auc = 'N/A'
            if hasattr(pipe.clf.model, 'predict_proba'):
                prob = pipe.clf.model.predict_proba(X_val)
                try:
                    auc = roc_auc_score(y_val, prob, multi_class='ovr', average='macro')
                    auc = round(auc, 4)
                except ValueError:
                    # e.g. a class missing from y_val makes the multiclass AUC undefined
                    auc = 'N/A'
            
            results.append({
                'Method': f'{feat}_{clf}',
                'Val_Acc': round(acc, 4),
                'Val_Precision': round(p, 4),
                'Val_Recall': round(r, 4),
                'Val_F1': round(f1, 4),
                'Val_AUC': auc
            })
    
    df = pd.DataFrame(results)
    df.to_csv('results_ml/classification_performance_on_VALID.csv', index=False)
    print(df)
    print("\nClassification performance table saved!")

# After all models are trained, run:
report_classification_performance_on_valid()

Loaded classifier: models_ml\hog_svm_clf.joblib


Extracting features: 100%|██████████| 1341/1341 [00:10<00:00, 127.93it/s]


Extracted 1341 samples for hog+svm
Loaded classifier: models_ml\hog_rf_clf.joblib


Extracting features: 100%|██████████| 1341/1341 [00:09<00:00, 137.83it/s]


Extracted 1341 samples for hog+rf
Loaded classifier: models_ml\hog_knn_clf.joblib


Extracting features: 100%|██████████| 1341/1341 [00:10<00:00, 123.46it/s]


Extracted 1341 samples for hog+knn
Loaded classifier: models_ml\lbp_svm_clf.joblib


Extracting features: 100%|██████████| 1341/1341 [00:09<00:00, 134.20it/s]


Extracted 1341 samples for lbp+svm
Loaded classifier: models_ml\lbp_rf_clf.joblib


Extracting features: 100%|██████████| 1341/1341 [00:10<00:00, 123.35it/s]


Extracted 1341 samples for lbp+rf
Loaded classifier: models_ml\lbp_knn_clf.joblib


Extracting features: 100%|██████████| 1341/1341 [00:10<00:00, 127.64it/s]


Extracted 1341 samples for lbp+knn
Loaded classifier: models_ml\sift_rf_clf.joblib
Loaded SIFT codebook


Extracting features: 100%|██████████| 1341/1341 [00:12<00:00, 109.08it/s]


Extracted 1341 samples for sift+rf
    Method  Val_Acc  Val_Precision  Val_Recall  Val_F1  Val_AUC
0  hog_svm   0.3087         0.2747      0.2787  0.2553   0.7564
1   hog_rf   0.2796         0.2979      0.2604  0.2458   0.7371
2  hog_knn   0.1984         0.3526      0.2163  0.1858   0.6473
3  lbp_svm   0.2319         0.2031      0.2022  0.1619   0.6984
4   lbp_rf   0.2610         0.2353      0.2507  0.2378   0.7421
5  lbp_knn   0.2185         0.2070      0.2097  0.2061   0.6471
6  sift_rf   0.3274         0.3333      0.3069  0.2833   0.7623

Classification performance table saved!
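
The table above can be ranked programmatically to pick the model passed to the detection stage. The snippet below is a small sketch that copies the accuracy, F1 and AUC columns from the printed table into plain tuples. The variable names are illustrative; in the notebook the same ranking could be done on the saved CSV instead.

```python
# (method, Val_Acc, Val_F1, Val_AUC) copied from the validation table above
rows = [
    ("hog_svm", 0.3087, 0.2553, 0.7564),
    ("hog_rf",  0.2796, 0.2458, 0.7371),
    ("hog_knn", 0.1984, 0.1858, 0.6473),
    ("lbp_svm", 0.2319, 0.1619, 0.6984),
    ("lbp_rf",  0.2610, 0.2378, 0.7421),
    ("lbp_knn", 0.2185, 0.2061, 0.6471),
    ("sift_rf", 0.3274, 0.2833, 0.7623),
]

# Best method under each criterion
best_by_acc = max(rows, key=lambda r: r[1])[0]
best_by_f1 = max(rows, key=lambda r: r[2])[0]
best_by_auc = max(rows, key=lambda r: r[3])[0]

print(best_by_acc, best_by_f1, best_by_auc)

# Full ranking by validation accuracy, highest first
ranking = [name for name, *_ in sorted(rows, key=lambda r: r[1], reverse=True)]
print(ranking)
```

All three criteria select `sift_rf`, which matches the `best_feature='sift'`, `best_classifier='rf'` arguments used in the next cells.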


In [36]:
controller = FullPipelineController(data_root='data/AgroPest12')


In [37]:

controller.run(run_best_only=True,
               best_feature='sift',
               best_classifier='rf',
               best_detector='selective')



Mode: evaluate the best model only
Already exists → sift_rf_selective | mAP@0.5 = 0.038 | FPS = 0.17


In [38]:
# Run the quick accuracy test
controller.quick_test_accuracy('sift', 'rf', 'selective')

=== sift_rf_selective accuracy analysis ===
Overall accuracy: 0.3746
Total matched detections: 283
Correctly classified: 106

Per-class accuracy:
  Ants: 0.5600
  Bees: 0.3077
  Beetles: 0.0000
  Caterpillars: 0.2083
  Earthworms: 0.3636
  Earwigs: 0.0000
  Grasshoppers: 0.5000
  Moths: 0.7297
  Slugs: 0.0833
  Snails: 0.4783
  Wasps: 0.3548
  Weevils: 0.4444
