In [55]:
from tsplearn import *
import numpy as np 
import pandas as pd

# Load the graph
G = EnhancedGraph(n=40, p=0.162, seed=0)
B1 = G.get_b1()
B2 = G.get_b2()

# Sub-sampling if needed to decrease complexity
sub_size = 100
B1 = B1[:, :sub_size]
B2 = B2[:sub_size, :]
B2 = B2[:,np.sum(np.abs(B2), 0) == 3]

# Laplacians
Ld = np.matmul(np.transpose(B1), B1, dtype=float)
Lu = np.matmul(B2, np.transpose(B2), dtype=float)
L = Lu+Ld
n =  L.shape[0]
nu = B2.shape[1]
nd = B1.shape[1]

# Problem and Dictionary Dimensionalities
m_train = 150 # Number of Train Signals
m_test = 80 # Number of Test Signal
s = 3 # Number of Kernels (Sub-dictionaries)
k = 2 # Polynomial order
sparsity = .1 # Sparsity percentage
K0_max = 20 #floor(n*sparsity) # Sparsity
sparsity_mode = "max"

# Data-Independent Problem Hyperparameters
dictionary_type = ""
K0_coll = np.arange(5, 26, 4) 
max_iter = 30 
patience = 5 
tol = 1e-7 # tolerance for Patience
n_sim = 10
lambda_ = 1e-7 # l2 multiplier
verbose = True

In [2]:
dictionary_type = "separated"

In [2]:
from tqdm import tqdm

dictionary_type = "separated"

D_true = np.zeros((n, n * s, n_sim))
D_true_coll = np.zeros((n, n, s, n_sim))
Y_train = np.zeros((n, m_train, n_sim))
Y_test = np.zeros((n, m_test, n_sim))
epsilon_true = np.zeros(n_sim)
c_true = np.zeros(n_sim)
X_train = np.zeros((n * s, m_train, n_sim))
X_test = np.zeros((n * s, m_test, n_sim))
n_search = 3000

for sim in range(n_sim):
    best_sparsity = 0
    best_acc = 0

    for i in tqdm(range(n_search)):
        try:
            D_try, h, Y_train_try, Y_test_try, epsilon_try, c_try, X_train_try, X_test_try = create_ground_truth(Lu,
                                                                                    Ld,
                                                                                    m_train,
                                                                                    m_test, 
                                                                                    s=s, 
                                                                                    K=k, 
                                                                                    K0=K0_max, 
                                                                                    dictionary_type=dictionary_type, 
                                                                                    sparsity_mode=sparsity_mode)
            
            max_possible_sparsity, acc = verify_dic(D_try, Y_train_try, X_train_try, K0_max, .7)
            if max_possible_sparsity > best_sparsity:
                best_sparsity = max_possible_sparsity
                best_acc = acc
                D_true[:, :, sim] = D_try
                Y_train[:, :, sim] = Y_train_try
                Y_test[:, :, sim] = Y_test_try
                epsilon_true[sim] = epsilon_try
                c_true[sim] = c_try
                X_train[:, :, sim] = X_train_try
                X_test[:, :, sim] = X_test_try

        except Exception as e:
            print(f"Error during dictionary creation: {e}")
    if verbose:
        print(f"...Done! # Best Sparsity: {best_sparsity}")

  0%|          | 0/3000 [00:00<?, ?it/s]

100%|██████████| 3000/3000 [04:24<00:00, 11.34it/s]


...Done! # Best Sparsity: 6


100%|██████████| 3000/3000 [04:58<00:00, 10.04it/s]


...Done! # Best Sparsity: 6


100%|██████████| 3000/3000 [04:55<00:00, 10.17it/s]


...Done! # Best Sparsity: 6


100%|██████████| 3000/3000 [04:56<00:00, 10.12it/s]


...Done! # Best Sparsity: 6


100%|██████████| 3000/3000 [04:54<00:00, 10.18it/s]


...Done! # Best Sparsity: 6


100%|██████████| 3000/3000 [04:59<00:00, 10.01it/s]


