In [88]:
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from sklearn import preprocessing
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from IPython.display import clear_output

Load the dataset

In [89]:
X = pd.read_csv("../Data/data.csv").drop(['Unnamed: 0'], axis = 1).to_numpy()
y = pd.read_csv("../Data/labels.csv").drop(['Unnamed: 0'], axis = 1).to_numpy()

In [90]:
X_proto = X[1:101, 1:201]
y_proto = y[1:101]

In [91]:
class PCAObject:
    def __init__(self, PCs, X, label):
        assert len(PCs) == X.shape[0], "Need all the principal components!"
        self.pcs = PCs
        self.X = StandardScaler().fit_transform(X)
        self.cov = np.cov(X.T)
        self.label = label
    
    def calcExpVar(self):
        self.ev = np.diag(PCs @ np.cov(X.T) @ PCs.T)
        self.pev = [ev/sum(self.ev) for ev in self.ev]
        
    def calcNonZeroLoads(self):
        self.nonZeroLoads = [np.count_nonzero(pc) for pc in self.pcs]
        
    def plotNonZeroLoadtoPEV(self):
        if self.pev == None: self.calcExpVar()
        if self.nonZeroLoads == None: self.calcNonZeroLoads()
            
        plt.plot(self.nonZeroLoads, self.pev, label=self.label)
        plt.xlabel('Number of non-zero loadings')
        plt.xscale('log')
        plt.ylabel('Percentage of explained variance (PEV)')
        plt.title('Percentage of explained variance(PEV) vs non-zero loadings')

In [92]:
def reg_PCA(X, k = "all"): 
    """
    function takes an n x p feature matrix
    returns two arrays:
    - array with percentage of explained variance in first k principal directions (k_comp x 1)
    - array with principal directions (k_comp x p)
    """
    X = StandardScaler().fit_transform(X)
    if k == "all": k = min(X.shape[0],X.shape[1])
    pca = PCA(n_components = k)
    pca.fit(X)
    PEVs = pca.explained_variance_ratio_
    prin_comp = pca.components_
    EVs = pca.explained_variance_
    
    return PEVs, prin_comp, EVs

In [93]:
def threshold_PCA(X, thresh = 1e-1, k = "all"):
    """
    function takes
    - X: n x p feature matrix
    - thresh: float representing this non-zero cutoff
    - k: integer for number of principal directions wanted
    returns one array:
    - array with principal components in its columns (k x p)
    """
    if k == "all": k = min(X.shape[0],X.shape[1])

    pcs = reg_PCA(X)[1]
    pcs = (np.abs(pcs) >= thresh).astype(int) * pcs
    
    return pcs[:k]

In [94]:
def nonZeroLoad_PCA(X,j, k = "max"):
    """
    function takes
    - X: n x p feature matrix
    - j: integer for number of non-zero loadings,
    - k: integer for number of principal directions wanted
    returns three arrays:
    - array with percentages of explained variance in first k principal directions (k x 1)
    - array with principal directions (k x p)
    - array with explained variances
    """   
    X_scaled = StandardScaler().fit_transform(X)
    if k == "all": k = min(X.shape[0],X.shape[1])
    
    PCA_PEV, PCA_PC, PCA_EV = reg_PCA(X,min(X.shape[0],X.shape[1]))
    
    total_var = sum(PCA_EV)
    
    thresh_PCA_PC = np.empty((0,PCA_PC.shape[1]))
    thresh_PCA_PEV = []
    thresh_PCA_EV = []
    
    PCA_PC_sorted = np.sort(np.absolute(PCA_PC), axis = 1)
    for m in range(k):
        thresh = PCA_PC_sorted[m][-j]
        thresh_PC = (np.absolute(PCA_PC[m]) >= thresh).astype(int)*PCA_PC[m]
        thresh_PCA_PC = np.vstack((thresh_PCA_PC, thresh_PC))
    
    return thresh_PCA_PC

In [116]:
PCs = reg_PCA(X_proto)[1]
reg = PCAObject(PCs, X_proto, "Regular")

In [130]:
r, s = PCs[0], PCs[1]
k = 1
X_scaled = StandardScaler().fit_transform(X_proto)
r_new, s_new, v_r, v_s, v_rs, v_old_r, v_old_s, v_old_rs, beta_star = SimpTrans(r, s, k, X_scaled)
r[0:4]

Optimizing target new variance...


array([-0.02365623, -0.01519819, -0.04110087, -0.10380788])

In [131]:
r_new[0:4]

array([-0.04731247, -0.03039637, -0.08220175, -0.20761577])

In [132]:
beta_star

0.0

In [134]:
v_old_r

26.20772519288774

In [133]:
v_r

26.20772519288774

