# Fitting SVM to homological representation

This notebook follows the Lecture 16 idea: compute a homological representation of data and fit an SVM with a precomputed kernel.

We will:
- Load STFT domain-frequency data from `stft_np/` using the provided `meta.json` and memmaps.
- For each sample, build a 2D point cloud from frequency vs magnitude and compute persistence diagrams (ripser).
- Convert diagrams to barcodes and then to stable rank functions using the provided `stablerank` utilities.
- Build a kernel matrix using Pcf/Pcnif dot-products and fit `sklearn.svm.SVC(kernel='precomputed')`.


In [9]:
import os, json
import numpy as np
from tqdm import tqdm

# Data locations
ROOT = "/Users/theodorbjork/SF2956"
STFT_DIR = os.path.join(ROOT, "stft_np")
META_FP = os.path.join(STFT_DIR, "meta.json")

# Load metadata and memmaps
with open(META_FP, "r") as f:
    meta = json.load(f)

A = np.memmap(os.path.join(STFT_DIR, meta["A_memmap"]), dtype=np.float16, mode="r", shape=tuple(meta["A_shape"]))
B = np.memmap(os.path.join(STFT_DIR, meta["B_memmap"]), dtype=np.float16, mode="r", shape=tuple(meta["B_shape"]))
freqs = np.load(os.path.join(STFT_DIR, meta["freqs_npy"]))

A.shape, B.shape, freqs.shape


((940, 1000, 257), (973, 1000, 257), (257,))

In [10]:
# Build labels and a light subset to start (keep runtime manageable)
# Using first N from each class
N_PER_CLASS = 80

XA = A[:N_PER_CLASS].astype(np.float32)
XB = B[:N_PER_CLASS].astype(np.float32)
X = np.concatenate([XA, XB], axis=0)   # (n, frames, bins)
y = np.concatenate([np.ones(len(XA), dtype=np.int64), np.zeros(len(XB), dtype=np.int64)])

X.shape, y.shape, np.bincount(y)


((160, 1000, 257), (160,), array([80, 80]))

In [11]:
# Compatibility aliases (some cells may reference X_small/y_small)
X_small = X
y_small = y
print(X_small.shape, y_small.shape, np.bincount(y_small))


(160, 1000, 257) (160,) [80 80]


In [12]:
# Homological representation utilities
# We'll compute a simple persistence on a downsampled slice per sample.

!pip -q install ripser scikit-learn

from ripser import ripser
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
import numpy as np

# Convert a 2D time-frequency patch into a point cloud in R^2
# We use (frequency, magnitude) pairs sampled over time window(s)

def stft_patch_to_points(stft_patch: np.ndarray, freqs: np.ndarray, time_stride: int = 20, freq_stride: int = 4, top_k: int = 32):
    # stft_patch: (frames, bins) in dB [-80, 0]
    fidx = np.arange(0, stft_patch.shape[1], freq_stride)
    tidx = np.arange(0, stft_patch.shape[0], time_stride)
    pts = []
    for t in tidx:
        row = stft_patch[t, fidx]
        # pick top-k magnitudes at this time
        k = min(top_k, row.size)
        idx = np.argpartition(row, -k)[-k:]
        sel_freqs = freqs[fidx][idx]
        sel_mags  = row[idx]
        pts.append(np.stack([sel_freqs, sel_mags], axis=1))
    if len(pts) == 0:
        return np.zeros((0, 2), dtype=np.float32)
    P = np.concatenate(pts, axis=0)
    # normalize scales for balance (freq Hz -> kHz; mags in dB -> scaled)
    P = P.astype(np.float32)
    P[:, 0] = P[:, 0] / 1000.0
    P[:, 1] = (P[:, 1] + 80.0) / 80.0  # [-80,0] -> [0,1]
    return P


def persistence_vector(points: np.ndarray, maxdim: int = 1):
    if points.shape[0] == 0:
        return {"dgms": []}
    res = ripser(points, maxdim=maxdim, metric='euclidean')
    return res