...Done! # Best Sparsity: 8


100%|██████████| 3000/3000 [05:01<00:00,  9.96it/s]


...Done! # Best Sparsity: 6


100%|██████████| 3000/3000 [05:05<00:00,  9.82it/s]


...Done! # Best Sparsity: 6


100%|██████████| 3000/3000 [04:45<00:00, 10.52it/s]


...Done! # Best Sparsity: 6


100%|██████████| 3000/3000 [04:20<00:00, 11.52it/s]

...Done! # Best Sparsity: 6





In [4]:
import scipy.linalg as sla
import numpy as np
import numpy.linalg as la
import cvxpy as cp
from tsplearn.data_gen import *
from typing import Tuple

def topological_dictionary_learn(Y_train: np.ndarray,
                                 Y_test: np.ndarray, 
                                 K: int, 
                                 n: int, 
                                 s: int,
                                 D0: np.ndarray, 
                                 X0: np.ndarray, 
                                 Lu: np.ndarray, 
                                 Ld: np.ndarray,
                                 dictionary_type: str, 
                                 c: float, 
                                 epsilon: float, 
                                 K0: int,
                                 lambda_: float = 1e-3, 
                                 max_iter: int = 10, 
                                 patience: int = 10,
                                 tol: float = 1e-7, 
                                 verbose: int = 0) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Dictionary learning algorithm implementation for sparse representations of a signal on complex regular cellular.
    The algorithm consists of an iterative alternating optimization procedure defined in two steps: the positive semi-definite programming step
    for obtaining the coefficients and dictionary based on Hodge theory, and the Orthogonal Matching Pursuit step for constructing 
    the K0-sparse solution from the dictionary found in the previous step, which best approximates the original signal.
    Args:
        Y_train (np.ndarray): Training data.
        Y_test (np.ndarray): Testing data.
        K (int): Max order of the polynomial for the single sub-dictionary.
        n (int): Number of data points (number of nodes in the data graph).
        s (int): Number of kernels (sub-dictionaries).
        D0 (np.ndarray): Initial dictionary.
        X0 (np.ndarray): Initial sparse representation.
        Lu (np.ndarray): Upper Laplacian matrix
        Ld (np.ndarray): Lower Laplacian matrix
        dictionary_type (str): Type of dictionary.
        c (float): Boundary constant from the synthetic data generation process.
        epsilon (float): Boundary constant from the synthetic data generation process.
        K0 (int): Sparsity of the signal representation.
        lambda_ (float, optional): Regularization parameter. Defaults to 1e-3.
        max_iter (int, optional): Maximum number of iterations. Defaults to 10.
        patience (int, optional): Patience for early stopping. Defaults to 10.
        tol (float, optional): Tolerance value. Defaults to 1e-7.
        verbose (int, optional): Verbosity level. Defaults to 0.

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
         minimum training error, minimum testing error, optimal coefficients, optimal testing sparse representation, and optimal training sparse representation.
    """

    # Define hyperparameters
    min_error_train_norm, min_error_test_norm = 1e20, 1e20
    m_test, m_train = Y_test.shape[1], Y_train.shape[1]
    iter_, pat_iter = 1, 0

    if dictionary_type != "fourier":
        if dictionary_type=="joint":
            Lk, _, _ = compute_Lk_and_lambdak(Lu + Ld, K)
        elif dictionary_type=="edge_laplacian":
            Lk, _, _ = compute_Lk_and_lambdak(Ld, K)
        elif dictionary_type=="separated":
            Luk, _, _ = compute_Lk_and_lambdak(Lu, K, separated=True)
            Ldk, _, _ = compute_Lk_and_lambdak(Ld, K, separated=True)

        # Init the dictionary and the sparse representation 
        D_coll = [cp.Constant(D0[:,(n*i):(n*(i+1))]) for i in range(s)]
        Y = cp.Constant(Y_train)
        X_train = X0
        
        while pat_iter < patience and iter_ <= max_iter:
            
            # SDP Step
            # Init constants and parameters
            D_coll = [cp.Constant(np.zeros((n, n))) for i in range(s)] 
            Dsum = cp.Constant(np.zeros((n, n)))
            X = cp.Constant(X_train)
            I = cp.Constant(np.eye(n))
            
            # Define the objective function
            if dictionary_type in ["joint", "edge_laplacian"]:
                # Init the variables
                h = cp.Variable((s, K))
                hI = cp.Variable((s, 1))
                for i in range(0,s):
                    tmp =  cp.Constant(np.zeros((n, n)))
                    for j in range(0,K):
                        tmp += (cp.Constant(Lk[j, :, :]) * h[i,j])
                    tmp += (I*hI[i])
                    D_coll[i] = tmp
                    Dsum += tmp
                D = cp.hstack([D_coll[i]for i in range(s)])
                term1 = cp.square(cp.norm((Y - D @ X), 'fro'))
                term2 = cp.square(cp.norm(h, 'fro')*lambda_)
                term3 = cp.square(cp.norm(hI, 'fro')*lambda_)
                obj = cp.Minimize(term1 + term2 + term3)

            else:
                # Init the variables
                hI = cp.Variable((s, K))
                hS = cp.Variable((s, K))
                hH = cp.Variable((s, 1))
                for i in range(0,s):
                    tmp =  cp.Constant(np.zeros((n, n)))
                    for j in range(0,K):
                        tmp += ((cp.Constant(Luk[j, :, :])*hS[i,j]) + (cp.Constant(Ldk[j, :, :])*hI[i,j]))
                    tmp += (I*hH[i])
                    D_coll[i] = tmp
                    Dsum += tmp
                D = cp.hstack([D_coll[i]for i in range(s)])
                
                term1 = cp.square(cp.norm((Y - D @ X), 'fro'))
                term2 = cp.square(cp.norm(hI, 'fro')*lambda_)
                term3 = cp.square(cp.norm(hS, 'fro')*lambda_)
                term4 = cp.square(cp.norm(hH, 'fro')*lambda_)
                obj = cp.Minimize(term1 + term2 + term3 + term4)

            # Define the constraints
            constraints = [D_coll[i] >> 0 for i in range(s)] + \
                            [(cp.multiply(c, I) - D_coll[i]) >> 0 for i in range(s)] + \
                            [(Dsum - cp.multiply((c - epsilon), I)) >> 0, (cp.multiply((c + epsilon), I) - Dsum) >> 0]

            prob = cp.Problem(obj, constraints)
            prob.solve(solver=cp.MOSEK, verbose=False)
            # Update the dictionary
            D = D.value

            # OMP Step
            dd = la.norm(D, axis=0)
            W = np.diag(1. / dd)
            Domp = D @ W
            X_train = np.apply_along_axis(lambda x: get_omp_coeff(K0, Domp=Domp, col=x), axis=0, arr=Y_train)
            X_test = np.apply_along_axis(lambda x: get_omp_coeff(K0, Domp=Domp, col=x), axis=0, arr=Y_test)
            # Normalize?
            X_train = W @ X_train
            X_test = W @ X_test

            # Error Updating
            error_train_norm = (1/m_train)* np.sum(la.norm(Y_train - (D @ X_train), axis=0)**2 /
                                    la.norm(Y_train, axis=0)**2)
            error_test_norm = (1/m_test)* np.sum(la.norm(Y_test - (D @ X_test), axis=0)**2 /
                                    la.norm(Y_test, axis=0)**2)

            # Error Storing
            if (error_train_norm < min_error_train_norm) and (abs(error_train_norm) > np.finfo(float).eps) and (abs(error_train_norm - min_error_train_norm) > tol):
                X_opt_train = X_train
                min_error_train_norm = error_train_norm

            if (error_test_norm < min_error_test_norm) and (abs(error_test_norm) > np.finfo(float).eps) and (abs(error_test_norm - min_error_test_norm) > tol):
                h_opt = h.value if dictionary_type in ["joint", "edge_laplacian"] else np.hstack([hI.value, hS.value, hH.value])
                D_opt = D
                X_opt_test = X_test
                min_error_test_norm = error_test_norm
                pat_iter = 0
                if verbose == 1:
                    print("New Best Test Error:", min_error_test_norm)
            else:
                pat_iter += 1

            iter_ += 1
    
    else:
        # Fourier Dictionary Benchmark
        L = Lu + Ld
        _, D_opt = sla.eigh(L)
        dd = la.norm(D_opt, axis=0)
        W = np.diag(1./dd)  
        D_opt = D_opt / la.norm(D_opt)
        Domp = D_opt@W
        X_opt_train = np.apply_along_axis(lambda x: get_omp_coeff(K0, Domp=Domp.real, col=x), axis=0, arr=Y_train)
        X_opt_test = np.apply_along_axis(lambda x: get_omp_coeff(K0, Domp=Domp.real, col=x), axis=0, arr=Y_test)
        X_opt_train = W @ X_opt_train
        X_opt_test = W @ X_opt_test
        # Error Updating
        min_error_train_norm = (1/m_train)* np.sum(la.norm(Y_train - (D_opt @ X_opt_train), axis=0)**2 /
                                la.norm(Y_train, axis=0)**2)
        min_error_test_norm = (1/m_test)* np.sum(la.norm(Y_test - (D_opt @ X_opt_test), axis=0)**2 /
                                la.norm(Y_test, axis=0)**2)
        h_opt = 0
        
    return min_error_train_norm, min_error_test_norm, h_opt, X_opt_test, X_opt_train

# TO DO

- find the dimensions of X_train and divide X_train in blocks like D_coll (to find X_p for each dictionary D_p)
- check the dimensionality of L_k to understand the multiplication of L_k by X_p (or X_train)

In [3]:
import dill
import os

path = os.getcwd()
dill.load_session(path+'\\results\\separated\\ipynb_env.db')

In [7]:
print(cp.installed_solvers())

['CLARABEL', 'CVXOPT', 'ECOS', 'ECOS_BB', 'GLPK', 'GLPK_MI', 'MOSEK', 'OSQP', 'SCIPY', 'SCS', 'SDPA']


In [87]:
import scipy.linalg as sla
import numpy as np
import numpy.linalg as la
import cvxpy as cp
from tsplearn.data_gen import *
from typing import Tuple


def compute_vandermonde(L):
    
    def polynomial_exp(x, k):
        x = x** np.arange(0, k + 1)
        return x

    eigenvalues, _ = sla.eig(L)
    idx = eigenvalues.argsort()
    tmp_df = pd.DataFrame({'Eigs': eigenvalues[idx]})
    tmp_df['Poly'] = tmp_df['Eigs'].apply(lambda x:  polynomial_exp(x,k))
    B = np.vstack(tmp_df['Poly'].to_numpy())

    return B

def topological_dictionary_learn(Y_train: np.ndarray,
                                 Y_test: np.ndarray, 
                                 K: int, 
                                 n: int, 
                                 s: int,
                                 D0: np.ndarray, 
                                 X0: np.ndarray, 
                                 Lu: np.ndarray, 
                                 Ld: np.ndarray,
                                 dictionary_type: str, 
                                 c: float, 
                                 epsilon: float, 
                                 K0: int,
                                 lambda_: float = 1e-3, 
                                 max_iter: int = 10, 
                                 patience: int = 10,
                                 tol: float = 1e-7, 
                                 verbose: int = 0) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Dictionary learning algorithm implementation for sparse representations of a signal on complex regular cellular.
    The algorithm consists of an iterative alternating optimization procedure defined in two steps: the positive semi-definite programming step
    for obtaining the coefficients and dictionary based on Hodge theory, and the Orthogonal Matching Pursuit step for constructing 
    the K0-sparse solution from the dictionary found in the previous step, which best approximates the original signal.
    Args:
        Y_train (np.ndarray): Training data.
        Y_test (np.ndarray): Testing data.
        K (int): Max order of the polynomial for the single sub-dictionary.
        n (int): Number of data points (number of nodes in the data graph).
        s (int): Number of kernels (sub-dictionaries).
        D0 (np.ndarray): Initial dictionary.
        X0 (np.ndarray): Initial sparse representation.
        Lu (np.ndarray): Upper Laplacian matrix
        Ld (np.ndarray): Lower Laplacian matrix
        dictionary_type (str): Type of dictionary.
        c (float): Boundary constant from the synthetic data generation process.
        epsilon (float): Boundary constant from the synthetic data generation process.
        K0 (int): Sparsity of the signal representation.
        lambda_ (float, optional): Regularization parameter. Defaults to 1e-3.
        max_iter (int, optional): Maximum number of iterations. Defaults to 10.
        patience (int, optional): Patience for early stopping. Defaults to 10.
        tol (float, optional): Tolerance value. Defaults to 1e-7.
        verbose (int, optional): Verbosity level. Defaults to 0.

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
         minimum training error, minimum testing error, optimal coefficients, optimal testing sparse representation, and optimal training sparse representation.
    """

    # Define hyperparameters
    min_error_train_norm, min_error_test_norm = 1e20, 1e20
    m_test, m_train = Y_test.shape[1], Y_train.shape[1]
    iter_, pat_iter = 1, 0

    if dictionary_type != "fourier":
        if dictionary_type=="joint":
            L = Lu + Ld
            Lk, _, _ = compute_Lk_and_lambdak(L, K)
            B = compute_vandermonde(L)
            B = cp.Constant(B.real)

        elif dictionary_type=="edge_laplacian":
            L = Ld
            Lk, _, _ = compute_Lk_and_lambdak(L, K)
            B = compute_vandermonde(L)
            B = cp.Constant(B.real)
        elif dictionary_type=="separated":

            Luk, _, _ = compute_Lk_and_lambdak(Lu, K, separated=True)
            Ldk, _, _ = compute_Lk_and_lambdak(Ld, K, separated=True)
            Bu = compute_vandermonde(Lu)
            Bd = compute_vandermonde(Ld)
            B = cp.hstack([Bu.real, Bd[:, 1:].real])

        # Init the the sparse representation 
        Y = cp.Constant(Y_train)
        I = np.eye(n)
        I_2 = cp.Constant(np.eye(s*(2*K+1)))
        I_s = cp.Constant(np.eye(s))
        i_s = cp.Constant(np.ones((s,1)))
        X_train = X0

        def aux_matrix(Lu,Ld,X_train,K,N):

            LL = [np.eye(N)]
            LL_tmp = []
            for i in range(1, K + 1):
                LL.append(la.matrix_power(Lu, i))
                LL_tmp.append(la.matrix_power(Ld, i))

            LL = LL + LL_tmp
            LL = np.array(LL)
            P = np.array([LL@X_train[(i*n): ((i+1)*n), :] for i in range(s)])

            return P
        
        P = aux_matrix(Lu,Ld,X_train,K,n)
        
        while pat_iter < patience and iter_ <= max_iter:
            
            # QP Step
            
            # # Define the objective function
            # if dictionary_type in ["joint", "edge_laplacian"]:
            #     # Init the variables
            #     break

            # else:
            

            # Init variables and parameters
            h = cp.Variable((s*(2*k+1), 1))
            l = cp.Constant(np.zeros((s*(2*k+1), 1)))
            Q = cp.Constant(np.zeros((s*(2*k+1), s*(2*k+1))))

            for i in range(n):
                for j in range(m_train):
                    Pij = cp.Constant(P[:,:,i,j].flatten().reshape(-1,1))
                    l = l + (Y[i,j]*Pij)
                    Q = Q + Pij@Pij.T

            Q = Q + cp.multiply(lambda_, I_2)
            # Quadratic term
            term2 = cp.quad_form(h, Q, assume_PSD = True)
            # Linear term
            term1 = (-2)*l.T@h
            
            obj = cp.Minimize(term2+term1)

            # Define the constraints
            cons1 = cp.kron(I_s,B)@h
            cons2 = cp.kron(i_s.T,B)@h
            bound1 = cp.Constant(np.ones((n*s,1)))
            bound2 = cp.Constant(np.ones((n,1)))
            constraints = [cons1 >= cp.multiply(0, bound1)] + \
                            [cons1 <= cp.multiply(c, bound1)] + \
                            [cons2 >= cp.multiply((c-epsilon), bound2)] + \
                            [cons2 <= cp.multiply((c+epsilon), bound2)]

            prob = cp.Problem(obj, constraints)
            prob.solve(solver=cp.MOSEK)
            
            # Update the dictionary
            h = h.value.flatten()
            D_coll = []
            hH = h[:s].reshape((s,1))
            hS = h[s:((s*K)+s)].reshape((s,k))
            hI = h[((s*K)+s):].reshape((s,k))

            for i in range(0,s):
                # tmp =  np.zeros((n, n))
                # for j in range(0,K):
                #     tmp += ((Luk[j, :, :])*hS[(i*(s-1)+j)]) + ((Ldk[j, :, :])*hI[(i*(s-1)+j)])
                # tmp += (I*hH[i])
                hu = hS[i].reshape(k,1,1)
                hd = hI[i].reshape(k,1,1)
                hid = hH[i]
                tmp = np.sum(hu*Luk + hd*Ldk, axis=0) + hid*np.eye(n,n)
                D_coll.append(tmp)

            D = np.hstack(tuple(D_coll))

            # OMP Step
            dd = la.norm(D, axis=0)
            W = np.diag(1. / dd)
            Domp = D @ W
            X_train = np.apply_along_axis(lambda x: get_omp_coeff(K0, Domp=Domp, col=x), axis=0, arr=Y_train)
            X_test = np.apply_along_axis(lambda x: get_omp_coeff(K0, Domp=Domp, col=x), axis=0, arr=Y_test)
            # Normalize
            X_train = W @ X_train
            X_test = W @ X_test

            # Error Updating
            error_train_norm = (1/m_train)* np.sum(la.norm(Y_train - (D @ X_train), axis=0)**2 /
                                    la.norm(Y_train, axis=0)**2)
            error_test_norm = (1/m_test)* np.sum(la.norm(Y_test - (D @ X_test), axis=0)**2 /
                                    la.norm(Y_test, axis=0)**2)

            # Error Storing
            if (error_train_norm < min_error_train_norm) and (abs(error_train_norm) > np.finfo(float).eps) and (abs(error_train_norm - min_error_train_norm) > tol):
                X_opt_train = X_train
                min_error_train_norm = error_train_norm

            if (error_test_norm < min_error_test_norm) and (abs(error_test_norm) > np.finfo(float).eps) and (abs(error_test_norm - min_error_test_norm) > tol):
                h_opt = h if dictionary_type in ["joint", "edge_laplacian"] else np.hstack([hI, hS, hH])
                D_opt = D
                X_opt_test = X_test
                min_error_test_norm = error_test_norm
                pat_iter = 0
                if verbose == 1:
                    print("New Best Test Error:", min_error_test_norm)
            else:
                pat_iter += 1

            iter_ += 1
    
    else:
        # Fourier Dictionary Benchmark
        L = Lu + Ld
        _, D_opt = sla.eigh(L)
        dd = la.norm(D_opt, axis=0)
        W = np.diag(1./dd)  
        D_opt = D_opt / la.norm(D_opt)
        Domp = D_opt@W
        X_opt_train = np.apply_along_axis(lambda x: get_omp_coeff(K0, Domp=Domp.real, col=x), axis=0, arr=Y_train)
        X_opt_test = np.apply_along_axis(lambda x: get_omp_coeff(K0, Domp=Domp.real, col=x), axis=0, arr=Y_test)
        X_opt_train = W @ X_opt_train
        X_opt_test = W @ X_opt_test
        # Error Updating
        min_error_train_norm = (1/m_train)* np.sum(la.norm(Y_train - (D_opt @ X_opt_train), axis=0)**2 /
                                la.norm(Y_train, axis=0)**2)
        min_error_test_norm = (1/m_test)* np.sum(la.norm(Y_test - (D_opt @ X_opt_test), axis=0)**2 /
                                la.norm(Y_test, axis=0)**2)
        h_opt = 0
        
    return min_error_train_norm, min_error_test_norm, h_opt, X_opt_test, X_opt_train

