## Settings

In [1]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
import time
import tracemalloc
import random
from sklearn.model_selection import RandomizedSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score , recall_score

# Dataset roots. The collect_descriptors_* helpers treat every sub-directory of
# a root as one class and read the image files found inside it.
train_path = './assets/train/'
test_path = './assets/test/'

# Shared SIFT feature extractor, used by extract_descriptors_from_image.
# NOTE(review): contrastThreshold=0.01 looks deliberately low, presumably so that
# weak (low-contrast) keypoints are kept and each image yields more descriptors
# -- confirm against the OpenCV documentation for the installed version.
sift = cv2.SIFT_create(contrastThreshold=0.01)

### Create Descriptors

In [2]:
def extract_descriptors_from_image(image_path):
    """
    Compute the SIFT descriptors of a single image.

    The file is read as a grayscale image and handed to the module-level
    ``sift`` extractor. Every descriptor that is found is returned; no
    sub-sampling or padding to a fixed count takes place.

    Parameters
    ----------
    image_path : str
        Path of the image file to read.

    Returns
    -------
    The descriptor collection produced by ``sift.detectAndCompute`` (its
    second return value), one entry per detected keypoint.

    Raises
    ------
    ValueError
        If the image cannot be read, or if no descriptors are found in it.
    """
    gray = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if gray is None:
        # cv2.imread signals an unreadable/missing file by returning None.
        raise ValueError(f"Image not found or invalid: {image_path}")

    # Only the descriptors are needed; the keypoints themselves are discarded.
    _, found = sift.detectAndCompute(gray, None)

    has_descriptors = found is not None and len(found) > 0
    if not has_descriptors:
        raise ValueError(f"No descriptors found in image: {image_path}")

    return found

In [3]:
def collect_descriptors_all(data_path):
    """
    Pool the descriptors of every image under ``data_path`` into one matrix.

    Each sub-directory of ``data_path`` is one class. Classes are numbered
    0, 1, 2, ... in *sorted* directory-name order, so the mapping from class
    name to label is the same on every run and for every dataset root that
    contains the same class folders. Image files inside a class are visited in
    sorted order as well.

    Parameters
    ----------
    data_path : str
        Root directory holding one sub-directory per class.

    Returns
    -------
    all_descriptors : np.ndarray
        All descriptors stacked row-wise as float64, one row per descriptor.
        An empty 1-D array when nothing could be extracted.
    all_labels : np.ndarray
        Class index of each row of ``all_descriptors``.

    Notes
    -----
    Images for which ``extract_descriptors_from_image`` raises ``ValueError``
    are reported on stdout and skipped. The total and the per-class descriptor
    counts are printed before returning.
    """
    image_suffixes = ('.png', '.jpg', '.jpeg')

    # os.listdir yields entries in arbitrary order; sort so that a class always
    # receives the same index.
    classes = sorted(
        d for d in os.listdir(data_path)
        if os.path.isdir(os.path.join(data_path, d))
    )

    descriptor_chunks = []
    label_chunks = []
    num_des_class_dist = []
    for i, class_name in enumerate(classes):
        class_folder = os.path.join(data_path, class_name)

        num_des_class = 0
        for image_name in sorted(os.listdir(class_folder)):
            if not image_name.lower().endswith(image_suffixes):
                continue
            image_path = os.path.join(class_folder, image_name)
            try:
                descriptors = np.asarray(extract_descriptors_from_image(image_path))
            except ValueError as e:
                print(f"Error processing {image_name}: {e}")
                continue

            # Keep the arrays as they are and concatenate once at the end,
            # instead of growing a Python list of per-descriptor float lists.
            descriptor_chunks.append(descriptors)
            label_chunks.append(np.full(len(descriptors), i))
            num_des_class += len(descriptors)
        num_des_class_dist.append(num_des_class)

    num_des_tot = sum(num_des_class_dist)
    print(f'total number of descriptors: {num_des_tot}')
    print(f'number of descriptors per class: {num_des_class_dist}')

    if not descriptor_chunks:
        # Nothing extracted: same empty arrays the list-based version returned.
        return np.array([]), np.array([])

    # float64 matches what converting the nested Python lists used to produce.
    all_descriptors = np.concatenate(descriptor_chunks, axis=0).astype(np.float64)
    all_labels = np.concatenate(label_chunks)

    return all_descriptors, all_labels

