In [5]:
%load_ext autoreload
%autoreload 2

In [37]:
import numpy as np 
from pycss.utils import *
from pycss.subset_selection import *
from scipy import stats
from notebook_utils.utils import *

Some helper functions for testing

In [2]:
def replace_submatrix(mat, ind1, ind2, mat_replace):
    """Overwrite the ``(ind1, ind2)`` submatrix of ``mat`` in place.

    Row ``i`` of ``mat_replace`` is written into row ``ind1[i]`` of ``mat``
    at the columns listed in ``ind2``.  ``mat`` itself is modified and is
    also returned so the call can be used in an assignment.
    """
    source_row = 0
    for target_row in ind1:
        # One row at a time: mat[ind1, ind2] with two index lists would pair
        # the indices elementwise instead of addressing the full block.
        mat[target_row, ind2] = mat_replace[source_row, :]
        source_row += 1
    return mat

def compute_Sigma_MLE(Sigma, S, noise='sph'):
    """Build the p x p covariance implied by subset ``S`` under a noise model.

    The rows and columns indexed by ``S`` are copied from ``Sigma``.  With
    ``C = complement(p, S)``, the remaining block is set to
    ``Sigma[C, S] @ Sigma[S, S]^{-1} @ Sigma[S, C] + diag(D)``, where ``D``
    comes from the diagonal of ``regress_off(Sigma, S)`` at the indices ``C``.

    Parameters
    ----------
    Sigma : (p, p) array
        Covariance matrix to start from.
    S : sequence of int
        Indices of the selected variables; sorted internally.
    noise : {'sph', 'diag'}
        'sph' replaces every entry of ``D`` by their mean (one shared
        variance); 'diag' keeps the individual entries.

    Returns
    -------
    (p, p) ndarray
        The assembled covariance matrix.

    Raises
    ------
    ValueError
        If ``noise`` is not 'sph' or 'diag'.
    """
    # Fail early with a clear message; previously an unknown value left `D`
    # unbound and surfaced as an UnboundLocalError further down.
    if noise not in ('sph', 'diag'):
        raise ValueError("noise must be 'sph' or 'diag', got " + repr(noise))

    p = Sigma.shape[0]
    k = len(S)
    S = np.sort(S)
    # presumably the indices in range(p) that are not in S -- confirm in pycss.utils
    S_comp = complement(p, S)

    # presumably the residual covariance after regressing on S -- confirm in pycss.utils
    Sigma_R = regress_off(Sigma, S)
    D_ = np.diag(Sigma_R)[S_comp]
    if noise == 'sph':
        D = np.ones(p - k) * np.mean(D_)
    else:
        D = D_.copy()

    # Extract each submatrix once instead of re-slicing for every use.
    Sigma_SS = Sigma[S, :][:, S]
    Sigma_SC = Sigma[S, :][:, S_comp]
    Sigma_CS = Sigma[S_comp, :][:, S]

    # Solve the linear system rather than forming an explicit inverse.
    Sigma_CC = Sigma_CS @ np.linalg.solve(Sigma_SS, Sigma_SC) + np.diag(D)

    Sigma_MLE = np.zeros((p, p))
    Sigma_MLE = replace_submatrix(Sigma_MLE, S, S, Sigma_SS)
    Sigma_MLE = replace_submatrix(Sigma_MLE, S, S_comp, Sigma_SC)
    Sigma_MLE = replace_submatrix(Sigma_MLE, S_comp, S, Sigma_CS)
    Sigma_MLE = replace_submatrix(Sigma_MLE, S_comp, S_comp, Sigma_CC)
    return Sigma_MLE

Generate factor model and data

In [41]:
# Simulation settings shared by the test cells below.
n = 2000  # rows sampled per simulated data set (gen_Sigma's `size=n`)
p = 50    # number of variables, i.e. the covariance is p x p
k = 15    # number of latent factors passed to gen_Sigma
B = 10    # number of simulated covariances each CSS test loops over

def gen_Sigma(n, p, k):
    """Simulate data from a random k-factor model and return its estimated covariance.

    A ``p x k`` loading matrix ``W`` with standard normal entries is drawn,
    nonnegative random values are added to the diagonal of ``W @ W.T``, and
    the result is passed through ``standardize_cov``.  ``n`` zero-mean
    Gaussian rows are then sampled with that covariance, and the second
    output of ``get_moments`` on the sample is returned (presumably the
    sample covariance -- confirm against ``get_moments``).
    """
    loadings = np.random.multivariate_normal(np.zeros(k), cov=np.eye(k), size=p)
    # Squared normals scaled by sqrt(k): the per-variable diagonal add-on.
    idiosyncratic = np.sqrt(k) * np.square(np.random.normal(0, 1, p))

    population_cov = loadings @ loadings.T
    np.fill_diagonal(population_cov, np.diag(population_cov) + idiosyncratic)
    population_cov = standardize_cov(population_cov)

    sample = np.random.multivariate_normal(np.zeros(p), cov=population_cov, size=n)
    _sample_mean, sample_cov = get_moments(sample)
    return sample_cov