In [17]:
def aux_matrix(Lu,Ld,X_train,K,N):

    LL = [np.eye(N)]
    LL_tmp = []
    for i in range(1, K + 1):
        LL.append(la.matrix_power(Lu, i))
        LL_tmp.append(la.matrix_power(Ld, i))


    LL= LL + LL_tmp
    LL = np.array(LL)
    P = np.array([LL@X_train[(i*n): ((i+1)*n), :] for i in range(s)])

    return P, LL

P , LL= aux_matrix(Lu, Ld, X_train[:,:,7], k, n)
I_2 = cp.Constant(np.eye(s*(2*k+1)))

# Init variable 
h = cp.Variable((s*(2*k+1), 1))
c = cp.Constant(np.zeros((s*(2*k+1), 1)))
Q = cp.Constant(np.zeros((s*(2*k+1), s*(2*k+1))))

for i in range(n):
    for j in range(m_train):
        Pij = cp.Constant(P[:,:,i,j].flatten().reshape(-1,1))
        c = c + (Y_train[:,:,7][i,j]*Pij)
        Q = Q + Pij@Pij.T

Q = Q + cp.multiply(lambda_, I_2)
term2 = cp.quad_form(h, Q)
term1 = cp.multiply(-2, c.T@h)

# Check convexity
assert np.all(np.linalg.eigvals(Q.value) >= 0), f'element {(i,j)} not positive semi-definite'

