# Rotation Forest Implementation (Spambase)
# Based on the paper "Rotation Forest: A New Classifier Ensemble Method"
#
# Dataset: Spambase (UCI)
# - 4,601 samples, 57 numeric features
# - Binary classification: spam (1) vs non-spam (0)
#
# Allowed libs: Python stdlib, numpy, pandas, matplotlib


In [1]:
import os
import time
import urllib.request

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from decision_tree import *
from classification_metrics import *


In [2]:
# ---------------------------
# Logging helper
# ---------------------------

def log_step(msg: str):
    ts = time.strftime('%H:%M:%S')
    print(f"[{ts}] {msg}")


In [3]:
# ---------------------------
# Dataset download + load
# ---------------------------

DATA_PATH = os.path.join('data', 'spambase.data')
SPAMBASE_URL = 'https://archive.ics.uci.edu/ml/machine-learning-databases/spambase/spambase.data'

os.makedirs('data', exist_ok=True)

if not os.path.exists(DATA_PATH):
    log_step(f"Spambase file not found at {DATA_PATH}. Downloading from UCI...")
    urllib.request.urlretrieve(SPAMBASE_URL, DATA_PATH)
    log_step("Download complete")
else:
    log_step(f"Found existing dataset file: {DATA_PATH}")

log_step("Loading Spambase CSV")
df = pd.read_csv(DATA_PATH, header=None)
log_step(f"Loaded df.shape={df.shape}")

# Last column is label: 1=spam, 0=non-spam
X = df.iloc[:, :-1].to_numpy(dtype=float)
y = df.iloc[:, -1].to_numpy(dtype=int)

log_step(f"Prepared X.shape={X.shape}, y.shape={y.shape}")
unique, counts = np.unique(y, return_counts=True)
log_step(f"Class distribution (label->count): {dict(zip(unique.tolist(), counts.tolist()))}")


[00:13:49] Found existing dataset file: data\spambase.data
[00:13:49] Loading Spambase CSV
[00:13:49] Loaded df.shape=(4601, 58)
[00:13:49] Prepared X.shape=(4601, 57), y.shape=(4601,)
[00:13:49] Class distribution (label->count): {0: 2788, 1: 1813}


In [4]:
# ---------------------------
# Train / test split
# ---------------------------

np.random.seed(777)
ind_train = np.random.choice(X.shape[0], size=int(X.shape[0] * 0.8), replace=False)
bool_ind_train = np.isin(np.arange(X.shape[0]), ind_train)

X_train = X[bool_ind_train]
y_train = y[bool_ind_train]
X_test = X[~bool_ind_train]
y_test = y[~bool_ind_train]

log_step(f"Split: X_train={X_train.shape}, X_test={X_test.shape}")


[00:13:53] Split: X_train=(3680, 57), X_test=(921, 57)


# PCA helpers (copied from your PCA notebook style)


In [5]:
def get_mean_std(X):
    return np.mean(X, axis=0), np.std(X, axis=0)


def normalization(X, means=None, sds=None):
    X = X.copy()
    for j in range(X.shape[1]):
        if means is not None:
            X[:, j] = (X[:, j] - means[j])
        if sds is not None:
            X[:, j] = X[:, j] / sds[j]
    return X


def get_principal_components(covariance_matrix):
    # "standard PCA" on covariance matrix
    eigen_values, eigen_vectors = np.linalg.eigh(covariance_matrix)

    order = np.argsort(eigen_values)[::-1]
    eigen_values = eigen_values[order]
    eigen_vectors = eigen_vectors[:, order]

    return eigen_values, eigen_vectors


# Majority voting (same idea as in your RF notebook)


In [6]:
def majority_voting(yHats):
    yHat = []
    for i in range(yHats.shape[1]):
        vals, counts = np.unique(yHats[:, i], return_counts=True)
        yHat.append(int(vals[np.argmax(counts)]))
    return yHat


# Rotation Forest implementation (paper-aligned: train base classifier on the whole transformed training set)


