In [None]:
import os, random, pickle, joblib, time, gc, zipfile
import numpy as np
import cv2
from skimage.feature import local_binary_pattern
from torchvision.datasets import ImageFolder
from sklearn.cluster import MiniBatchKMeans
from sklearn.preprocessing import LabelEncoder
from google.colab import drive

from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import (
    accuracy_score, precision_recall_fscore_support, confusion_matrix,
    classification_report, ConfusionMatrixDisplay)

from sklearn.ensemble import RandomForestClassifier

In [None]:
# --- Paths ---------------------------------------------------------------
DATA_ROOT = "/content"   # dataset root; the Training/ and Test/ folders are read from here

# --- Feature-extraction settings -----------------------------------------
# Current configuration: VOCAB_SIZE=600, ORB_NFEATURES=200, LBP_RADIUS=[1, 2]
ORB_NFEATURES = 200      # max ORB keypoints per image (alternative value noted by the author: 300)
VOCAB_SIZE   = 600       # number of visual words     (alternative value noted by the author: 100)
LBP_RADIUS   = [1, 2]    # uniform-LBP radii; each radius R uses P = 8*R sampling points

# Feature cache on Google Drive. The file name is built from the settings
# above so that changing them cannot silently overwrite (or be confused
# with) a cache produced by a different configuration.
SAVE_PATH = (
    "/content/drive/MyDrive/COMP6721/"
    f"scene_bovw_lbp_features_NF{ORB_NFEATURES}_V{VOCAB_SIZE}.npz"
)

In [None]:
# Mount Google Drive at /content/drive; the dataset archive and all output
# paths used in this notebook live under that mount point.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Unpack the dataset archive from Drive into DATA_ROOT -- the same directory
# the ImageFolder loaders read from -- so the extraction target and the load
# location cannot drift apart if DATA_ROOT is changed.
zip_path = "/content/drive/My Drive/dataset/Comp6721_Project_Dataset.zip"
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(DATA_ROOT)

In [None]:
# Index both image folders (one integer label per class sub-directory).
train_dataset_full = ImageFolder(os.path.join(DATA_ROOT, "Training"))
test_dataset       = ImageFolder(os.path.join(DATA_ROOT, "Test"))

# Each ImageFolder derives its class list on its own, so integer labels from
# the two datasets are only comparable when both folders expose the same
# classes in the same order. Fail early instead of silently mis-scoring.
assert test_dataset.classes == train_dataset_full.classes, (
    "Training and Test folders define different classes: "
    f"{train_dataset_full.classes} vs {test_dataset.classes}")

# Split the (path, label) pairs into parallel lists.
train_paths = [p for p, _ in train_dataset_full.samples]
train_lbls  = [lbl for _, lbl in train_dataset_full.samples]

test_paths  = [p for p, _ in test_dataset.samples]
test_lbls   = [lbl for _, lbl in test_dataset.samples]

classes = train_dataset_full.classes
print("Classes:", classes)
print(f"Number of training images: {len(train_dataset_full)}")
print(f"Number of test images: {len(test_dataset)}")

Classes: ['library-indoor', 'museum-indoor', 'shopping_mall-indoor']
Number of training images: 15000
Number of test images: 300


In [None]:
# Hold out a stratified 10% of the training images for validation.
idxs = np.arange(len(train_paths))
train_idx, val_idx = train_test_split(
    idxs,
    test_size=0.10,
    stratify=train_lbls,
    random_state=42,
)

# Materialise the path / label lists for each side of the split.
train_paths_split, train_lbls_split = (
    [train_paths[k] for k in train_idx],
    [train_lbls[k] for k in train_idx],
)
val_paths_split, val_lbls_split = (
    [train_paths[k] for k in val_idx],
    [train_lbls[k] for k in val_idx],
)

print(f"Train images: {len(train_paths_split)},  Val images: {len(val_paths_split)}")

# Encode labels as 0, 1, 2 with an encoder fitted on the full training label list.
le = LabelEncoder().fit(train_lbls)
y_train, y_val, y_test = (
    le.transform(lbls)
    for lbls in (train_lbls_split, val_lbls_split, test_lbls)
)

Train images: 13500,  Val images: 1500


### Learn 600-word ORB vocabulary  

* Streams ORB descriptors from the **train** split  
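* Fits `MiniBatchKMeans` (configured with `batch_size=10_000`) incrementally: one `partial_fit` call per streamed chunk of descriptors (500 images per chunk by default)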

In [None]:
# ORB detector used both for learning the vocabulary and for per-image
# feature extraction.
orb = cv2.ORB_create(nfeatures=ORB_NFEATURES)

# Visual-word clusterer; it is trained incrementally via partial_fit.
# NOTE(review): partial_fit appears to treat each array it receives as one
# mini-batch, so batch_size probably does not set the per-call chunk size --
# confirm against the scikit-learn documentation.
kmeans = MiniBatchKMeans(
    n_clusters=VOCAB_SIZE,
    batch_size=10_000,
    random_state=42,
    verbose=0,
)

def stream_orb_descriptors(paths, batch=500):
    """Yield ORB descriptors for ``paths`` in chunks to keep memory bounded.

    Images are read in grayscale and described with the module-level ``orb``
    detector. Unreadable images and images without descriptors contribute
    nothing. At most ``ORB_NFEATURES`` descriptors are kept per image.

    Parameters
    ----------
    paths : iterable of str
        Image file paths.
    batch : int, default 500
        Number of input paths covered by each yielded chunk.

    Yields
    ------
    numpy.ndarray
        2-D ``float32`` array with one row per descriptor, stacking the
        descriptors of all usable images in the current chunk. Chunks that
        contain no descriptors are not yielded.
    """
    buff = []
    for i, p in enumerate(paths):
        img = cv2.imread(p, cv2.IMREAD_GRAYSCALE)
        if img is not None:
            _, des = orb.detectAndCompute(img, None)
            if des is not None:
                # Slicing is a no-op when the image has <= ORB_NFEATURES descriptors.
                buff.append(des[:ORB_NFEATURES].astype(np.float32))
        # The flush check runs for every path, including unreadable ones, so
        # a bad file sitting on a chunk boundary cannot let the buffer grow
        # past `batch` images.
        if (i + 1) % batch == 0 and buff:
            yield np.vstack(buff)
            buff = []
    # Emit whatever is left after the last full chunk.
    if buff:
        yield np.vstack(buff)

# Learn the visual vocabulary: one incremental k-means update per streamed
# chunk of training descriptors, with wall-clock timing.
print("Fitting MiniBatchKMeans vocabulary …")
t0 = time.time()
for desc_batch in stream_orb_descriptors(train_paths_split):
    kmeans.partial_fit(desc_batch)
elapsed = time.time() - t0
print(f"Done in {elapsed:.1f}s")

Fitting MiniBatchKMeans vocabulary …
Done in 34.6s


### Extract BoVW (600) + LBP (28) descriptors  

* **BoVW:** assign each ORB descriptor to nearest visual word → 600-bin hist  
* **LBP:** uniform patterns, radii 1 & 2 → 10 + 18 bins  
* Concatenate → 628-dim feature per image  
* Save train / val / test matrices → `scene_bovw_lbp_features_NF200_V600.npz`

In [None]:
# BoVW + LBP extraction.
# Total LBP histogram length: each radius R (with P = 8*R points) contributes P + 2 bins.
n_lbp_bins = sum(8 * radius + 2 for radius in LBP_RADIUS)

def extract_features(path):
    """Compute the BoVW + LBP feature vector for a single image.

    The result concatenates
      * a sum-normalised histogram of visual words (``VOCAB_SIZE`` bins),
        built by assigning the image's ORB descriptors to ``kmeans`` clusters;
      * one sum-normalised uniform-LBP histogram per radius in ``LBP_RADIUS``
        (``8*R + 2`` bins each).

    Parameters
    ----------
    path : str
        Path of the image file to read.

    Returns
    -------
    numpy.ndarray
        1-D ``float32`` feature vector.

    Raises
    ------
    OSError
        If the image cannot be read.
    """
    # NOTE(review): the vocabulary is learned from images read with
    # cv2.IMREAD_GRAYSCALE, whereas this reads in colour and converts; the two
    # grayscale results may differ slightly -- consider unifying.
    img = cv2.imread(path)
    if img is None:
        # cv2.imread reports failure by returning None; raise with the
        # offending path instead of letting cvtColor fail cryptically.
        raise OSError(f"Could not read image: {path}")
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # ORB -> visual-word histogram (stays all-zero when nothing is detected).
    bovw_hist = np.zeros(VOCAB_SIZE, dtype=np.float32)
    _, des = orb.detectAndCompute(gray, None)
    if des is not None and des.size:
        des = des[:ORB_NFEATURES]  # no-op when already within the limit
        words = kmeans.predict(des.astype(np.float32))
        bovw_hist, _ = np.histogram(words, bins=np.arange(VOCAB_SIZE + 1))
        total = bovw_hist.sum()
        if total:
            bovw_hist = bovw_hist / total

    # One normalised uniform-LBP histogram per configured radius.
    lbp_hists = []
    for r in LBP_RADIUS:
        p = 8 * r
        lbp = local_binary_pattern(gray, P=p, R=r, method='uniform')
        # 'uniform' LBP codes span 0..P+1: the uniform patterns plus one
        # shared code for every non-uniform pattern, hence P + 2 bins.
        n_bins = p + 2
        hist, _ = np.histogram(lbp.ravel(), bins=n_bins, range=(0, n_bins))
        total = hist.sum()
        if total:
            hist = hist / total
        lbp_hists.append(hist)

    return np.concatenate([bovw_hist, *lbp_hists]).astype(np.float32)

# The feature length follows directly from the configuration
# (visual words + LBP bins). Check it against one real extracted vector so a
# mismatch is reported here rather than as a shape error while the feature
# matrices are being filled.
FEAT_LEN = VOCAB_SIZE + n_lbp_bins
temp_features = extract_features(train_paths_split[0])
assert len(temp_features) == FEAT_LEN, (
    f"extract_features returned {len(temp_features)} values, expected {FEAT_LEN}")

print("Final feature length:", FEAT_LEN)

Final feature length: 628


In [None]:
def build_matrix(paths):
    """Return a ``(len(paths), FEAT_LEN)`` float32 matrix of image features.

    Row ``i`` holds ``extract_features(paths[i])``.
    """
    features = np.zeros((len(paths), FEAT_LEN), dtype=np.float32)
    # Each `row` is a view into `features`, so assigning into it fills the matrix.
    for row, image_path in zip(features, paths):
        row[:] = extract_features(image_path)
    return features

# Extract features for every image of each split; build_matrix calls
# extract_features once per path, so this is the expensive step.
X_train = build_matrix(train_paths_split)
print("Training feature matrix built.")
X_val   = build_matrix(val_paths_split)
print("Validation feature matrix built.")
X_test  = build_matrix(test_paths)
print("Test feature matrix built.")

# Free a bit of RAM (trailing ';' suppresses the cell's output value)
gc.collect();

Training feature matrix built.
Validation feature matrix built.
Test feature matrix built.


In [None]:
# Sanity check: rows should match the split sizes, columns the feature length.
print("Feature shapes:", X_train.shape, X_val.shape, X_test.shape)

Feature shapes: (13500, 628) (1500, 628) (300, 628)


In [None]:
# Save everything to Google Drive for quick reload.
# Create the target folder first so a missing directory cannot abort the
# save after the expensive feature extraction has already run.
os.makedirs(os.path.dirname(SAVE_PATH), exist_ok=True)
np.savez_compressed(
    SAVE_PATH,
    X_train=X_train, y_train=y_train,
    X_val=X_val,     y_val=y_val,
    X_test=X_test,   y_test=y_test,
    classes=np.array(classes)
)
print(f"Saved to {SAVE_PATH}")

Saved to /content/drive/MyDrive/COMP6721/scene_bovw_lbp_features_NF200_V600.npz


In [None]:
# Persist the fitted vocabulary. The file name follows VOCAB_SIZE so it always
# matches the model being saved, and the folder is created if it is missing.
kmeans_path = f"/content/drive/MyDrive/COMP6721/models/vocab_kmeans_{VOCAB_SIZE}.joblib"
os.makedirs(os.path.dirname(kmeans_path), exist_ok=True)
joblib.dump(kmeans, kmeans_path)

['/content/drive/MyDrive/COMP6721/models/vocab_kmeans_600.joblib']

In [None]:
# Disabled reload snippet: it is wrapped in a bare string literal, so the code
# inside is not executed. Paste it into a cell to restore the cached feature
# matrices from SAVE_PATH instead of recomputing them.
# NOTE(review): `classes` is saved as an array of strings, so allow_pickle=True
# is presumably unnecessary here -- confirm before relying on it.
"""
# Data Re-loading
data = np.load(SAVE_PATH, allow_pickle=True)
X_train, y_train = data['X_train'], data['y_train']
X_val,   y_val   = data['X_val'],   data['y_val']
X_test,  y_test  = data['X_test'],  data['y_test']
classes = data['classes']
"""

### Single-Run Random Forest

In [None]:
# Single random-forest run with fixed hyper-parameters, scored on the
# validation split.
rf = RandomForestClassifier(max_depth=20,
                            min_samples_leaf=10,
                            max_features='sqrt',
                            n_estimators=200,
                            bootstrap=True,
                            max_samples=0.7,
                            random_state=42,
                            n_jobs=-1)
rf.fit(X_train, y_train)
val_acc = rf.score(X_val, y_val)
# Report the leaf size the model actually uses rather than a hard-coded
# literal, so the log cannot drift from the configuration above.
print(f"leaf={rf.min_samples_leaf} → VAL = {val_acc:.3f}")

leaf=10 → VAL = 0.597
