In [1]:
import numpy as np
from sklearn.decomposition import PCA
from sklearn.svm import LinearSVC
from sklearn.metrics import accuracy_score
from torchvision.datasets import MNIST
from torchvision import transforms
from scipy.signal import convolve2d
import warnings
import cv2
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split

In [2]:
warnings.filterwarnings('ignore')  # Biarkan output bersih

# Load subset MNIST
def load_mnist(max_train=3000, max_test=500):
    mnist = fetch_openml('mnist_784', version=1, as_frame=False)
    X = mnist.data.reshape(-1, 28, 28).astype(np.float32)
    y = mnist.target.astype(int)

    # Split dulu sebagian untuk test
    X_temp, X_test, y_temp, y_test = train_test_split(X, y, test_size=max_test, stratify=y, random_state=42)
    # Lalu ambil max_train dari sisanya
    X_train, _, y_train, _ = train_test_split(X_temp, y_temp, train_size=max_train, stratify=y_temp, random_state=42)

    return X_train, y_train, X_test, y_test

In [3]:
# Patch extraction acak (hemat memori)
def extract_random_patches(images, patch_size=7, stride=1, max_patches=50000):
    n, h, w = images.shape
    k = patch_size
    patch_vectors = []

    for idx in range(n):
        for i in range(0, h - k + 1, stride):
            for j in range(0, w - k + 1, stride):
                patch = images[idx, i:i + k, j:j + k]
                patch = patch - np.mean(patch)
                patch_vectors.append(patch.flatten())
                if len(patch_vectors) >= max_patches:
                    return np.array(patch_vectors).T
    return np.array(patch_vectors).T

In [4]:
# PCA filter
def learn_pca_filters(patches, num_filters, patch_size):
    print(f"Learning PCA filters with shape {patches.shape}...")
    pca = PCA(n_components=num_filters)
    pca.fit(patches.T)
    filters = pca.components_.reshape((num_filters, patch_size, patch_size))
    return filters

In [5]:
# Convolve images
def convolve_images(images, filters):
    feature_maps = []
    for f in filters:
        fmaps = [convolve2d(img, f, mode='valid') for img in images]
        feature_maps.append(np.stack(fmaps))
    return feature_maps  # List of arrays shape (n, h', w')

In [6]:
# Binary hashing
def binary_hashing(feature_maps):
    bin_stack = np.stack([(fm > 0).astype(np.uint8) for fm in feature_maps], axis=0)
    powers = 2 ** np.arange(bin_stack.shape[0])[::-1].reshape((-1, 1, 1, 1))
    hashed = np.sum(bin_stack * powers, axis=0)
    return hashed

In [7]:
# Histogram encoding
def block_histogram(images, block_size=(7, 7), num_bins=256, overlap=0.5):
    n, h, w = images.shape
    bh, bw = block_size
    step_h = int(bh * (1 - overlap))
    step_w = int(bw * (1 - overlap))

    features = []
    for img in images:
        blocks = []
        for i in range(0, h - bh + 1, step_h):
            for j in range(0, w - bw + 1, step_w):
                block = img[i:i + bh, j:j + bw]
                hist, _ = np.histogram(block, bins=num_bins, range=(0, num_bins))
                blocks.extend(hist)
        features.append(np.array(blocks))
    return np.stack(features)