In [66]:
h = cp.Variable((s*(2*k+1), 1))

In [68]:
h.value

In [69]:
h = cp.Constant(np.ones((s*(2*k+1), 1))).value.flatten()

hH = h[:s]
hS = h[s:((s*k)+s)]
hI = h[((s*k)+s):]

In [74]:
hI.shape

(6,)

In [26]:
scipy.linalg.ishermitian(Q.value)

True

In [21]:
np.all(Q.value == Q.T.value)

True

In [18]:
np.linalg.eigvals(Q.value)

array([8.44892279e+05, 7.92792867e+05, 7.21606356e+05, 2.40006786e+05,
       2.26664942e+05, 1.65928087e+05, 5.82632652e+01, 7.39616138e+01,
       7.13863804e+01, 9.08104578e+02, 9.63831258e+02, 7.66645640e+02,
       5.05784958e+02, 6.02422921e+02, 6.63128410e+02])

In [89]:
lambda_ = 1e-5

In [90]:
import warnings
from tqdm import tqdm

warnings.filterwarnings('ignore')

min_error_fou_train = np.zeros((n_sim, len(K0_coll)))
min_error_fou_test = np.zeros((n_sim, len(K0_coll)))
min_error_sep_train = np.zeros((n_sim, len(K0_coll)))
min_error_sep_test = np.zeros((n_sim, len(K0_coll)))
min_error_edge_train = np.zeros((n_sim, len(K0_coll)))
min_error_edge_test = np.zeros((n_sim, len(K0_coll)))
min_error_joint_train = np.zeros((n_sim, len(K0_coll)))
min_error_joint_test = np.zeros((n_sim, len(K0_coll)))

