In [2]:
def filter_by_pca_curvature(X, k=20, keep_ratio=0.3):
    tree = cKDTree(X) # Construct cKDTree from point cloud
    n = X.shape[0] # n is the number of points
    curvature_scores = np.zeros(n) # Initialize matrix of zeroes
    
    for i in range(n):
        _, idxs = tree.query(X[i], k=k+1)  # Include the point itself
        neighbors = X[idxs[1:]]  # Exclude the point itself, construct neighbors matrix
        cov = np.cov(neighbors - neighbors.mean(axis=0), rowvar=False) # Construct covariance matrix of centered neighbors
        eigvals = np.linalg.eigvalsh(cov) # Compute eigenvalues of the symmetric matrix
        eigvals = np.maximum(eigvals, 1e-10)  # Numerical stability
        #curvature_scores[i] = eigvals[0] / np.sum(eigvals)  # λ₃ / sum(λ)
        #curvature_scores[i] = eigvals[0]
        #curvature_scores[i] = (eigvals[1] - eigvals[0]) / eigvals[2]
        #curvature_scores[i] = eigvals[0] / np.sum(eigvals)  # as is
        curvature_scores[i] = (eigvals[2] - eigvals[0]) / eigvals[2] # Largest - smallest / largest

    # Keep top N% with highest curvature
    threshold_idx = int(n * keep_ratio) # Remove flat or nearly spherical ellipsoids of inertia
    keep_indices = np.argsort(curvature_scores)[-threshold_idx:]
    return X[keep_indices]

    # Sinkhorn
def sinkhorn(K, num_iters=100, tol=1e-6):
    u = np.ones(K.shape[0]) # Initalize flat array of n 1's
    v = np.ones(K.shape[1]) # Initalize flat array of n 1's
    for i in range(num_iters): # Loop through iterative convergence, as shown in the latex pdf
        u_prev = u.copy()
        Kv = K @ v
        u = 1.0 / np.maximum(Kv, 1e-8)
        KTu = K.T @ u
        v = 1.0 / np.maximum(KTu, 1e-8)
        delta = np.linalg.norm(u - u_prev) # compute delta, to track changes in u and convergence speed
        #print(f"Iter {i}, delta={delta:.2e}")
        if np.linalg.norm(u - u_prev) < tol: # Early stopping at convergence
            print(f"Sinkhorn converged at iteration {_}")
            break
            
    return np.diag(u) @ K @ np.diag(v)  # A: soft permutation matrix, project the 1D arrays of u and v on a diagonal


def random_orthogonal_matrix(d=3):
    H = np.random.randn(d, d) # Generate random orthogonal matrix
    Q, _ = np.linalg.qr(H) # Find orthonormal matrix using gram-schmidt (from QR decomposition)
    return Q

def reflection_group(d=3):
    return [np.diag(signs) for signs in product([-1, 1], repeat=d)] # Reflect the ellipsoid in 2^3 possible orientations

In [4]:
import time
import numpy as np
from itertools import product
from scipy.optimize import linear_sum_assignment
import open3d as o3d
from scipy.spatial import cKDTree

# Load point cloud once
pcd = o3d.io.read_point_cloud("/Users/judahlevin/UATX/Linear Algebra/Final Project/Models/horse.ply")
X_full = np.asarray(pcd.points)
n_full = X_full.shape[0]

rotation_errors = []

# Start timing
start_time = time.time()

for seed in range(100):
    np.random.seed(seed)

    # STEP 1: Load and Subsample Data
    n = 1000
    indices = np.random.choice(n_full, n, replace=False)
    X_sampled = X_full[indices]
    X = X_sampled

    # STEP 2: Ground Truth on Raw Cloud
    P_full = X - X.mean(axis=0)
    O = random_orthogonal_matrix()
    perm = np.random.permutation(n)
    Π_true = np.eye(n)[perm]
    P_perm = Π_true @ P_full
    Q_true = (O @ P_perm.T).T
    Q = Q_true  # Already centered
    Q_filtered = filter_by_pca_curvature(Q_true, k=20, keep_ratio=0.3)
    Q = Q_filtered - Q_filtered.mean(axis=0)

    # STEP 3: Filtered Point Cloud
    X_filtered = filter_by_pca_curvature(X, k=20, keep_ratio=0.3)
    P = X_filtered - X_filtered.mean(axis=0)
    n = P.shape[0]

    # STEP 4: Kolpakov E–Init + Sinkhorn + Hungarian
    E_P = P.T @ P
    E_Q = Q.T @ Q
    _, U_P = np.linalg.eigh(E_P)
    _, U_Q = np.linalg.eigh(E_Q)
    U0 = U_Q @ U_P.T

    U_best = None
    best_error = float("inf")
    col_ind_best = None

    for D in reflection_group():
        U_candidate = U0 @ U_P @ D @ U_P.T
        Q_candidate = Q_filtered @ U_candidate.T

        Q_sq = np.sum(Q_candidate**2, axis=1, keepdims=True)
        P_sq = np.sum(P**2, axis=1, keepdims=True)
        cost_matrix = Q_sq + P_sq.T - 2 * Q_candidate @ P.T
        cost_std = np.std(cost_matrix)
        epsilon = 0.05 * cost_std

        K = np.exp(-(cost_matrix - np.min(cost_matrix)) / epsilon)
        A = sinkhorn(K)

        row_ind, col_ind = linear_sum_assignment(-A)
        Π_est = np.zeros_like(A)
        Π_est[row_ind, col_ind] = 1
        P_matched = P[col_ind]
        error = np.linalg.norm(P_matched - Q_candidate)

        if error < best_error:
            best_error = error
            U_best = U_candidate
            col_ind_best = col_ind

    # STEP 5: Evaluation on Raw Cloud
    δ_o = np.linalg.norm(U_best - O)
    rotation_errors.append(δ_o)

# End timing
end_time = time.time()
total_time = end_time - start_time
avg_time_per_seed = total_time / 100

# -------------------------------
# Summary Statistics
# -------------------------------
rotation_errors = np.array(rotation_errors)
print("\n=== Kolpakov E–Init + Sinkhorn + Hungarian Summary over 100 Seeds ===")
print(f"Mean δ_o   (rotation error):       {rotation_errors.mean():.6f}")
print(f"Median δ_o (rotation error):       {np.median(rotation_errors):.6f}")
print(f"Std δ_o    (rotation error):       {rotation_errors.std():.6f}")
print(f"\nAverage time per seed loop:        {avg_time_per_seed:.6f} seconds")


=== Kolpakov E–Init + Sinkhorn + Hungarian Summary over 100 Seeds ===
Mean δ_o   (rotation error):       1.746586
Median δ_o (rotation error):       2.000000
Std δ_o    (rotation error):       1.212203

Average time per seed loop:        0.631069 seconds