Test Greedy CSS

In [79]:
# Check greedy_css on B simulated covariances with random include/exclude
# constraints: the constraints must be honoured, the `k` and `cutoffs` modes
# must agree, and the result must match a brute-force greedy search.
Sigmas = []
for i in range(B):
    Sigma = gen_Sigma(n, p, k)
    Sigmas.append(Sigma)

    # Number of forced-in / forced-out variables: 0 most of the time,
    # otherwise 3, 6 or 9.  Converted to plain ints so they are valid sizes.
    N1 = int(np.random.choice([0, 3, 6, 9], p=[0.7, 0.1, 0.1, 0.1]))
    N2 = int(np.random.choice([0, 3, 6, 9], p=[0.7, 0.1, 0.1, 0.1]))
    include = np.random.choice(np.arange(p), N1, replace=False).astype(int)
    exclude = np.random.choice([idx for idx in np.arange(p) if idx not in include], N2, replace=False).astype(int)

    S, _ = greedy_css(Sigma, k=p-len(exclude), include=include, exclude=exclude)
    S_cutoff, _ = greedy_css(Sigma, cutoffs=0, include=include, exclude=exclude)

    if not set(include).issubset(S):
        print('Iteration ' + str(i) + ' does not include all of include.')

    if len(set(exclude).intersection(S)) > 0:
        print('Iteration ' + str(i) + ' does not exclude all of exclude.')

    # array_equal also copes with results of different length, where an
    # elementwise `==` cannot be broadcast.
    if not np.array_equal(S, S_cutoff):
        print('k and cutoff dont agree for iteration ' + str(i))

    # Brute-force reference: start from `include`, then repeatedly add the
    # allowed index that minimises trace(regress_off(Sigma, subset)).
    # Unfilled slots hold -1, which never collides with a real index.
    S_naive = (-1 * np.ones(p - len(exclude))).astype(int)
    S_naive[:len(include)] = include
    for j in range(len(include), len(S_naive)):
        # Fixed: this previously referenced the undefined name `naive_S`.
        options = [idx for idx in np.arange(p) if (idx not in S_naive and idx not in exclude)]
        best_idx = None
        best_obj_val = np.inf
        for idx in options:
            potential_S = np.concatenate([S_naive[:j], np.array([idx])])
            obj_val = np.trace(regress_off(Sigma, potential_S))
            if obj_val < best_obj_val:
                best_obj_val = obj_val
                best_idx = idx
        S_naive[j] = best_idx

    if not np.array_equal(S, S_naive):
        if np.shape(S) == S_naive.shape:
            mismatch = str(np.where(S_naive != S)[0])
        else:
            # Positions cannot be compared when the lengths differ.
            mismatch = 'lengths ' + str(len(S)) + ' vs ' + str(len(S_naive))
        print('On iteration ' + str(i) + ' we dont match the naive solution st ' + mismatch)
              

Test Swapping CSS 

In [78]:
# Check swapping_css on B simulated covariances: for each subset size the
# include/exclude constraints must hold and, once converged, no single swap
# of a non-forced element may lower trace(regress_off(Sigma, subset)).
# NOTE(review): TOL is not defined in any cell shown here; presumably it comes
# from one of the star imports -- confirm.
Sigmas = []
for i in range(B):
    Sigma = gen_Sigma(n, p, k)
    Sigmas.append(Sigma)
    for s in range(1, 15):
        # Both counts are drawn from 0..s-1, so `include` always leaves at
        # least one free slot in a size-s subset.
        N1 = np.random.choice(np.arange(s))
        N2 = np.random.choice(np.arange(s))
        include = np.random.choice(np.arange(p), N1, replace=False).astype(int)
        exclude = np.random.choice([idx for idx in np.arange(p) if idx not in include], N2, replace=False).astype(int)
        S, _, _, converged = swapping_css(Sigma, s, tol=TOL, include=include, exclude=exclude)

        if not set(include).issubset(S):
            print('Size ' + str(s) + ' subset for iteration ' + str(i) + ' does not include all of include.')

        if len(set(exclude).intersection(S)) > 0:
            print('Size ' + str(s) + ' subset for iteration ' + str(i) + ' does not exclude all of exclude.')

        if not converged:
            print('Size ' + str(s) + ' subset for iteration ' + str(i) + ' did not converge.')
            # The local-optimality check below only makes sense after convergence.
            continue

        for j in range(len(S)):
            chosen = S[j]
            if chosen in include:
                # Forced-in variables are not candidates for swapping.
                continue
            temp_S = np.delete(S, j)
            options = np.array([idx for idx in np.arange(p) if (idx not in exclude and idx not in temp_S)])
            best_obj_val = np.inf
            best_idx = None

            # Try every allowed replacement for position j (including the
            # chosen element itself) and keep the one with the lowest objective.
            for ell in options:
                potential_S = np.concatenate([temp_S, np.array([ell])]).astype(int)
                obj_val = np.trace(regress_off(Sigma, potential_S))
                if obj_val < best_obj_val:
                    best_obj_val = obj_val
                    best_idx = ell

            if best_idx != chosen:
                # Fixed the garbled message ('zfor', 'chosing') and report both
                # the index that was chosen and the one that beats it.
                print('Mistake choosing index ' + str(chosen) + ' (best is ' + str(best_idx) + ') for the size ' + str(s) + ' subset on iteration ' + str(i))