def wasserstein_kernel(diagrams_a, diagrams_b, gamma: float = 0.5):
    # Simple RBF on pairwise L2 between flattened finite bars of H0 and H1
    def flatten(dgms):
        flat = []
        for d in dgms:
            if len(d) == 0:
                continue
            d = d[np.isfinite(d).all(axis=1)]
            flat.append(d.reshape(-1))
        if not flat:
            return np.zeros((0,), dtype=np.float32)
        return np.concatenate(flat).astype(np.float32)
    a = flatten(diagrams_a)
    b = flatten(diagrams_b)
    # Pad to same length
    m = max(a.size, b.size)
    a = np.pad(a, (0, m - a.size))
    b = np.pad(b, (0, m - b.size))
    dist2 = np.sum((a - b) ** 2)
    return np.exp(-gamma * dist2)


def build_kernel_matrix(X_list, freqs, maxdim=1, gamma=0.25):
    # Precompute diagrams
    diagrams = []
    for x in tqdm(X_list, desc="Ripser diagrams"):
        # Take middle 200 frames as a robust slice
        mid = x.shape[0] // 2
        start = max(0, mid - 100)
        end = min(x.shape[0], start + 200)
        patch = x[start:end]
        P = stft_patch_to_points(patch, freqs)
        res = persistence_vector(P, maxdim=maxdim)
        dgms = [res.get("dgms", [])[i] if i < len(res.get("dgms", [])) else np.zeros((0, 2)) for i in range(maxdim + 1)]
        diagrams.append(dgms)
    n = len(diagrams)
    K = np.zeros((n, n), dtype=np.float64)
    for i in range(n):
        K[i, i] = 1.0
        for j in range(i + 1, n):
            kij = wasserstein_kernel(diagrams[i], diagrams[j], gamma=gamma)
            K[i, j] = K[j, i] = kij
    return K


In [13]:
# Build precomputed kernel and train/test SVM with a simple holdout
K = build_kernel_matrix(list(X), freqs, maxdim=1, gamma=0.25)
print("Kernel shape:", K.shape, "min/max:", K.min(), K.max())

# Split indices and slice precomputed kernel accordingly
n = K.shape[0]
all_idx = np.arange(n)
tr_idx, te_idx = train_test_split(all_idx, test_size=0.25, stratify=y, random_state=42)

Ktr = K[np.ix_(tr_idx, tr_idx)]
Kte = K[np.ix_(te_idx, tr_idx)]

ytr = y[tr_idx]
yte = y[te_idx]

clf = SVC(kernel='precomputed', C=1.0)
clf.fit(Ktr, ytr)
print("Holdout accuracy:", round(clf.score(Kte, yte), 4))


Ripser diagrams: 100%|██████████| 160/160 [00:05<00:00, 28.37it/s]


Kernel shape: (160, 160) min/max: 1.1290040084247468e-34 1.0
Holdout accuracy: 0.625


# Fitting SVM to homological representation

This notebook follows the Lecture 16 idea: compute a homological representation of data and fit an SVM with a precomputed kernel.

We will:
- Load STFT domain-frequency data from `stft_np/` using the provided `meta.json` and memmaps.
- For each sample, build a 2D point cloud from frequency vs magnitude and compute persistence diagrams (ripser).
- Convert diagrams to barcodes and then to stable rank functions using the provided `stablerank` utilities.
- Build a kernel matrix using Pcf/Pcnif dot-products and fit `sklearn.svm.SVC(kernel='precomputed')`.


In [14]:
import os, json
import numpy as np
import pandas as pd
from tqdm import tqdm

# Data locations
ROOT = "/Users/theodorbjork/SF2956"
STFT_DIR = os.path.join(ROOT, "stft_np")
META_FP = os.path.join(STFT_DIR, "meta.json")

# Load metadata and memmaps
with open(META_FP, "r") as f:
    meta = json.load(f)

