In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from library import *
import cvxpy as cp

### Generating Data

In [2]:
def same_sparsity_structure(A, B, threshold=1e-10):
    A_clean = A.copy()
    A_clean[np.abs(A_clean) < threshold] = 0.0

    B_clean = B.copy()
    B_clean[np.abs(B_clean) < threshold] = 0.0
    mask_A = A_clean != 0.0
    mask_B = B_clean != 0.0

    same_structure = np.array_equal(mask_A, mask_B)

    return same_structure, mask_A, mask_B

In [3]:
max_dimension = 2 # Dimension of the complex, e.g. 2 -> nodes, egdes, triangles
p_complex = [.3,.6]
max_num_simplices = 60 # Maximum number of simplices
min_num_simplices = 20 # Minimum number of simplices
avg_nodes = 10
variance_complex = 2

In [4]:
valid_data_point = 0
valid_complex = 0
while not valid_complex:
      _, incidence_mats, num_simplices = generate_simplicial_complex(avg_nodes, max_dimension, p_complex, variance_complex)
      if num_simplices < max_num_simplices and num_simplices >= min_num_simplices:
        valid_complex = 1
        print("Valid complex generated/loaded!")
      else:
       print("The generated complex does not respect the imposed constraints... Trying again!")

Complex correctly saved on the current path, veryfing if valid...
The generated complex does not respect the imposed constraints... Trying again!
Complex correctly saved on the current path, veryfing if valid...
Valid complex generated/loaded!


In [5]:
B1=incidence_mats[1]
B1.shape

(8, 13)

In [6]:
B2=incidence_mats[2]
B2.shape

(13, 6)

In [7]:
n_nodes=B1.shape[0]
n_edges=B1.shape[1]
n_triangles=B2.shape[1]

In [8]:
D_V=5*np.eye(n_nodes)
D_E=6*np.eye(n_edges)
D_T=5*np.eye(n_triangles) 

In [9]:
row1 = np.hstack([D_V, -B1, np.zeros((n_nodes,n_triangles))])
row2 = np.hstack([-B1.T, D_E, -B2])
row3 = np.hstack([np.zeros((n_triangles,n_nodes)), -B2.T, D_T])
prec_matrix = np.vstack([row1, row2, row3]) #build precision matrix over vertices, edges and triangles
eigvals = np.linalg.eigvalsh(prec_matrix)  
check=np.all(eigvals > 0)
check    #check pos.def.

True

In [11]:
cov=np.linalg.inv(prec_matrix)  #covariance matrix of the complete prec. matrix 
cov[:n_nodes,-n_triangles:] = np.zeros((n_nodes,n_triangles)) 
cov[-n_triangles:,:n_nodes] = np.zeros((n_triangles,n_nodes)) 
iterations=50000
mu=np.zeros(n_nodes+n_edges+n_triangles)
X = np.random.multivariate_normal(mu, cov, size=iterations)  # data generation over vertices, edges and triangles 

In [12]:
X_E=X[:,n_nodes:n_nodes+n_edges]
X_E.shape #edge signals

(50000, 13)

In [13]:
cov_E=cov[n_nodes:n_nodes+n_edges,n_nodes:n_nodes+n_edges] #edge-level covariance matrix
prec_E=np.linalg.inv(cov_E)
prec_E[np.abs(prec_E) < 1e-10] = 0.0 #edge-level precision matrix


In [14]:
S = (1/iterations)*X_E.T @ X_E #empirical covariance
np.linalg.norm(S-cov_E)  #approx error

0.010802468963740411

In [22]:
prec_E_d=D_E-B1.T@np.linalg.inv(D_V)@B1 #prec. matrix of the lower component
prec_E_u=D_E-B2@np.linalg.inv(D_T)@B2.T #prec. matrix of the upper component


### Inference 