In [7]:
def create_feature_subsets(n_features, M=3):
    """Randomly split features into subsets of size M.

    Paper ref (abstract): "the feature set is randomly split into K subsets".
    Remainder handling: last subset is completed with randomly selected features.
    """
    feature_indices = np.random.permutation(n_features)
    subsets = [feature_indices[i:i+M] for i in range(0, n_features, M)]

    if len(subsets) > 0 and len(subsets[-1]) < M:
        remainder = subsets[-1].tolist()
        needed = M - len(remainder)
        fill = np.random.choice(feature_indices, size=needed, replace=False).tolist()
        remainder.extend(fill)
        subsets[-1] = np.array(remainder)

    return [np.array(s) for s in subsets]


def compute_pca_rotation_for_subset(X_bootstrap, feature_subset):
    X_subset = X_bootstrap[:, feature_subset]

    means, _ = get_mean_std(X_subset)
    X_centered = normalization(X_subset, means=means)

    covariance_matrix = np.cov(X_centered.T)
    _, eigen_vectors = get_principal_components(covariance_matrix)

    # Keep ALL components (rotation, not dimensionality reduction)
    return eigen_vectors, means


def apply_rotation_transform(X, feature_subsets, rotation_matrices, subset_means):
    rotated_blocks = []
    for subset, R, means in zip(feature_subsets, rotation_matrices, subset_means):
        X_subset = X[:, subset]
        X_centered = normalization(X_subset, means=means)
        rotated_blocks.append(X_centered @ R)
    return np.concatenate(rotated_blocks, axis=1)


def rotation_forest(X, y, L=10, M=3, max_depth=15, bootstrap_pca_fraction=0.75, verbose=True):
    n_samples, n_features = X.shape
    n_classes = len(set(y))
    ensemble = []

    if verbose:
        log_step(f"RotationForest train: L={L}, M={M}, n_samples={n_samples}, n_features={n_features}, n_classes={n_classes}")

    for l in range(L):
        if verbose:
            log_step(f"[Tree {l+1}/{L}] Step 1: use full training set (no row bootstrap)")

        X_tree = X
        y_tree = y

        # Step 2: feature subsets
        feature_subsets = create_feature_subsets(n_features, M=M)
        if verbose:
            log_step(f"[Tree {l+1}/{L}] Step 2: K={len(feature_subsets)}, subset_sizes={[len(s) for s in feature_subsets]}")

        # Step 3: PCA sample (fraction, no replacement)
        pca_sample_size = int(n_samples * bootstrap_pca_fraction)
        pca_indices = np.random.choice(n_samples, size=pca_sample_size, replace=False)
        X_pca_bootstrap = X[pca_indices]
        if verbose:
            log_step(f"[Tree {l+1}/{L}] Step 3: X_pca_bootstrap={X_pca_bootstrap.shape}")

        # Step 4: PCA per subset
        rotation_matrices = []
        subset_means = []
        for si, subset in enumerate(feature_subsets):
            R, means = compute_pca_rotation_for_subset(X_pca_bootstrap, subset)
            rotation_matrices.append(R)
            subset_means.append(means)
            if verbose and (si < 2 or si == len(feature_subsets) - 1):
                log_step(f"[Tree {l+1}/{L}] subset {si+1}/{len(feature_subsets)}: R.shape={R.shape}")

        # Step 5: rotate full training set
        X_tree_rotated = apply_rotation_transform(X_tree, feature_subsets, rotation_matrices, subset_means)
        if verbose:
            log_step(f"[Tree {l+1}/{L}] Step 5: X_tree_rotated={X_tree_rotated.shape}")

        # Step 6: train base classifier (decision tree)
        tree = build_tree(X_tree_rotated, y_tree, n_classes, max_depth=max_depth, max_features=None)

        ensemble.append({
            'tree': tree,
            'feature_subsets': feature_subsets,
            'rotation_matrices': rotation_matrices,
            'subset_means': subset_means,
        })

        if verbose:
            log_step(f"[Tree {l+1}/{L}] done")

    return ensemble