In [4]:
def collect_descriptors_with_labels(data_path):
    """
    Collect the descriptors of each image separately, with one label per image.

    Each sub-directory of ``data_path`` is one class. Classes are numbered in
    *sorted* directory-name order, so that calling this function on different
    roots that contain the same class folders (e.g. a train and a test split)
    assigns the same label to the same class. Image files inside a class are
    visited in sorted order too.

    Parameters
    ----------
    data_path : str
        Root directory holding one sub-directory per class.

    Returns
    -------
    all_descriptors : list
        One entry per successfully processed image; each entry is that image's
        descriptors converted to nested Python lists.
    all_labels : np.ndarray
        Class index of each entry of ``all_descriptors``.

    Notes
    -----
    Images for which ``extract_descriptors_from_image`` raises ``ValueError``
    are reported on stdout and skipped.
    """
    image_suffixes = ('.png', '.jpg', '.jpeg')

    all_descriptors = []
    all_labels = []

    # os.listdir yields entries in arbitrary order; sort so that a class always
    # receives the same index regardless of which root is being read.
    classes = sorted(
        d for d in os.listdir(data_path)
        if os.path.isdir(os.path.join(data_path, d))
    )

    for i, class_name in enumerate(classes):
        class_folder = os.path.join(data_path, class_name)

        for image_name in sorted(os.listdir(class_folder)):
            if not image_name.lower().endswith(image_suffixes):
                continue
            image_path = os.path.join(class_folder, image_name)
            try:
                descriptors = extract_descriptors_from_image(image_path)
            except ValueError as e:
                print(f"Error processing {image_name}: {e}")
                continue
            all_descriptors.append(descriptors.tolist())
            all_labels.append(i)

    return all_descriptors, np.array(all_labels)

In [5]:
# Pool every training descriptor into a single matrix with one class label per
# descriptor; this pool is what the RF codebook is fitted on further down.
descriptors_pool, labels_pool = collect_descriptors_all(train_path)
print("Descriptor extraction completed.")

total number of descriptors: 104008
number of descriptors per class: [13465, 15119, 5421, 8680, 10758, 14686, 18524, 8025, 2550, 6780]
Descriptor extraction completed.


# Q3. RF codebook

## Construct RF codebook

In [6]:
# Codebook forest hyper-parameters.
n_estimators = 5
max_depth = 4

# A binary tree of depth d has at most 2**d leaves, so every tree contributes at
# most that many visual words. Plain integer arithmetic is used here because
# np.pow is only available in recent NumPy releases (2.0+).
max_leaf_nodes = 2 ** max_depth
codebook_size = max_leaf_nodes * n_estimators

# NOTE: random_state=None means the forest, and therefore the codebook and every
# number derived from it, differs from run to run.
rf_code = RandomForestClassifier(
    n_estimators=n_estimators,
    max_depth=max_depth,
    bootstrap=True,
    random_state=None,
    max_samples=0.9,
    max_features="sqrt",
    criterion='entropy',
)
rf_code.fit(descriptors_pool, labels_pool.ravel())

# For each tree, the node ids of its leaves (children_left == -1 marks a leaf).
leaf_indices = [
    np.where(tree.tree_.children_left == -1)[0]
    for tree in rf_code.estimators_
]

if len({len(tree_leaves) for tree_leaves in leaf_indices}) == 1:
    # Every tree has the same number of leaves: regular (n_trees, n_leaves) array.
    leaf_indices = np.array(leaf_indices)
else:
    # Trees are not guaranteed to be full, so leaf counts may differ. Building a
    # rectangular array from ragged rows fails on current NumPy, so keep one
    # 1-D array per tree inside an object array; leaf_indices[tree_idx] still
    # returns that tree's leaf ids either way.
    ragged_leaf_indices = np.empty(len(leaf_indices), dtype=object)
    for tree_idx, tree_leaves in enumerate(leaf_indices):
        ragged_leaf_indices[tree_idx] = tree_leaves
    leaf_indices = ragged_leaf_indices