In [15]:
class Inference:
    def __init__(
            self,
            S, 
            MAX_ITER,
            inc_mats
    ):
        self.S = S
        self.MAX_ITER = MAX_ITER
        self.B1 = inc_mats[1]
        self.B2 = inc_mats[2]

        self.n_edges = S.shape[0]

        self.mask_d = (self.B1.T @ self.B1 != 0.0).astype(int)
        self.mask_u = (self.B2 @ self.B2.T != 0.0).astype(int)

        for i in range(self.n_edges):
            self.mask_d[i,i] = 1
            self.mask_u[i,i] = 1 
    
        # Build problems (they will be compiled at first solving)
        self._opt_lambda_build()
        self._opt_Theta_d_build()
        self._opt_Theta_u_build()

    def _initialization(
            self
    ):
        '''
        Initialize the variables in the optimization problem
        '''

        l = np.sqrt(np.diag(self.S))
        Theta_u = np.eye(self.n_edges)
        Theta_d = np.eye(self.n_edges)

        return l, Theta_d, Theta_u

    #---------------------------#
    #       LAMBDA STEP
    #---------------------------#

    def _opt_lambda_build(
            self
    ):
        """
        Costruisce il problema:
            minimize   l.T @ Q @ l - 2 * sum(log(s))
            subject to s > 0
        """
        self.Q_param = cp.Parameter((self.n_edges, self.n_edges), PSD=True)
        self.l = cp.Variable(self.n_edges, pos=True)  # impone s_i > 0

        objective = cp.Minimize(cp.quad_form(self.l, self.Q_param) - 2 * cp.sum(cp.log(self.l)))
        self.problem_lambda = cp.Problem(objective)
    
    def _opt_lambda_solve(
            self,
            Q_value,
    ):
        self.Q_param.value = Q_value
        self.problem_lambda.solve(solver=cp.MOSEK, verbose = 0) 

        return self.l.value 
    
    #---------------------------#
    #        THETA STEP
    #---------------------------#

    def _opt_Theta_d_build(
            self,
    ):
        """
        Costruisce il problema:
            min_X trace(X @ H) - logdet(X)
            s.t. X PSD
                X_{ij} = 0  where T_{ij} = 0 (sparsità)
                (X - I)(G - I) = 0
        """

        self.H_param_d = cp.Parameter((self.n_edges, self.n_edges))
        self.T_param_d = cp.Parameter((self.n_edges, self.n_edges))
        self.G_param_d = cp.Parameter((self.n_edges, self.n_edges))

        # Variabile simmetrica
        self.X_d = cp.Variable((self.n_edges, self.n_edges), PSD=True)

        # Obiettivo: trace(XH) - logdet(X)
        #penalty = cp.norm1(cp.multiply(mask, X)) # vincolo topologia
        objective = (
            cp.Minimize(cp.trace(self.X_d @ self.H_param_d) - 
                        cp.log_det(self.X_d))
        )

        constraints = []

        # 1. Vincolo PSD
        constraints.append(self.X_d >> 0)

        #2. Sparsità da T
        for i in range(self.n_edges):
            for j in range(self.n_edges):
               constraints.append(self.X_d[i, j]*(1-self.T_param_d[i, j]) == 0)

        # 3. (X - I)(G - I) = 0 ⇒ per ogni elemento (riga i, colonna j):
        #     sum_k (X[i,k] - δ_ik)(G[k,j] - δ_kj) = 0
        G_shift = self.G_param_d - np.eye(self.n_edges)

        for i in range(self.n_edges):
            for j in range(self.n_edges):
                expr = sum((self.X_d[i, k] - (1.0 if i == k else 0.0)) * G_shift[k, j] for k in range(self.n_edges))
                constraints.append(expr == 0)

        self.problem_theta_d = cp.Problem(objective, constraints)

    def _opt_Theta_u_build(
            self,
    ):
        """
        Costruisce il problema:
            min_X trace(X @ H) - logdet(X)
            s.t. X PSD
                X_{ij} = 0  where T_{ij} = 0 (sparsità)
                (X - I)(G - I) = 0
        """

        self.H_param_u = cp.Parameter((self.n_edges, self.n_edges))
        self.T_param_u = cp.Parameter((self.n_edges, self.n_edges))
        self.G_param_u = cp.Parameter((self.n_edges, self.n_edges))

        # Variabile simmetrica
        self.X_u = cp.Variable((self.n_edges, self.n_edges), PSD=True)

        # Obiettivo: trace(XH) - logdet(X)
        #penalty = cp.norm1(cp.multiply(mask, X)) # vincolo topologia
        objective = (
            cp.Minimize(cp.trace(self.X_u @ self.H_param_u) - 
                        cp.log_det(self.X_u))
        )

        constraints = []

        # 1. Vincolo PSD
        constraints.append(self.X_u >> 0)

        #2. Sparsità da T
        for i in range(self.n_edges):
            for j in range(self.n_edges):
               constraints.append(self.X_u[i, j]*(1-self.T_param_u[i, j]) == 0)

        # 3. (X - I)(G - I) = 0 ⇒ per ogni elemento (riga i, colonna j):
        #     sum_k (X[i,k] - δ_ik)(G[k,j] - δ_kj) = 0
        G_shift = self.G_param_u - np.eye(self.n_edges)
        for i in range(self.n_edges):
            for j in range(self.n_edges):
                expr = sum((self.X_u[i, k] - (1.0 if i == k else 0.0)) * G_shift[k, j] for k in range(self.n_edges))
                constraints.append(expr == 0)

        self.problem_theta_u = cp.Problem(objective, constraints)

    def _opt_Theta_d_solve(
            self,
            H_value,
            T_value,
            G_value,
            solver = cp.MOSEK,
            verbose = 0
    ):    
        self.H_param_d.value = H_value
        self.T_param_d.value = T_value
        self.G_param_d.value = G_value

        self.problem_theta_d.solve(solver = solver, verbose = verbose) 

        return self.X_d.value 
    
    def _opt_Theta_u_solve(
            self,
            H_value,
            T_value,
            G_value,
            solver = cp.MOSEK,
            verbose = 0
    ):    
        self.H_param_u.value = H_value
        self.T_param_u.value = T_value
        self.G_param_u.value = G_value

        self.problem_theta_u.solve(solver = solver, verbose = verbose) 

        return self.X_u.value 
    
    #----------------------------------------------------------#
    #        TWO-STEPS ALTERNATED OPTIMIZATION PIPELINE
    #----------------------------------------------------------#

    def _fit(
            self
    ):
        # Initialization
        l, Theta_d, Theta_u = self._initialization()

        for _ in range(self.MAX_ITER):
            print("Iteration: "+str(_+1)+"/"+str(self.MAX_ITER))
            Q = (Theta_d + Theta_u - np.eye(n_edges)) * S
            #Q = (Q + Q.T)/2

            l = np.copy(self._opt_lambda_solve(Q))
            H = np.diag(l) @ S @ np.diag(l)
            
            Theta_u = np.copy(self._opt_Theta_u_solve(H, self.mask_u, Theta_d))
            Theta_d = np.copy(self._opt_Theta_d_solve(H, self.mask_d, Theta_u))
            clear_output(wait=True)
        return Theta_d, Theta_u, l , Q

In [16]:
infer=Inference(S,1,incidence_mats)

In [17]:
Theta_d, Theta_u, l , Q = infer._fit()

Iteration: 1/1


	https://www.cvxpy.org/tutorial/advanced/index.html#disciplined-parametrized-programming
