In [None]:
import numpy as np
import pandas as pd
from scipy.stats import multivariate_normal

def buggy_pca(X, d):
    """
    Buggy PCA applied directly on the matrix X.
    
    Args:
    X: n x d numpy array - data matrix
    d: int - number of dimensions for low dimensional representation
    
    Returns:
    Z: n x d numpy array - low dimensional representation of data
    V: d x d numpy array - estimated projection matrix
    recons: n x D numpy array - reconstructed data in D dimensions
    """
    # compute SVD of X
    U, s, Vt = np.linalg.svd(X, full_matrices=False)
    
    # keep top d singular values and vectors
    V = Vt[:d].T
    
    # project data onto low dimensional space
    Z = X @ V
    
    # compute reconstruction in D dimensions
    recons = Z @ V.T
    
    return Z, V, recons

def demeaned_pca(X, d):
    """
    PCA with mean subtraction applied on the matrix X.
    
    Args:
    X: n x d numpy array - data matrix
    d: int - number of dimensions for low dimensional representation
    
    Returns:
    Z: n x d numpy array - low dimensional representation of data
    V: d x d numpy array - estimated projection matrix
    recons: n x D numpy array - reconstructed data in D dimensions
    """
    # subtract mean along each dimension
    X_demeaned = X - np.mean(X, axis=0)
    
    # compute SVD of X_demeaned
    U, s, Vt = np.linalg.svd(X_demeaned, full_matrices=False)
    
    # keep top d singular values and vectors
    V = Vt[:d].T
    
    # project data onto low dimensional space
    Z = X_demeaned @ V
    
    
    # compute reconstruction in D dimensions
    recons = (Z @ V.T) + np.mean(X, axis=0)
    
    return Z, V, recons

def normalized_pca(X, d):
    """
    PCA with mean subtraction and normalization applied on the matrix X.
    
    Args:
    X: n x d numpy array - data matrix
    d: int - number of dimensions for low dimensional representation
    
    Returns:
    Z: n x d numpy array - low dimensional representation of data
    V: d x d numpy array - estimated projection matrix
    recons: n x D numpy array - reconstructed data in D dimensions
    """
    # subtract mean along each dimension
    X_demeaned = X - np.mean(X, axis=0)
    
    # scale each dimension to have unit variance
    X_normalized = X_demeaned / np.std(X_demeaned, axis=0)
    
    # compute SVD of X_normalized
    U, s, Vt = np.linalg.svd(X_normalized, full_matrices=False)
    
    # keep top d singular values and vectors
    V = Vt[:d].T
    
    # project data onto low dimensional space
    Z = X_normalized @ V
    
    # compute reconstruction in D dimensions
    recons = (Z @ V.T * np.std(X_demeaned, axis=0)) + np.mean(X, axis=0)
    
    return Z, V, recons

def DRO(X, d):
    # Dimensionality reduction via optimization
    X_mean = np.mean(X, axis=0)
    X_centered = X - X_mean
    
    U, S, Vt = np.linalg.svd(X_centered)
    
   
    A = Vt[:d,:].T
    b = X_mean
    Z = U[:, :d] @ np.diag(S[:d])
    
    X_recon = Z @ A.T + b
    return Z, A, b, X_recon

def DRLV(X, d, n_iter=10):
    # Dimensionality reduction via a generative model
    
    N, D = X.shape
    
    # Initialize A, b using DRO
    Z_init, A_init, b_init, X_recon_init = DRO(X, d)
    
    # Set initial Î· based on the reconstruction errors of DRO
    eta_init = np.sum((X - X_recon_init)**2) / N
    
    # Initialize the parameters
    A = A_init
    b = b_init
    eta = eta_init
    
    # EM Algorithm
    for _ in range(n_iter):
        # E-step
        M = A.T @ A + eta**2 * np.eye(d)
        Minv = np.linalg.inv(M)
        
        Z_mean = [(Minv @ A.T @ (x - b)) for x in X]
        Z_cov = [eta**2 * Minv for _ in X]
        
        # M-step
        A_num = sum((x - b).reshape(-1, 1) @ z.reshape(1, -1) for x, z in zip(X, Z_mean))
        A_denom_inv = sum(z.reshape(-1, 1) @ z.reshape(1, -1) + cov for z, cov in zip(Z_mean, Z_cov))
        
        A = A_num @ np.linalg.inv(A_denom_inv)
        eta_squared = sum(np.linalg.norm(x - A @ z - b)**2 + np.trace(A.T @ A @ cov) for x, z, cov in zip(X, Z_mean, Z_cov))
        eta = np.sqrt(eta_squared / (N * D))
    
    X_recon = np.array([A @ z + b for z in Z_mean])

    return A, b, eta, X_recon


# Load the 2D dataset
data_2d = pd.read_csv('data/data2D.csv', header=None)
X_2d = data_2d.values
n_2d, D_2d = X_2d.shape