In [7]:
def create_rf_histogram(descriptors, rf_code, leaf_indices, max_leaf_nodes, ensemble='concat'):
    """
    Compute the normalized bag-of-words histogram of ONE image from an RF codebook.

    Every descriptor is dropped down each tree of ``rf_code``; the leaf it lands
    in is the visual word it votes for in that tree.

    Parameters
    ----------
    descriptors : array-like
        The descriptors of a single image, one per row.
    rf_code : fitted forest
        Object whose ``apply(descriptors)`` returns an
        ``(n_descriptors, n_trees)`` array of node ids.
    leaf_indices : sequence
        ``leaf_indices[t]`` holds the node ids of the leaves of tree ``t``; the
        position of a node id in that sequence is its histogram bin.
    max_leaf_nodes : int
        Number of bins reserved per tree.
    ensemble : str
        ``'sum'`` adds the per-tree histograms bin by bin (length
        ``max_leaf_nodes``); any other value concatenates them (length
        ``max_leaf_nodes * n_trees``).

    Returns
    -------
    np.ndarray
        Histogram divided by the total number of votes
        (``n_descriptors * n_trees``), so that images with different numbers of
        descriptors are comparable. All zeros when there are no descriptors.

    Raises
    ------
    KeyError
        If ``rf_code.apply`` reports a node id that is not listed as a leaf.
    IndexError
        If a descriptor lands in a leaf whose position is >= ``max_leaf_nodes``.
    """
    # Step 1: node id reached by every descriptor in every tree. These ids index
    # ALL nodes of a tree, not just its leaves.
    node_ids = np.asarray(rf_code.apply(descriptors))
    n_descriptors, n_trees = node_ids.shape

    total_votes = n_descriptors * n_trees
    if total_votes == 0:
        # No descriptors: return an empty histogram instead of dividing 0 by 0.
        n_bins = max_leaf_nodes if ensemble == 'sum' else max_leaf_nodes * n_trees
        return np.zeros(n_bins)

    # Steps 2-3: per tree, translate node ids to leaf positions and count them.
    # One bincount per tree replaces a one-hot vector per descriptor per tree.
    per_tree_counts = []
    for tree_idx in range(n_trees):
        tree_leaves = np.asarray(leaf_indices[tree_idx])
        tree_nodes = node_ids[:, tree_idx]

        if not np.isin(tree_nodes, tree_leaves).all():
            raise KeyError(f"tree {tree_idx}: node id not found among its leaf indices")

        # lookup[node_id] -> position of that node among this tree's leaves.
        lookup = np.full(tree_leaves.max() + 1, -1, dtype=np.intp)
        lookup[tree_leaves] = np.arange(tree_leaves.size)
        leaf_positions = lookup[tree_nodes]

        if leaf_positions.max() >= max_leaf_nodes:
            raise IndexError(
                f"tree {tree_idx}: leaf position {leaf_positions.max()} "
                f"is out of range for max_leaf_nodes={max_leaf_nodes}"
            )

        per_tree_counts.append(np.bincount(leaf_positions, minlength=max_leaf_nodes))

    if ensemble == 'sum':
        counts = np.sum(per_tree_counts, axis=0)
    else:
        counts = np.concatenate(per_tree_counts)

    # Step 4: normalize, because the number of descriptors differs per image.
    return counts / total_votes

## RF classification

### Prepare dataset

In [8]:
# Per-image descriptors of the training set, with one class label per image.
train_descriptors, train_labels = collect_descriptors_with_labels(train_path)
print("Train descriptor extraction completed.")

# How the per-tree histograms are merged; the test-set cell reads this as well.
ensemble = 'concat'

# One RF bag-of-words histogram per training image, stacked row-wise.
train_bow_rf = np.array([
    create_rf_histogram(image_descriptors, rf_code, leaf_indices, max_leaf_nodes, ensemble=ensemble)
    for image_descriptors in train_descriptors
])

x_train, y_train = train_bow_rf, train_labels

Train descriptor extraction completed.


In [9]:
# Sanity check: one row per training image, one column per histogram bin.
x_train.shape

(150, 80)

In [10]:
# Per-image descriptors of the test set, with one class label per image.
test_descriptors, test_labels = collect_descriptors_with_labels(test_path)
print("Descriptor extraction completed.")

# Encode the test images with the same codebook and `ensemble` mode that were
# used for the training histograms.
test_bow_rf = np.array([
    create_rf_histogram(image_descriptors, rf_code, leaf_indices, max_leaf_nodes, ensemble=ensemble)
    for image_descriptors in test_descriptors
])