Test Greedy PCSS 

In [6]:
# Check greedy PCSS: for every subset size, the last index picked by
# greedy_subset_selection must be the one that minimises the average negative
# Gaussian log-likelihood of X given the previously selected indices.
# NOTE(review): Sigma_hat, mu_hat, X and TOL are not defined in any cell shown
# here (gen_Sigma only returns a covariance) -- confirm where they come from
# so the notebook survives Restart & Run All.
noise = 'sph'

if noise == 'sph':
    pcss_objective = sph_pcss_objective
elif noise == 'diag':
    pcss_objective = diag_pcss_objective
else:
    raise ValueError("noise must be 'sph' or 'diag', got " + repr(noise))

# The loop variable is deliberately not called `k`: that name holds the
# number of factors passed to gen_Sigma elsewhere in the notebook, and looping
# over it would silently overwrite that setting.
for subset_size in range(1, p-1):
    S, Sigma_R, errors = greedy_subset_selection(Sigma_hat, subset_size, pcss_objective, flag_colinearity=True, tol=TOL)
    if len(errors) > 0:
        print('Colinearity errors at ' + str(subset_size) + ': ', errors)

    # Drop the last selected index and re-derive it by exhaustive search.
    S_removed = S[:(subset_size - 1)]
    S_removed_comp = complement(p, S_removed)
    temp_objectives = []
    for t in S_removed_comp:
        S_added = np.concatenate([S_removed, np.array([t])]).astype(int)
        Sigma_MLE_temp = compute_Sigma_MLE(Sigma_hat, S=S_added, noise=noise)
        temp_objectives.append(-1 * np.mean(stats.multivariate_normal(mean=mu_hat, cov=Sigma_MLE_temp).logpdf(X)))
    if S_removed_comp[np.argmin(temp_objectives)] != S[subset_size - 1]:
        print('Mistake at ', subset_size)

Test Swapping PCSS

In [7]:
# Check swapping PCSS: once swapping_subset_selection has converged, replacing
# any single selected index by another candidate must not lower the average
# negative Gaussian log-likelihood of X.
# NOTE(review): Sigma_hat, mu_hat, X and TOL are not defined in any cell shown
# here (gen_Sigma only returns a covariance) -- confirm where they come from
# so the notebook survives Restart & Run All.
noise = 'sph'

if noise == 'sph':
    pcss_objective = sph_pcss_objective
elif noise == 'diag':
    pcss_objective = diag_pcss_objective
else:
    raise ValueError("noise must be 'sph' or 'diag', got " + repr(noise))

# The loop variable is deliberately not called `k`: that name holds the
# number of factors passed to gen_Sigma elsewhere in the notebook, and looping
# over it would silently overwrite that setting.
for subset_size in range(1, 15):
    S, Sigma_R, S_init, converged, errors = swapping_subset_selection(Sigma_hat, subset_size, pcss_objective, flag_colinearity=True, tol=TOL)

    if len(errors) > 0:
        print('Colinearity errors at ' + str(subset_size) + ': ', errors)

    if not converged:
        print(str(subset_size) + ' did not converge')
        # The local-optimality check below only makes sense after convergence.
        continue

    for position in range(len(S)):
        chosen = S[position]
        # Remove one selected index and search all candidates for that slot
        # (the removed index itself is among them).
        S_temp = np.delete(S, position)
        S_temp_comp = complement(p, S_temp)
        temp_objectives = []
        for t in S_temp_comp:
            S_added = np.concatenate([S_temp, np.array([t])]).astype(int)
            Sigma_MLE_temp = compute_Sigma_MLE(Sigma_hat, S=S_added, noise=noise)
            temp_objectives.append(-1 * np.mean(stats.multivariate_normal(mean=mu_hat, cov=Sigma_MLE_temp).logpdf(X)))
        if S_temp_comp[np.argmin(temp_objectives)] != chosen:
            print('Mistake at ', subset_size)