def rotation_forest_predict(ensemble, X, verbose=True):
    L = len(ensemble)
    yHats = np.zeros((L, X.shape[0]))

    if verbose:
        log_step(f"RotationForest predict: L={L}, X={X.shape}")

    for l in range(L):
        if verbose:
            log_step(f"[Predict {l+1}/{L}] rotate -> tree predict")

        X_rotated = apply_rotation_transform(
            X,
            ensemble[l]['feature_subsets'],
            ensemble[l]['rotation_matrices'],
            ensemble[l]['subset_means'],
        )
        yHats[l, :] = predict(ensemble[l]['tree'], X_rotated)

    return majority_voting(yHats)


In [8]:
    # ---------------------------
# Train + evaluate Rotation Forest
# ---------------------------

np.random.seed(777)
log_step("Training Rotation Forest...")
rot_ens = rotation_forest(X_train, y_train, L=10, M=3, max_depth=15, bootstrap_pca_fraction=0.75, verbose=True)
log_step(f"Trained Rotation Forest: L={len(rot_ens)}")

log_step("Evaluating Rotation Forest...")
yhat_rot = rotation_forest_predict(rot_ens, X_test, verbose=True)
_, cm_rot = confusion_matrix(y_test, yhat_rot)
acc_rot = float(accuracy(cm_rot))
log_step(f"Rotation Forest accuracy={acc_rot:.4f}")
print("Confusion matrix:\n", cm_rot)


[00:14:18] Training Rotation Forest...
[00:14:18] RotationForest train: L=10, M=3, n_samples=3680, n_features=57, n_classes=2
[00:14:18] [Tree 1/10] Step 1: use full training set (no row bootstrap)
[00:14:18] [Tree 1/10] Step 2: K=19, subset_sizes=[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
[00:14:18] [Tree 1/10] Step 3: X_pca_bootstrap=(2760, 57)
[00:14:18] [Tree 1/10] subset 1/19: R.shape=(3, 3)
[00:14:18] [Tree 1/10] subset 2/19: R.shape=(3, 3)
[00:14:18] [Tree 1/10] subset 19/19: R.shape=(3, 3)
[00:14:18] [Tree 1/10] Step 5: X_tree_rotated=(3680, 57)
[00:14:37] [Tree 1/10] done
[00:14:37] [Tree 2/10] Step 1: use full training set (no row bootstrap)
[00:14:37] [Tree 2/10] Step 2: K=19, subset_sizes=[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
[00:14:37] [Tree 2/10] Step 3: X_pca_bootstrap=(2760, 57)
[00:14:37] [Tree 2/10] subset 1/19: R.shape=(3, 3)
[00:14:37] [Tree 2/10] subset 2/19: R.shape=(3, 3)
[00:14:37] [Tree 2/10] subset 19/19: R.shape=(3, 3)
[00:1

In [None]:
# ---------------------------
# Baseline: Random Forest (from your earlier notebook style)
# ---------------------------

def random_forest(X, y, K, max_depth=15):
    decision_trees = []
    for k in range(K):
        ind = np.random.choice(X.shape[0], size=X.shape[0], replace=True)
        X_sample = X[ind]
        y_sample = y[ind]
        decision_trees.append(build_tree(X_sample, y_sample, len(set(y_sample)), max_features="sqrt", max_depth=max_depth))
    return decision_trees


def random_forest_predict(decision_trees, X):
    K = len(decision_trees)
    yHats = np.zeros((K, X.shape[0]))
    for k in range(K):
        yHats[k, :] = predict(decision_trees[k], X)
    return majority_voting(yHats)

np.random.seed(777)
log_step("Training Random Forest baseline...")
rf_ens = random_forest(X_train, y_train, K=10, max_depth=15)

log_step("Evaluating Random Forest baseline...")
yhat_rf = random_forest_predict(rf_ens, X_test)
_, cm_rf = confusion_matrix(y_test, yhat_rf)
acc_rf = float(accuracy(cm_rf))
log_step(f"Random Forest accuracy={acc_rf:.4f}")
print("Confusion matrix:\n", cm_rf)


In [None]:
print("=" * 50)
print("PERFORMANCE COMPARISON")
print("=" * 50)
print(f"Rotation Forest accuracy: {acc_rot:.4f}")
print(f"Random Forest accuracy:   {acc_rf:.4f}")
print(f"Improvement:              {(acc_rot - acc_rf):.4f}")
print("=" * 50)