In [8]:
# Main pipeline
def run_pcn_mnist():
    print("🔍 Loading MNIST...")
    X_train, y_train, X_test, y_test = load_mnist(max_train=3000, max_test=500)

    print("🧩 Extracting stage-1 patches...")
    patches1 = extract_random_patches(X_train, patch_size=7, stride=1, max_patches=50000)

    print("🔧 Training stage-1 PCA filters...")
    filters1 = learn_pca_filters(patches1, num_filters=6, patch_size=7)

    print("🎛 Convolution stage-1...")
    # fmap1 will be a list of arrays, each array is shape (n, h', w') for one filter
    fmap1 = convolve_images(X_train, filters1)

    print("🧠 Extracting stage-2 patches (subset only)...")
    # Need patches from stage 1 feature maps across images and filters
    # fmap1 is a list of (n, h', w') arrays. Stack them to (num_filters1, n, h', w') then reshape to (num_filters1*n, h', w')
    fmap1_stacked_for_patches = np.concatenate(fmap1, axis=0) # shape (num_filters1 * n, h', w')
    fmap1_stacked_for_patches = fmap1_stacked_for_patches[:3000] # Take a subset
    patches2 = extract_random_patches(fmap1_stacked_for_patches, patch_size=7, stride=1, max_patches=50000)


    print("🔧 Training stage-2 PCA filters...")
    filters2 = learn_pca_filters(patches2, num_filters=11, patch_size=7)

    print("🎛 Convolution stage-2...")
    fmap2 = []
    # fmap1 is a list of arrays, each shape (n, h', w')
    # We need to iterate through images (n) and for each image,
    # combine its feature maps from stage 1 (across filters1)
    # and convolve with filters2.

    # Reshape fmap1 from list of (n, h', w') to array (n, num_filters1, h', w')
    fmap1_stacked = np.stack(fmap1, axis=1) # shape (n, num_filters1, h', w')

    for i in range(fmap1_stacked.shape[0]): # Iterate through images (n)
        # For each image, combine feature maps from different filters1
        img_fmaps1_combined = fmap1_stacked[i].sum(axis=0) # shape (h', w')
        # Convolve the combined map for this image with each filter2
        conv_maps = [cv2.filter2D(img_fmaps1_combined, -1, f) for f in filters2]
        fmap2.append(np.stack(conv_maps)) # shape (num_filters2, h'', w'')
    fmap2 = np.array(fmap2) # Final shape (n, num_filters2, h'', w'')

    # The binary hashing and histogram functions expect input with samples as the first dimension.
    # binary_hashing is currently implemented to hash along the *first* dimension.
    # To hash across filters (axis=1 in fmap2's final shape), we need to transpose
    # the input for binary_hashing.
    fmap2_hashed_input = np.transpose(fmap2, (1, 0, 2, 3)) # shape (num_filters2, n, h'', w'')

    print("🔐 Binary hashing + histogram (train)...")
    # Pass the correctly shaped input to binary_hashing
    hashed_train = binary_hashing(fmap2_hashed_input) # shape (n, h'', w'')
    features_train = block_histogram(hashed_train, block_size=(7, 7), overlap=0.5)


    print("🧪 Applying model to test set...")
    fmap1_test = convolve_images(X_test, filters1)

    # Apply the same logic as the training set convolution for stage 2
    fmap1_test_stacked = np.stack(fmap1_test, axis=1) # shape (n_test, num_filters1, h', w')

    fmap2_test = []
    for i in range(fmap1_test_stacked.shape[0]): # Iterate through test images (n_test)
        img_fmaps1_test_combined = fmap1_test_stacked[i].sum(axis=0) # shape (h', w')
        conv_maps_test = [cv2.filter2D(img_fmaps1_test_combined, -1, f) for f in filters2]
        fmap2_test.append(np.stack(conv_maps_test))
    fmap2_test = np.array(fmap2_test) # Final shape (n_test, num_filters2, h'', w'')

    # Transpose for binary hashing
    fmap2_test_hashed_input = np.transpose(fmap2_test, (1, 0, 2, 3)) # shape (num_filters2, n_test, h'', w'')

    hashed_test = binary_hashing(fmap2_test_hashed_input) # shape (n_test, h'', w'')
    features_test = block_histogram(hashed_test, block_size=(7, 7), overlap=0.5)

    print("🏷 Training Linear SVM...")
    clf = LinearSVC(max_iter=3000)
    # features_train should now have shape (n, num_features_per_image) where n is 3000
    # features_test should have shape (n_test, num_features_per_image) where n_test is 500
    # y_train has shape (3000,) and y_test has shape (500,)
    # This should resolve the inconsistent sample error.
    clf.fit(features_train, y_train)
    y_pred = clf.predict(features_test)
    acc = accuracy_score(y_test, y_pred)
    print(f"✅ Final Test Accuracy: {acc * 100:.2f}%")

run_pcn_mnist()

🔍 Loading MNIST...
🧩 Extracting stage-1 patches...
🔧 Training stage-1 PCA filters...
Learning PCA filters with shape (49, 50000)...
🎛 Convolution stage-1...
🧠 Extracting stage-2 patches (subset only)...
🔧 Training stage-2 PCA filters...
Learning PCA filters with shape (49, 50000)...
🎛 Convolution stage-2...
🔐 Binary hashing + histogram (train)...
🧪 Applying model to test set...
🏷 Training Linear SVM...
✅ Final Test Accuracy: 92.40%