dict_errors = {
    "sep": (min_error_sep_train,min_error_sep_test)
    }


dict_types = {
    "sep": ("Separated Hodge Laplacian","separated")
    }

for sim in range(6,n_sim):
    c = c_true[sim]  
    epsilon = epsilon_true[sim] 
    for k0_index, k0 in tqdm(enumerate(K0_coll)):
        discard = 1
        while discard == 1:
            
            try:
                D0, X0, discard = initialize_dic(Lu, Ld, s, k, Y_train[:, :, sim], k0, dictionary_type, c, epsilon, "only_X")
            except:
                print("Initialization Failed!")

        for d in dict_types.items():
            # try:
            dict_errors[d[0]][0][sim,k0_index], dict_errors[d[0]][1][sim,k0_index], _, _, _ = topological_dictionary_learn(Y_train[:,:,sim], Y_test[:,:,sim],
                                                                                                                        k, n, s, D0, X0, Lu, Ld, d[1][1],
                                                                                                                        c, epsilon, k0, lambda_, max_iter,
                                                                                                                        patience, tol)
            if verbose:
                print(f"Simulation: {sim+1}/{n_sim} Sparsity: {k0} Testing {d[1][0]}... Done! Test Error: {dict_errors[d[0]][1][sim,k0_index]}")
            # except:
            #     print(f'Simulation: {sim+1}/{n_sim} Sparsity: {k0} Testing {d[1][0]}... Diverged!')
            #     try:
            #         dict_errors[d[0]][0][sim,k0_index], dict_errors[d[0]][1][sim,k0_index] = (dict_errors[d[0]][0][sim-1,k0_index]
            #                                                                               , dict_errors[d[0]][1][sim-1,k0_index])
            #     except:
            #         dict_errors[d[0]][0][sim,k0_index], dict_errors[d[0]][1][sim,k0_index] = (dict_errors[d[0]][0][sim+1,k0_index]
            #                                                                               , dict_errors[d[0]][1][sim+1,k0_index])

