In [1]:
import cv2
import numpy as np
import os, random, time, joblib, json, shutil
from sklearn.cluster import MiniBatchKMeans
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.svm import SVC
from sklearn.metrics import classification_report, f1_score
from collections import defaultdict
import warnings

warnings.filterwarnings('ignore')
print("Libraries imported.")

Libraries imported.


In [2]:
# Configuration

# Seed Python's `random` module: create_yolo_image shuffles the label files
# with it, so without a seed the batch order (and therefore the vocabulary
# built with partial_fit) changes on every run. With the seed, a
# Restart & Run All repeats the same shuffle for the same directory listing.
SEED = 42
random.seed(SEED)

BASE_CONFIG = {
    'FEATURE_ID': 'K800_H576_S16_WGT', # feature cache ID (names the cache sub-directory)
    'K_VOCAB_SIZE': 800,   # number of visual words (MiniBatchKMeans n_clusters)
    'SIFT_STEP': 16,       # spacing of the dense SIFT keypoint grid; also used as keypoint size
    'HOG_CELL': 64,        # HOG cell side (square)
    'HOG_BLOCK': 128,      # HOG block side (square)
    'HOG_STRIDE': 64,      # HOG block stride (same in x and y)
    'COLOR_BINS': 16,      # histogram bins per HSV channel
    'W_BOVW': 1.0,         # multiplier applied to the BoVW histogram
    'W_HOG': 0.5,          # multiplier applied to the HOG vector
    'W_COLOR': 1.0,        # multiplier applied to the color histogram
    'CLASSIFIER': 'SVC',   # NOTE(review): not read by any code visible in this notebook
    'C': 2.0,              # SVC regularisation parameter
    'KERNEL': 'rbf',       # SVC kernel
    'GAMMA': 'scale'       # SVC gamma
}

# Data paths and global variables
DATA_DIR = '../data'
TRAIN_IMG_DIR = os.path.join(DATA_DIR, 'train', 'images')
TRAIN_LBL_DIR = os.path.join(DATA_DIR, 'train', 'labels')
VALID_IMG_DIR = os.path.join(DATA_DIR, 'valid', 'images')
VALID_LBL_DIR = os.path.join(DATA_DIR, 'valid', 'labels')
BASE_CACHE_DIR = './cache_experiments'

# presumably indexed by the YOLO class id found in the label files -- TODO confirm
CLASS_NAMES = [
    "Ants","Bees","Beetles","Caterpillars","Earthworms","Earwigs",
    "Grasshoppers","Moths","Slugs","Snails","Wasps","Weevils"
]
NUM_CLASSES = len(CLASS_NAMES)
STD_WINDOW_SIZE = (320, 320)  # size every crop is resized to; also the HOG window
BATCH_SIZE = 128              # crops per batch yielded by create_yolo_image
sift = cv2.SIFT_create()      # shared SIFT instance used by get_dense_sift

print(f"Base Config loaded: {BASE_CONFIG['FEATURE_ID']}")

Base Config loaded: K800_H576_S16_WGT


In [3]:
#Feature Extractors

def get_hog_features(img, std_size, cell_size, block_size, stride, nbins=9):
    """Compute a flattened HOG descriptor for a single BGR image.

    Args:
        img: BGR image array.
        std_size: HOG window size, also used as the resize target when the
            image's (rows, cols) differ from ``std_size`` reversed.
        cell_size: side of the square HOG cell.
        block_size: side of the square HOG block.
        stride: block stride, identical in both directions.
        nbins: number of orientation bins.

    Returns:
        1-D array holding the HOG descriptor of the grayscale image.
    """
    # img.shape[:2] is (rows, cols) while std_size is compared in reverse order.
    expected_rows_cols = std_size[::-1]
    if img.shape[:2] != expected_rows_cols:
        img = cv2.resize(img, std_size)

    descriptor = cv2.HOGDescriptor(
        std_size,
        (block_size, block_size),
        (stride, stride),
        (cell_size, cell_size),
        nbins,
    )
    return descriptor.compute(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)).flatten()