x_test, y_test = test_bow_rf, test_labels

Descriptor extraction completed.


### Axis-aligned test

In [11]:
def _run_traced(func, *args):
    """Call ``func(*args)`` under tracemalloc; return (result, seconds, peak_bytes)."""
    tracemalloc.start()
    try:
        start_time = time.perf_counter()
        result = func(*args)
        elapsed = time.perf_counter() - start_time
        _, peak_memory = tracemalloc.get_traced_memory()
    finally:
        # Always switch tracing off again, even when func raises; otherwise
        # tracing stays enabled for everything that runs afterwards.
        tracemalloc.stop()
    return result, elapsed, peak_memory


def RF_classification(x_train, y_train, x_test, n_estimators=30, max_depth=10, bootstrap=True, random_state=None, max_samples=0.7, max_features="sqrt", criterion='entropy'):
    """
    Fit a random-forest classifier and measure the cost of training and testing.

    Parameters
    ----------
    x_train, y_train
        Training features and labels; ``y_train.ravel()`` is passed to ``fit``.
    x_test
        Features to predict after fitting.
    n_estimators, max_depth, bootstrap, random_state, max_samples, max_features, criterion
        Forwarded unchanged to ``RandomForestClassifier``.

    Returns
    -------
    tuple
        ``(y_train_pred, y_test_pred, train_time, test_time,
        train_peak_memory, test_peak_memory, max_tree_depth)``. Times are
        ``time.perf_counter`` differences around ``fit`` and around the test-set
        ``predict``; peak memories are the tracemalloc peaks recorded during
        those same two calls; ``max_tree_depth`` is the largest
        ``tree_.max_depth`` over the fitted estimators.
    """
    rf_clf = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        bootstrap=bootstrap,
        random_state=random_state,
        max_samples=max_samples,
        max_features=max_features,
        criterion=criterion,
    )

    # Train ----------------------------------------------------------
    _, train_time, train_peak_memory = _run_traced(rf_clf.fit, x_train, y_train.ravel())

    # Training-set predictions are intentionally left out of both measurements.
    y_train_pred = rf_clf.predict(x_train)

    # Test -----------------------------------------------------------
    y_test_pred, test_time, test_peak_memory = _run_traced(rf_clf.predict, x_test)

    # Depth actually reached by the deepest tree of the forest.
    max_tree_depth = max(estimator.tree_.max_depth for estimator in rf_clf.estimators_)

    return y_train_pred, y_test_pred, train_time, test_time, train_peak_memory, test_peak_memory, max_tree_depth

In [12]:
# Axis-aligned baseline: the forest splits directly on the histogram bins.
(
    y_train_pred,
    y_test_pred,
    train_time,
    test_time,
    train_peak_memory,
    test_peak_memory,
    max_tree_depth,
) = RF_classification(
    x_train,
    y_train,
    x_test,
    n_estimators=30,
    max_depth=5,
    bootstrap=True,
    random_state=None,
    max_samples=0.7,
    max_features="sqrt",
    criterion='entropy',
)

# Accuracy on both splits; macro-averaged precision/recall on the test split.
train_accuracy = accuracy_score(y_train.T, y_train_pred)
test_accuracy = accuracy_score(y_test.T, y_test_pred)
test_precision = precision_score(y_test.T, y_test_pred, average="macro", zero_division=0)
test_recall = recall_score(y_test.T, y_test_pred, average="macro", zero_division=0)

# Report: training block
print(f'train time: {train_time}')
print(f'train accuracy: {train_accuracy}')
print('\n')

# Report: test block
print(f'test time: {test_time}')
print(f'test_accuracy: {test_accuracy}')
print('\n')

# Report: per-class quality
print(f'test_precision: {test_precision}')
print(f'test_recall: {test_recall}')
print('\n')

print(f'max tree depth: {max_tree_depth}')

train time: 0.17165810003643855
train accuracy: 0.9866666666666667


test time: 0.0018428000039421022
test_accuracy: 0.42


test_precision: 0.45734997369032904
test_recall: 0.42000000000000004


max tree depth: 5


### Two-pixel test