1it [02:33, 153.03s/it]

Simulation: 7/10 Sparsity: 5 Testing Separated Hodge Laplacian... Done! Test Error: 0.08087501978562617


2it [05:21, 162.33s/it]

Simulation: 7/10 Sparsity: 9 Testing Separated Hodge Laplacian... Done! Test Error: 0.03028946014671574


3it [08:07, 163.78s/it]

Simulation: 7/10 Sparsity: 13 Testing Separated Hodge Laplacian... Done! Test Error: 0.015102613661098399


4it [10:48, 162.73s/it]

Simulation: 7/10 Sparsity: 17 Testing Separated Hodge Laplacian... Done! Test Error: 0.009149020365912636


5it [13:37, 165.06s/it]

Simulation: 7/10 Sparsity: 21 Testing Separated Hodge Laplacian... Done! Test Error: 0.006280094506717785


6it [16:20, 163.42s/it]


Simulation: 7/10 Sparsity: 25 Testing Separated Hodge Laplacian... Done! Test Error: 0.004636052395979597


1it [02:40, 160.77s/it]

Simulation: 8/10 Sparsity: 5 Testing Separated Hodge Laplacian... Done! Test Error: 0.13649756489307577


2it [05:19, 159.29s/it]

Simulation: 8/10 Sparsity: 9 Testing Separated Hodge Laplacian... Done! Test Error: 0.13626372224202069


3it [08:02, 161.34s/it]

Simulation: 8/10 Sparsity: 13 Testing Separated Hodge Laplacian... Done! Test Error: 0.05714689398060823


4it [10:43, 161.04s/it]

Simulation: 8/10 Sparsity: 17 Testing Separated Hodge Laplacian... Done! Test Error: 0.038388216921671325


5it [13:34, 164.54s/it]

Simulation: 8/10 Sparsity: 21 Testing Separated Hodge Laplacian... Done! Test Error: 0.014545727536205845


6it [16:14, 162.48s/it]


Simulation: 8/10 Sparsity: 25 Testing Separated Hodge Laplacian... Done! Test Error: 0.010036156947665648


1it [02:40, 160.61s/it]

Simulation: 9/10 Sparsity: 5 Testing Separated Hodge Laplacian... Done! Test Error: 0.08153360112225705


2it [05:20, 160.48s/it]

Simulation: 9/10 Sparsity: 9 Testing Separated Hodge Laplacian... Done! Test Error: 0.02842892294320222


2it [07:36, 228.45s/it]


KeyboardInterrupt: 