A = np.memmap(os.path.join(STFT_DIR, meta["A_memmap"]), dtype=np.float16, mode="r", shape=tuple(meta["A_shape"]))
B = np.memmap(os.path.join(STFT_DIR, meta["B_memmap"]), dtype=np.float16, mode="r", shape=tuple(meta["B_shape"]))
freqs = np.load(os.path.join(STFT_DIR, meta["freqs_npy"]))

A.shape, B.shape, freqs.shape


((940, 1000, 257), (973, 1000, 257), (257,))

In [15]:
# Build labels and a light subset to start (keep runtime manageable)
# Using first N from each class
N_PER_CLASS = 200  # increase to use more data

XA = A[:N_PER_CLASS].astype(np.float32)
XB = B[:N_PER_CLASS].astype(np.float32)
X = np.concatenate([XA, XB], axis=0)   # (n, frames, bins)
y = np.concatenate([np.ones(len(XA), dtype=np.int64), np.zeros(len(XB), dtype=np.int64)])

X.shape, y.shape, np.bincount(y)


((400, 1000, 257), (400,), array([200, 200]))

In [20]:
# Build precomputed kernel on X and train/test SVM
K = build_kernel_matrix(list(X), freqs, maxdim=1, gamma=0.25)
print("Kernel shape:", K.shape)

# Sanity: labels exist and match K
n = K.shape[0]
assert n == X.shape[0]
assert np.array_equal(np.unique(y), np.array([0,1])), np.unique(y)

# Stratified split on indices, then slice K and y consistently
idx = np.arange(n)
tr_idx, te_idx = train_test_split(idx, test_size=0.25, stratify=y, random_state=42)
print("Train/Test label counts:", np.bincount(y[tr_idx]), np.bincount(y[te_idx]))
assert len(np.unique(y[tr_idx])) == 2 and len(np.unique(y[te_idx])) == 2

Ktr = K[np.ix_(tr_idx, tr_idx)]
Kte = K[np.ix_(te_idx, tr_idx)]
ytr, yte = y[tr_idx], y[te_idx]

clf = SVC(kernel='precomputed', C=1.0)
clf.fit(Ktr, ytr)
print("Holdout accuracy:", round(clf.score(Kte, yte), 4))


Ripser diagrams: 100%|██████████| 400/400 [00:16<00:00, 24.62it/s]


Kernel shape: (400, 400)
Train/Test label counts: [150 150] [50 50]
Holdout accuracy: 0.57


In [17]:
# Homological representation utilities
# We'll compute a simple persistence on a downsampled slice per sample.

!pip -q install ripser scikit-learn

from ripser import ripser
from sklearn.metrics.pairwise import pairwise_kernels
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.svm import SVC

# Convert a 2D time-frequency patch into a point cloud in R^2
# We use (frequency, magnitude) pairs sampled over time window(s)

def stft_patch_to_points(stft_patch: np.ndarray, freqs: np.ndarray, time_stride: int = 20, freq_stride: int = 4, top_k: int = 32):
    # stft_patch: (frames, bins) in dB [-80, 0]
    fidx = np.arange(0, stft_patch.shape[1], freq_stride)
    tidx = np.arange(0, stft_patch.shape[0], time_stride)
    pts = []
    for t in tidx:
        row = stft_patch[t, fidx]
        # pick top-k magnitudes at this time
        k = min(top_k, row.size)
        idx = np.argpartition(row, -k)[-k:]
        sel_freqs = freqs[fidx][idx]
        sel_mags  = row[idx]
        pts.append(np.stack([sel_freqs, sel_mags], axis=1))
    if len(pts) == 0:
        return np.zeros((0, 2), dtype=np.float32)
    P = np.concatenate(pts, axis=0)
    # normalize scales for balance (freq Hz -> kHz; mags in dB -> scaled)
    P = P.astype(np.float32)
    P[:, 0] = P[:, 0] / 1000.0
    P[:, 1] = (P[:, 1] + 80.0) / 80.0  # [-80,0] -> [0,1]
    return P