In [129]:
def SimpTrans(r, s, k, X_scaled):
    """
    function takes
    - Two directions r and s to transform
    - An integer k that restricts the possible directions
    - The scaled data matrix X
    returns two vectors and three floats
    - Two new directions r and s
    - Two floats representing the variances explained by the new directions r and s
    - One float representing the covariance of the new directions r and s
    """
    # Calculate the covariance matrix for the two directions
    cov = np.vstack((r,s)) @ np.cov(X_scaled.T) @ np.vstack((r,s)).T
    v_old_r, v_old_s, v_old_rs = cov[0,0], cov[1,1], cov[0,1]
    
    # Get a list of all possible betas
    poss_beta1 = [i/2**k for i in range(-2**k, 2**k+1, 1)]
    poss_beta2 = [2**k/i for i in range(-2**k, 2**k+1, 1) if i != 0]
    poss_beta = np.sort(list(set(poss_beta1 + poss_beta2)))
    
    # Calculate the norms of the two directions
    l2_r, l2_s = np.linalg.norm(r)**2, np.linalg.norm(s)**2
    
    # Define closure function for maximization problem
    def newDirVar(beta):
        """
        function takes
        - Covariance matrix of directions
        - Norms of both directions
        - Parameter beta
        returns one float
        - Float representing the variance in the new direction
        """
        v_r, v_s, v_rs = cov[0,0], cov[1,1], cov[0,1]
        nom = l2_r*v_r + 2*beta*np.sqrt(l2_r*l2_s)*v_rs + beta**2*l2_s*v_s
        denom = l2_r + beta**2*l2_s
        return -nom/denom
    
    # Optimize the variance of the first new direction r_new
    print("Optimizing target new variance...")
    beta_star = minimize(newDirVar, 0, method = 'BFGS')
    
    # Select beta from possible values that's closest to the optimal value
    beta_star = min(list(poss_beta), 
                    key = lambda x:abs(x-beta_star.x))
    
    # Calculate the two new directions
    if abs(beta_star) <= 1:
        r_new = 2**k*r + 2**k*beta_star*s
        s_new = 2**k*beta_star*l2_s*r - 2**k*l2_r*s
    else:
        r_new = 2**k*r/beta_star + 2**k*s
        s_new = 2**k*l2_s*r - 2**k*l2_r*s/beta_star
    
    # Calculate variances and covariance of the new directions
    P = np.array([[1, l2_s*beta_star],[beta_star, -l2_r]])
    cov_new = P.T @ cov @ P
    v_r, v_s, v_rs = cov_new[0,0], cov_new[1,1], cov_new[0,1]
    
    return r_new, s_new, v_r, v_s, v_rs, v_old_r, v_old_s, v_old_rs, beta_star

In [104]:
def pcPairFinder(cov, cache):
    """
    function takes
    - A covariance matrix
    - A cache containing the indices of excluded pcs
    returns two integers:
    - i_r: the index of the first principal component
    - i_s: the index of the second principal component
    """
    # Turn covariance matrix into a sparse lower triangular matrix
    grid = np.tril(cov, k = -1)
    # Flatten the grid and sort from largest cov to smallest cov
    srt = np.sort(np.ravel(grid), kind = 'heapsort')
    lst = list(np.flip(np.trim_zeros(srt)))
    mask = False
    
    # Find the pair of pcs that are not already in the cache and hav
    # the highest covariance in the grid
    while not np.any(mask):
        covar = lst.pop(0)
        index = np.argwhere(grid == covar)
        mask = np.ravel(np.invert(np.any(np.isin(index, cache), axis = 1, keepdims = True)))

    # Unpack the indices
    i_r, i_s = index[mask,:][0]
    return i_r, i_s

In [113]:
def simple_PCA(X, k, iters = 2):
    """
    function takes
    - An unscaled data matrix X
    - An integer k that restricts the possible directions
    - An integer indicating the number of iterations
    returns one array:
    - array with principal components in its columns (k x p)
    """
    # Retrieve the principal components and their covariance matrix
    pcs = reg_PCA(X)[1]
    X_scaled = StandardScaler().fit_transform(X)
    cov = np.cov(X.T)
    
    # Setup up globals for loop
    cov_pc = pcs @ cov @ pcs.T
    cache = []
    
    for _ in range(iters):
        while len(cache) <= len(pcs) - 2:
            clear_output(wait = True)
            print("Working on principal component ", len(cache) + 2,"/",len(pcs))
            
            # Find set of principal components to transform
            i_r, i_s = pcPairFinder(cov_pc, cache)
            r, s = pcs[i_r], pcs[i_s]
            cache += [i_r, i_s]
            
            # Transform the pair of components
            print("Determining new directions...")
            r_new, s_new, v_r, v_s, v_rs = SimpTrans(r, s, k, X_scaled)

            # Update the principal components
            pcs[i_r] = r_new
            pcs[i_s] = s_new
            
            # Update the grid matrix
            cov_pc[i_s, i_r], cov_pc[i_r, i_s] = v_rs, v_rs
    
    return pcs

In [114]:
simple_PCA(X_proto, 1)

Working on principal component  100 / 100
Determining new directions...
Optimizing target new variance...


array([[ 0.04731247,  0.03039637,  0.08220175, ...,  0.06533126,
        -0.03597368, -0.00844237],
       [ 0.04375186, -0.13096553,  0.04633334, ...,  0.10686803,
        -0.16336375,  0.09067011],
       [ 0.22685844,  0.17815668,  0.21355734, ...,  0.13263308,
        -0.09529372, -0.2212941 ],
       ...,
       [-0.07052483,  0.16484458,  0.17802846, ...,  0.10711341,
         0.0441666 , -0.05536424],
       [-0.09819298,  0.14240624, -0.00373368, ...,  0.33874281,
        -0.14267526, -0.18931907],
       [-0.00327837,  0.07643644,  0.09458163, ...,  0.00288164,
         0.01779362, -0.02474867]])