def get_color_histogram(img, bins):
    """Build a concatenated per-channel HSV histogram for a BGR image.

    Each HSV channel is histogrammed into ``bins`` bins over [0, 256),
    normalised in place with ``cv2.normalize``'s default settings, flattened,
    and the channel histograms are joined in channel order.

    NOTE(review): for 8-bit images OpenCV stores hue in 0..179, so with the
    shared [0, 256) range the upper hue bins would stay empty -- confirm this
    is intended.

    Args:
        img: BGR image array.
        bins: number of histogram bins per channel.

    Returns:
        1-D array with the channel histograms concatenated.
    """
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    flattened = []
    for channel in cv2.split(hsv):
        channel_hist = cv2.calcHist([channel], [0], None, [bins], [0, 256])
        cv2.normalize(channel_hist, channel_hist)  # writes the result back into channel_hist
        flattened.append(channel_hist.flatten())
    return np.hstack(flattened)

def get_dense_sift(img, step):
    """Compute SIFT descriptors on a regular grid over a BGR image.

    Keypoints are placed every ``step`` pixels, row by row, starting at
    (0, 0); ``step`` is also passed as each keypoint's size. Descriptors come
    from the module-level ``sift`` object.

    Args:
        img: BGR image array.
        step: grid spacing and keypoint size.

    Returns:
        The descriptors returned by ``sift.compute`` (callers in this notebook
        treat ``None`` as "no descriptors").
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    rows, cols = gray.shape[0], gray.shape[1]
    grid = []
    for y in range(0, rows, step):
        for x in range(0, cols, step):
            grid.append(cv2.KeyPoint(x, y, step))
    _, descriptors = sift.compute(gray, grid)
    return descriptors

def get_dense_sift_for_batch(imgs, cfg):
    """Run get_dense_sift on every image of a batch.

    Args:
        imgs: iterable of BGR images.
        cfg: configuration mapping; ``cfg['SIFT_STEP']`` is the grid spacing.

    Returns:
        List with one get_dense_sift result per image, in input order.
    """
    per_image = []
    for image in imgs:
        per_image.append(get_dense_sift(image, step=cfg['SIFT_STEP']))
    return per_image

def create_bovw_histograms(desc_list, vocab):
    """Turn per-image descriptor sets into L2-normalised bag-of-visual-words rows.

    Args:
        desc_list: sequence with one descriptor array per image; an entry may
            be ``None`` or empty when an image produced no descriptors.
        vocab: fitted clustering model exposing ``n_clusters`` and
            ``predict(descriptors)`` returning word indices in
            ``[0, n_clusters)``.

    Returns:
        float32 array of shape ``(len(desc_list), vocab.n_clusters)``. Rows for
        images without descriptors are left as all zeros.
    """
    k = vocab.n_clusters
    feats = np.zeros((len(desc_list), k), np.float32)
    for i, d in enumerate(desc_list):
        # An empty descriptor array would make vocab.predict raise, so treat it
        # exactly like a missing (None) entry and keep the zero row.
        if d is None or len(d) == 0:
            continue
        words = vocab.predict(d)
        hist, _ = np.histogram(words, bins=np.arange(k + 1))
        if hist.sum() > 0:
            hist = hist / np.linalg.norm(hist)
        feats[i] = hist
    return feats

def create_combined_features(imgs, vocab, cfg, desc_list=None):
    """Assemble the weighted BoVW + HOG + color feature matrix for a batch.

    Args:
        imgs: sequence of BGR images.
        vocab: visual vocabulary handed to create_bovw_histograms.
        cfg: configuration mapping providing the HOG geometry, the color bin
            count and the ``W_BOVW`` / ``W_HOG`` / ``W_COLOR`` multipliers.
        desc_list: optional precomputed dense-SIFT descriptors, one entry per
            image; computed from ``imgs`` when omitted.

    Returns:
        2-D array whose rows are ``[W_BOVW * bovw | W_HOG * hog | W_COLOR * color]``.
    """
    if desc_list is None:
        desc_list = get_dense_sift_for_batch(imgs, cfg)
    bovw_block = cfg['W_BOVW'] * create_bovw_histograms(desc_list, vocab)

    def _hog_and_color(image):
        # Weighted HOG vector followed by the weighted color histogram.
        hog_vec = get_hog_features(
            image, STD_WINDOW_SIZE, cfg['HOG_CELL'], cfg['HOG_BLOCK'], cfg['HOG_STRIDE']
        )
        color_vec = get_color_histogram(image, bins=cfg['COLOR_BINS'])
        return np.hstack((cfg['W_HOG'] * hog_vec, cfg['W_COLOR'] * color_vec))

    appearance_block = np.array([_hog_and_color(image) for image in imgs])
    return np.hstack((bovw_block, appearance_block))

#Data Loader & Cache Manager
def load_yolo_crops(img_dir, lbl_dir, std_size):
    """Load every labelled object crop from a YOLO-format dataset split.

    For each ``*.txt`` file in ``lbl_dir`` the matching ``.jpg`` in ``img_dir``
    is read; each label line ``class x_center y_center width height`` (box
    values normalised to the image size) is converted to a pixel box, clipped
    to the image, cropped and resized to ``std_size``.

    Label files without a readable image, malformed label lines and boxes that
    are empty after clipping are skipped.

    Args:
        img_dir: directory containing the ``.jpg`` images.
        lbl_dir: directory containing the YOLO ``.txt`` label files.
        std_size: target size passed to ``cv2.resize`` for every crop.

    Returns:
        Tuple ``(crops, labels)``: a list of resized crops and a NumPy array of
        the corresponding integer class ids.
    """
    cropped_img, labels = [], []
    # Sorted so the crop order does not depend on the directory listing order.
    for f in sorted(os.listdir(lbl_dir)):
        if not f.endswith('.txt'):
            continue
        img_path = os.path.join(img_dir, os.path.splitext(f)[0] + '.jpg')
        if not os.path.exists(img_path):
            continue
        img = cv2.imread(img_path)
        if img is None:
            continue
        h, w = img.shape[:2]
        # Context manager closes the label file even if a line blows up.
        with open(os.path.join(lbl_dir, f)) as label_file:
            for line in label_file:
                try:
                    cid, x, y, wn, hn = map(float, line.split())
                    cid = int(cid)
                    x1, y1 = int((x - wn / 2) * w), int((y - hn / 2) * h)
                    x2, y2 = int((x + wn / 2) * w), int((y + hn / 2) * h)
                    crop = img[max(0, y1):min(h, y2), max(0, x1):min(w, x2)]
                    if crop.size > 0:
                        cropped_img.append(cv2.resize(crop, std_size))
                        labels.append(cid)
                except (ValueError, OverflowError, cv2.error):
                    # Malformed line (wrong field count, non-numeric, nan/inf)
                    # or a crop OpenCV refuses to resize: skip just this box.
                    # KeyboardInterrupt and other non-data errors propagate.
                    continue
    print(f"Loaded {len(cropped_img)} validation crops.")
    return cropped_img, np.array(labels)

def create_yolo_image(img_dir, lbl_dir, std_size, batch_size):
    """Yield batches of resized object crops from a YOLO-format dataset split.

    Label files are visited in a random order (``random.shuffle``). Each label
    line ``class x_center y_center width height`` (box values normalised to the
    image size) is converted to a pixel box, clipped to the image, cropped and
    resized to ``std_size``. Label files without a readable ``.jpg``, malformed
    lines and boxes that are empty after clipping are skipped.

    Args:
        img_dir: directory containing the ``.jpg`` images.
        lbl_dir: directory containing the YOLO ``.txt`` label files.
        std_size: target size passed to ``cv2.resize`` for every crop.
        batch_size: number of crops per yielded batch; the final batch may be
            smaller.

    Yields:
        Tuples ``(images, labels)`` of NumPy arrays.
    """
    files = os.listdir(lbl_dir)
    random.shuffle(files)
    imgs, lbls = [], []
    for f in files:
        if not f.endswith('.txt'):
            continue
        imgp = os.path.join(img_dir, os.path.splitext(f)[0] + '.jpg')
        if not os.path.exists(imgp):
            continue
        img = cv2.imread(imgp)
        if img is None:
            continue
        h, w = img.shape[:2]
        # Read all lines up front so no file handle stays open while the
        # generator is suspended at a yield.
        with open(os.path.join(lbl_dir, f)) as label_file:
            lines = label_file.readlines()
        for line in lines:
            try:
                cid, x, y, wn, hn = map(float, line.split())
                cid = int(cid)
                x1, y1 = int((x - wn / 2) * w), int((y - hn / 2) * h)
                x2, y2 = int((x + wn / 2) * w), int((y + hn / 2) * h)
                crop = img[max(0, y1):min(h, y2), max(0, x1):min(w, x2)]
                if crop.size == 0:
                    continue
                resized = cv2.resize(crop, std_size)
            except (ValueError, OverflowError, cv2.error):
                # Malformed line or a crop OpenCV refuses to resize: skip it.
                continue
            imgs.append(resized)
            lbls.append(cid)
            # The yield is deliberately outside the try block: a bare except
            # around it would swallow GeneratorExit and make close() (or
            # garbage collection of a partly consumed generator) raise
            # RuntimeError.
            if len(imgs) >= batch_size:
                yield np.array(imgs), np.array(lbls)
                imgs, lbls = [], []
    if imgs:
        yield np.array(imgs), np.array(lbls)

def get_features(cfg):
    """Return train/validation features and the BoVW vocabulary, using a disk cache.

    The cache lives in ``BASE_CACHE_DIR/<cfg['FEATURE_ID']>`` and holds
    ``X_train.npy``, ``y_train.npy``, ``X_val.npy``, ``y_val.npy`` and
    ``vocab.pkl``. When all five files exist they are loaded (the two feature
    matrices as read-only memory maps); otherwise the vocabulary is fitted
    incrementally on dense SIFT descriptors of the training crops, features
    are extracted for both splits and everything is written to the cache.

    NOTE(review): the cache is keyed by ``FEATURE_ID`` only, so changing any
    other feature-related entry of ``cfg`` without changing ``FEATURE_ID``
    reuses the old files.

    Args:
        cfg: configuration mapping (see ``BASE_CONFIG``).

    Returns:
        Tuple ``(x_train, y_train, x_val, y_val, vocab)``.

    Raises:
        ValueError: if the training split produced no SIFT descriptors or no
            training crops, so nothing could be fitted.
    """
    fid = cfg['FEATURE_ID']
    cache_dir = os.path.join(BASE_CACHE_DIR, fid)
    os.makedirs(cache_dir, exist_ok=True)
    paths = {k: os.path.join(cache_dir, f'{k}.npy') for k in ['X_train','y_train','X_val','y_val']}
    vocab_path = os.path.join(cache_dir, 'vocab.pkl')

    try:
        if all(os.path.exists(p) for p in paths.values()) and os.path.exists(vocab_path):
            print(f"Cached features found for [{fid}]. Loading...")
            x_train = np.load(paths['X_train'], mmap_mode='r')
            y_train = np.load(paths['y_train'])
            x_val = np.load(paths['X_val'], mmap_mode='r')
            y_val = np.load(paths['y_val'])
            vocab = joblib.load(vocab_path)
            return x_train, y_train, x_val, y_val, vocab
    except Exception as e:
        # An unreadable cache is not fatal: fall through and rebuild it.
        print(f"Cache error: {e}, regenerating...")

    print(f"Generating features for [{fid}]...")
    start = time.time()

    # Pass 1: fit the visual vocabulary batch by batch.
    vocab = MiniBatchKMeans(n_clusters=cfg['K_VOCAB_SIZE'], random_state=42,
                            batch_size=512, n_init=5, max_iter=150)
    for imgs, _ in create_yolo_image(TRAIN_IMG_DIR, TRAIN_LBL_DIR, STD_WINDOW_SIZE, BATCH_SIZE):
        descs = [desc for desc in get_dense_sift_for_batch(imgs, cfg)
                 if desc is not None and len(desc) > 0]
        # np.vstack([]) raises, so skip batches that produced no descriptors.
        if not descs:
            continue
        vocab.partial_fit(np.vstack(descs))
    if not hasattr(vocab, 'cluster_centers_'):
        # Fail before an unfitted vocabulary is written to the cache.
        raise ValueError(
            f"No SIFT descriptors were extracted from {TRAIN_IMG_DIR!r} / {TRAIN_LBL_DIR!r}; "
            "cannot build the BoVW vocabulary."
        )
    joblib.dump(vocab, vocab_path)

    # Pass 2: extract the combined features for the training crops.
    x_train, y_train = [], []
    for imgs, labels in create_yolo_image(TRAIN_IMG_DIR, TRAIN_LBL_DIR, STD_WINDOW_SIZE, BATCH_SIZE):
        x_train.append(create_combined_features(imgs, vocab, cfg))
        y_train.append(labels)
    if not x_train:
        raise ValueError(f"No training crops were loaded from {TRAIN_IMG_DIR!r} / {TRAIN_LBL_DIR!r}.")
    x_train, y_train = np.vstack(x_train), np.hstack(y_train)

    x_val_imgs, y_val = load_yolo_crops(VALID_IMG_DIR, VALID_LBL_DIR, STD_WINDOW_SIZE)
    x_val = create_combined_features(x_val_imgs, vocab, cfg)

    arrays = {'X_train': x_train, 'y_train': y_train, 'X_val': x_val, 'y_val': y_val}
    for key, array in arrays.items():
        np.save(paths[key], array)
    print(f"Features saved ({(time.time()-start)/60:.1f} min)")
    return x_train, y_train, x_val, y_val, vocab

# Marker in the cell output confirming that every helper above was defined.
print('All helper functions defined.')

All helper functions defined.


In [4]:
# Build the train/validation feature matrices and the BoVW vocabulary for
# BASE_CONFIG, or reuse them from the on-disk cache when it is complete.
(x_tr, y_tr, x_v, y_v, vocab) = get_features(cfg=BASE_CONFIG)

Cached features found for [K800_H576_S16_WGT]. Loading...


In [5]:
# Standardise every feature column, then classify with a class-balanced SVC
# whose hyper-parameters come from BASE_CONFIG.
# NOTE(review): StandardScaler rescales each column to unit variance, which
# cancels the constant W_BOVW / W_HOG / W_COLOR multipliers applied in
# create_combined_features -- confirm the weights are meant to have an effect.
model_pipeline = make_pipeline(
    StandardScaler(),
    SVC(
        random_state=42,
        class_weight='balanced',
        gamma=BASE_CONFIG['GAMMA'],
        kernel=BASE_CONFIG['KERNEL'],
        C=BASE_CONFIG['C'],
    ),
)

# Time the fit so the cell output records its cost.
print(f"Fitting Model: SVC(C={BASE_CONFIG['C']})...")
start_time = time.time()
model_pipeline.fit(x_tr, y_tr)
print(f"Model fitted in {time.time() - start_time:.2f}s")

Fitting Model: SVC(C=2.0)...
Model fitted in 122.93s


In [6]:
# Persist the trained artefacts, named after the feature configuration.
MODEL_DIR = './saved_models'
os.makedirs(MODEL_DIR, exist_ok=True)

# Destination of the fitted scaler + SVC pipeline.
model_filename = os.path.join(MODEL_DIR, f"classifier_{BASE_CONFIG['FEATURE_ID']}.pkl")
# The BoVW vocabulary is copied from the feature cache next to the classifier.
vocab_source_path = os.path.join(BASE_CACHE_DIR, BASE_CONFIG['FEATURE_ID'], 'vocab.pkl')
vocab_dest_path = os.path.join(MODEL_DIR, f"vocab_{BASE_CONFIG['FEATURE_ID']}.pkl")

joblib.dump(model_pipeline, model_filename)
shutil.copyfile(vocab_source_path, vocab_dest_path)

print(f"\nClassifier model saved to: {model_filename}")
print(f"BoVW Vocab saved to: {vocab_dest_path}")


Classifier model saved to: ./saved_models/classifier_K800_H576_S16_WGT.pkl
BoVW Vocab saved to: ./saved_models/vocab_K800_H576_S16_WGT.pkl