def persistence_vector(points: np.ndarray, maxdim: int = 1):
    if points.shape[0] == 0:
        return {"dgms": []}
    res = ripser(points, maxdim=maxdim, metric='euclidean')
    return res


def wasserstein_kernel(diagrams_a, diagrams_b, p: int = 2, gamma: float = 1.0):
    # Simple RBF on pairwise L2 between flattened finite bars of H0 and H1
    def flatten(dgms):
        flat = []
        for d in dgms:
            if len(d) == 0:
                continue
            d = d[np.isfinite(d).all(axis=1)]
            flat.append(d.reshape(-1))
        if not flat:
            return np.zeros((0,), dtype=np.float32)
        return np.concatenate(flat).astype(np.float32)
    a = flatten(diagrams_a)
    b = flatten(diagrams_b)
    # Pad to same length
    m = max(a.size, b.size)
    a = np.pad(a, (0, m - a.size))
    b = np.pad(b, (0, m - b.size))
    dist2 = np.sum((a - b) ** 2)
    return np.exp(-gamma * dist2)


def build_kernel_matrix(X_list, freqs, maxdim=1, gamma=0.5):
    # Precompute diagrams
    diagrams = []
    for x in tqdm(X_list, desc="Ripser diagrams"):
        # Take middle 200 frames as a robust slice
        mid = x.shape[0] // 2
        start = max(0, mid - 100)
        end = min(x.shape[0], start + 200)
        patch = x[start:end]
        P = stft_patch_to_points(patch, freqs)
        res = persistence_vector(P, maxdim=maxdim)
        dgms = [res.get("dgms", [])[i] if i < len(res.get("dgms", [])) else np.zeros((0, 2)) for i in range(maxdim + 1)]
        diagrams.append(dgms)
    n = len(diagrams)
    K = np.zeros((n, n), dtype=np.float64)
    for i in range(n):
        K[i, i] = 1.0
        for j in range(i + 1, n):
            kij = wasserstein_kernel(diagrams[i], diagrams[j], gamma=gamma)
            K[i, j] = K[j, i] = kij
    return K



In [19]:
# Build precomputed kernel and train/test SVM with CV
from sklearn.model_selection import train_test_split

# Downselect further if needed to speed up
MAX_N = 120
if X.shape[0] > MAX_N:
    X_small = X[:MAX_N]
    y_small = y[:MAX_N]
else:
    X_small, y_small = X, y

K = build_kernel_matrix(list(X_small), freqs, maxdim=1, gamma=0.25)
print("Kernel shape:", K.shape, "min/max:", K.min(), K.max())

# Simple holdout
idx = np.arange(K.shape[0])
Xtr, Xte, ytr, yte, itr, ite = train_test_split(K, y_small, idx, test_size=0.25, stratify=y_small, random_state=42)
# For precomputed kernel, pass the submatrices appropriately
Ktr = K[np.ix_(itr, itr)]
Kte = K[np.ix_(ite, itr)]

clf = SVC(kernel='precomputed', C=1.0)
clf.fit(Ktr, ytr)
acc = clf.score(Kte, yte)
print("Holdout accuracy:", round(acc, 4))

# Cross-validation on full K
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = []
for tr, te in cv.split(K, y_small):
    Ktr = K[np.ix_(tr, tr)]
    Kte = K[np.ix_(te, tr)]
    clf = SVC(kernel='precomputed', C=1.0)
    clf.fit(Ktr, y_small[tr])
    scores.append(clf.score(Kte, y_small[te]))
print("CV scores:", [round(s, 4) for s in scores], "| mean=", round(float(np.mean(scores)), 4))


Ripser diagrams: 100%|██████████| 120/120 [00:03<00:00, 37.35it/s]


Kernel shape: (120, 120) min/max: 5.285725150127048e-20 1.0


ValueError: The number of classes has to be greater than one; got 1 class