# Perform DRO and DRLV on 2D dataset
d_2d = 1
Z_buggy, V_buggy, X_recon_buggy_2d = buggy_pca(X_2d, d_2d)
Z_demean, V_demean, X_recon_demean_2d = demeaned_pca(X_2d, d_2d)
Z_normal, V_normal, X_recon_normal_2d = normalized_pca(X_2d, d_2d)
Z_dro_2d, A_dro_2d, b_dro_2d, X_recon_dro_2d = DRO(X_2d, d_2d)
Z_drlv_2d, A_drlv_2d, b_drlv_2d, X_recon_drlv_2d = DRLV(X_2d, d_2d)

# Calculate reconstruction errors for 2D dataset
error_buggy_2d = np.sum((X_2d - X_recon_buggy_2d)**2)/data_2d.shape[0]
error_demean_2d = np.sum((X_2d - X_recon_demean_2d)**2)/data_2d.shape[0]
error_normal_2d = np.sum((X_2d - X_recon_normal_2d)**2)/data_2d.shape[0]
error_dro_2d = np.sum((X_2d - X_recon_dro_2d)**2)/data_2d.shape[0]
error_drlv_2d = np.sum((X_2d - X_recon_drlv_2d)**2)/data_2d.shape[0]

print('Buggy PCA: ', error_buggy_2d)
print('Demeaned PCA: ', error_demean_2d)
print('Normalized PCA: ', error_normal_2d)
print('DRO: ', error_dro_2d)
print('DRLV: ', error_drlv_2d)

# Plot original and reconstructed points for 2D dataset
import matplotlib.pyplot as plt

#plt.title('DRLV')
plt.scatter(X_2d[:,0], X_2d[:,1], label='Original')

#plt.scatter(X_recon_buggy_2d[:,0], X_recon_buggy_2d[:,1], label='Buggy')
#plt.scatter(X_recon_demean_2d[:,0], X_recon_demean_2d[:,1], label='Demeaned')
#plt.scatter(X_recon_normal_2d[:,0], X_recon_normal_2d[:,1], label='Normalized')
#plt.scatter(X_recon_dro_2d[:,0], X_recon_dro_2d[:,1], label='DRO')
#plt.scatter(X_recon_drlv_2d[:,0], X_recon_drlv_2d[:,1], label='DRLV')
plt.legend()
plt.show()



In [None]:
# load the dataset for 1000d
data_1000d = pd.read_csv('data/data1000D.csv', header=None)
X_1000d = data_1000d.values

X_mean = np.mean(X_1000d, axis=0)
X_centered = X_1000d - X_mean
    
U, S, Vt = np.linalg.svd(X_centered,full_matrices=True)

In [None]:
plt.title('d vs. singular value')
plt.plot(S)
#plt.ylim(0,100)

In [None]:
X_1000d = data_1000d.values
n_1000d, D_1000d = X_1000d.shape

d = []
error_buggy = []
error_demean = []
error_normal = []
error_dro = []
error_drlv = []

# Perform DRO and DRLV on 1000D dataset
for d_1000d in range(25,36):
    Z_buggy, V_buggy, X_recon_buggy_1000d = buggy_pca(X_1000d, d_1000d)
    Z_demean, V_demean, X_recon_demean_1000d = demeaned_pca(X_1000d, d_1000d)
    Z_normal, V_normal, X_recon_normal_1000d = normalized_pca(X_1000d, d_1000d)
    Z_dro_1000d, A_dro_1000d, b_dro_1000d, X_recon_dro_1000d = DRO(X_1000d, d_1000d)
    Z_drlv_1000d, A_drlv_1000d, b_drlv_1000d, X_recon_drlv_1000d = DRLV(X_1000d, d_1000d)

    # Calculate reconstruction errors for 1000D dataset
    error_buggy_1000d = np.sum((X_1000d - X_recon_buggy_1000d)**2)/data_1000d.shape[0]
    error_demean_1000d = np.sum((X_1000d - X_recon_demean_1000d)**2)/data_1000d.shape[0]
    error_normal_1000d = np.sum((X_1000d - X_recon_normal_1000d)**2)/data_1000d.shape[0]
    error_dro_1000d = np.sum((X_1000d - X_recon_dro_1000d)**2)/data_1000d.shape[0]
    error_drlv_1000d = np.sum((X_1000d - X_recon_drlv_1000d)**2)/data_1000d.shape[0]
    
    d.append(d_1000d)
    error_buggy.append(error_buggy_1000d)
    error_demean.append(error_demean_1000d)
    error_normal.append(error_normal_1000d)
    error_dro.append(error_dro_1000d)
    error_drlv.append(error_drlv_1000d)
    


In [None]:
plt.figure(figsize=(8,4))

plt.plot(d, error_buggy, label='Buggy')
plt.plot(d, error_demean, label='Demeaned')
plt.plot(d, error_normal, label='Normalized')
plt.plot(d, error_dro, label='DRO')
plt.plot(d, error_drlv, label='DRLV')

plt.ylim(0,3000)
plt.legend()
plt.show()