In [13]:
def create_two_pixel_features(x_train, x_test, n_pairs=None, random_seed=None):
    """
    Build "two-pixel" features: differences between pairs of input features.

    Parameters
    ----------
    x_train, x_test : np.ndarray
        Feature matrices laid out as ``(n_features, n_samples)``, i.e. the
        transpose of the usual samples-by-features layout. Both must have the
        same number of rows.
    n_pairs : int, optional
        Number of feature pairs to keep. When ``None``, or not smaller than the
        number of distinct pairs ``(i, j)`` with ``i < j``, all pairs are used.
    random_seed : int, optional
        Seed for choosing the pairs. A seeded call uses its own generator, so
        the process-wide ``random`` state is left untouched; the selected pairs
        are the same ones ``random.seed(random_seed)`` followed by
        ``random.sample`` would pick. Without a seed the shared module-level
        generator is used.

    Returns
    -------
    x_train_2pix, x_test_2pix : np.ndarray
        float64 matrices in ``(n_samples, n_pairs)`` layout; column ``k`` holds
        ``feature[i_k] - feature[j_k]`` for the k-th selected pair. The same
        pairs are used for both matrices.
    """
    n_train_samples = x_train.shape[1]

    # Train and test are processed together so both get identical feature pairs.
    x_combined = np.concatenate([x_train, x_test], axis=1)
    n_features = x_combined.shape[0]

    rng = random if random_seed is None else random.Random(random_seed)

    # All unique unordered pairs (i < j), in lexicographic order.
    feature_pairs = [(i, j) for i in range(n_features) for j in range(i + 1, n_features)]

    # Optionally keep only a random subset of the pairs.
    if n_pairs is not None and n_pairs < len(feature_pairs):
        feature_pairs = rng.sample(feature_pairs, n_pairs)

    # reshape(-1, 2) keeps the index array two-dimensional even with zero pairs.
    pair_index = np.array(feature_pairs, dtype=np.intp).reshape(-1, 2)

    # One vectorised subtraction instead of a Python loop over the pairs.
    new_features = (
        x_combined[pair_index[:, 0], :] - x_combined[pair_index[:, 1], :]
    ).astype(np.float64)

    x_train_2pix = new_features[:, :n_train_samples]
    x_test_2pix = new_features[:, n_train_samples:]

    return x_train_2pix.T, x_test_2pix.T

In [14]:
# Use as many pairwise-difference features as there are histogram bins
# (a fixed count such as 100 is an alternative).
n_pairs = x_train.shape[1]

# create_two_pixel_features works on (n_features, n_samples) input, hence the
# transposes; it hands back matrices in samples-by-features layout.
x_train_2pix, x_test_2pix = create_two_pixel_features(
    x_train.T,
    x_test.T,
    n_pairs=n_pairs,
    random_seed=0,
)

(
    y_train_pred,
    y_test_pred,
    train_time,
    test_time,
    train_peak_memory,
    test_peak_memory,
    max_tree_depth,
) = RF_classification(
    x_train_2pix,
    y_train,
    x_test_2pix,
    n_estimators=30,
    max_depth=10,
    bootstrap=True,
    random_state=None,
    max_samples=0.7,
    max_features="sqrt",
    criterion='entropy',
)

# Accuracy on both splits; macro-averaged precision/recall on the test split.
train_accuracy = accuracy_score(y_train.T, y_train_pred)
test_accuracy = accuracy_score(y_test.T, y_test_pred)
test_precision = precision_score(y_test.T, y_test_pred, average="macro", zero_division=0)
test_recall = recall_score(y_test.T, y_test_pred, average="macro", zero_division=0)

# Report: training block
print(f'train time: {train_time}')
print(f'train accuracy: {train_accuracy}')
print('\n')

# Report: test block
print(f'test time: {test_time}')
print(f'test_accuracy: {test_accuracy}')
print('\n')

# Report: per-class quality
print(f'test_precision: {test_precision}')
print(f'test_recall: {test_recall}')
print('\n')

print(f'max tree depth: {max_tree_depth}')

train time: 0.1733814000035636
train accuracy: 0.9933333333333333


test time: 0.0018493999959900975
test_accuracy: 0.3333333333333333


test_precision: 0.38338143549814035
test_recall: 0.33333333333333337


max tree depth: 10
