In [1]:
import numpy as np
import cvxpy as cp
from matplotlib import pyplot as plt
from scipy.linalg import fractional_matrix_power
from tqdm import tqdm
from itertools import combinations

import warnings
warnings.filterwarnings("ignore")

The full pipeline to learn the sheaf laplacian is going to take into account several steps. 
+ First of all, we take as granted the imputation of the maps via KKT on a given set of edges: we initialize the algorithm to work on the full graph starting from all maps being identity maps. 
+ We leverage structured dictionaries based on sheaf convolutional filters to learn the best representation of the 0-cochains with a double localization quality within each subdictionary: each atom gives first of all a node domain localization, and then a rank-1 stalk subspace localization. 
+ We then define a greedy criterion to start removing edges: at each iteration we compute all the possible representation of the 0-cochains removing one of the possible edges, and we decide to remove the edge yielding the best improvement on the sparse coding in terms of reconstruction error.
+ Finally, once an edge has been removed, we compute all the restriction maps again and repeat the procedure until a convergence criterion is met. 

In [2]:
N = 20
edges = []

cutoff = 0.5
theta = 0.9

points = np.random.rand(N, 2)

A = np.zeros((N,N))
W = np.zeros((N,N))

for i in range(N):
    for j in range(i+1, N):
        
        A[i,j] = np.linalg.norm(points[i,:] - points[j,:]) <= cutoff
        A[j,i] = np.linalg.norm(points[i,:] - points[j,:]) <= cutoff

        if A[i,j] == 1:
            edges.append((i,j))

In [3]:
def random_sheaf(
        V:int,
        d:int,
        edges:list
        ) -> np.array:

    E = len(edges)

    # Incidency linear maps

    F = {
        e:{
            e[0]:np.random.randn(d,d),
            e[1]:np.random.randn(d,d)
            } 
            for e in edges
        }                                           

    # Coboundary maps

    B = np.zeros((d*E, d*V))                        

    for i in range(len(edges)):

        # Main loop to populate the coboundary map

        edge = edges[i]

        u = edge[0] 
        v = edge[1] 

        B_u = F[edge][u]
        B_v = F[edge][v]

        B[i*d:(i+1)*d, u*d:(u+1)*d] = B_u           
        B[i*d:(i+1)*d, v*d:(v+1)*d] = - B_v

    L_f = B.T @ B

    return L_f

In [4]:
d = 5
Lf = random_sheaf(N, d, edges)

In [5]:
D = np.diag(np.diagonal(Lf))

In [6]:
Lf = fractional_matrix_power(D, -0.5) @ Lf @ fractional_matrix_power(D, -0.5)

In [7]:
band1 = np.array([i in range(0,25) for i in range(100)])
band2 = np.array([i in range(25,50) for i in range(100)]) 
band3 = np.array([i in range(50,75) for i in range(100)])
band4 = np.array([i in range(75,100) for i in range(100)])

bands = [band1, band2, band3, band4]

In [8]:
Lambda, U = np.linalg.eig(Lf)
Lambda = np.sort(Lambda)

In [9]:
J = 400
DD = np.zeros((N*d, J))

In [10]:
for j in range(J): 

    B = j // 100
    h = np.copy(Lambda)
    h[~bands[B]] = 0
    h *= np.random.rand(h.shape[0],1)[:,0]
    n = np.random.choice(d*N)

    DD[:,j] = (U @ np.diag(h) @ U.T)[:,n]

In [11]:
M = 1000 
Y_train = np.zeros((N*d,M))

for m in range(M):
    T = np.random.choice(J, 4)
    Y_train[:,m] = DD[:,T] @ np.random.rand(4)

test_size = 2000
Y_test = np.zeros((N*d,test_size))

for t in range(test_size):
    T = np.random.choice(J, 4)
    Y_test[:,t] = DD[:,T] @ np.random.rand(4)

In [12]:
c = 1
eps = 0.01

T0 = 20         # Assumed sparsity
S = 4           # Number of dictionaries
K = 5           # Power of the laplacian

In [13]:
# Subroutine for sparse coding

def OMP(y, D, T0):
    S = []

    x = np.zeros(D.shape[1])
    iters = 0
    R = y

    while iters < T0:

        # Retrieve the maximum correlation between atoms and residuals of the previous iteration
        corrs = np.abs(D.T @ R)
        corrs[S] = -np.inf
        S.append(np.argmax(np.abs(D.T @ R)))

        # Expand the dictionary for the representation
        dic = D[:,S]

        # Solve subproblems and update x
        x[S] = np.linalg.inv(dic.T @ dic) @ dic.T @ y
        
        # Update the residuals
        R = y - D @ x
        iters += 1

    return x

In [14]:
# Dictionary update subroutine

def LaplacianPowers(L, K):
    powers = np.zeros((K+1, L.shape[0], L.shape[1]))
    for k in range(K+1):
        powers[k,:,:] = np.linalg.matrix_power(L, k)

    return powers 

def DicUp(LaplPowers, K, S, Y, X, d, N, c, eps, mu = 1):
    # Variables
    alpha = cp.Variable((K+1, S))

    # Define D_s expressions
    D_s_list = []
    for s in range(S):
        D_s = sum(alpha[k, s] * LaplPowers[k,:,:] for k in range(K+1))
        D_s_list.append(D_s)

    # Objective function
    D = cp.hstack(D_s_list)
    objective = cp.Minimize(cp.norm(Y - D @ X, 'fro')**2 + mu * cp.norm(alpha, 2)**2)

    # Constraints
    constraints = []
    for s in range(S):
        constraints.append(D_s_list[s] >> 0)
        constraints.append(D_s_list[s] << c * np.eye(d*N))

    sum_D_s = sum(D_s_list)
    constraints.append(sum_D_s >> (c - eps) * np.eye(d*N))
    constraints.append(sum_D_s << (c + eps) * np.eye(d*N))

    # Problem definition and solving
    prob = cp.Problem(objective, constraints)
    prob.solve(solver=cp.SCS, max_iters=100, eps=1e-5, warm_start=True)

    alpha_star = alpha.value
    D_s_list = []
    for s in range(S):
        D_s = sum(alpha_star[k, s] * LaplPowers[k,:,:] for k in range(K+1))
        D_s_list.append(D_s)

    # Objective function
    D = np.hstack(D_s_list)
    return D, alpha_star

In [15]:
# Parametric dictionary learning algorithm 

def ParametricDictionaryLearning(Lf, Y, T0, K, S, d, N, c, eps, mu = 1, MAX_ITER = 5):

    loss = []

    # Precompute Laplacian powers
    LP = LaplacianPowers(Lf, K)

    # Dictionary initialization 
    D_s_list = []
    for _ in range(S):
        D_s = sum(LP[k,:,:] for k in range(K+1))
        D_s_list.append(D_s)

    D = np.hstack(D_s_list)

    # Alternated learning procedure
    for _ in tqdm(range(MAX_ITER)):
        
        # Sparse coding update
        X = np.apply_along_axis(OMP, axis = 0, arr = Y, D = D, T0 = T0)

        # Dictionary update
        D, alpha_star = DicUp(LP, K, S, Y, X, d, N, c, eps, mu)
        D = np.apply_along_axis(lambda x: x / np.linalg.norm(x), axis = 0, arr = D)

        loss.append(np.linalg.norm(Y - D @ X, ord = 'fro'))
        
    return D, X, alpha_star, loss

In [16]:
def GreedySelection(alpha, edges, d, Lf, K, Y, D, X):
    
    impros = {edge:None for edge in edges}

    for edge in edges:

        # Mask the sheaf laplacian assuming that such an edge is removed
        L = np.copy(Lf)
        u = edge[0]
        v = edge[1]
        L[u*d:(u+1)*d,v*d:(v+1)*d] = 0
        L[v*d:(v+1)*d,u*d:(u+1)*d] = 0
        LaplPowers = LaplacianPowers(L, K)

        # Compute the retrieved dictionary 
        D_s_list = []
        for s in range(S):
            D_s = sum(alpha[k, s] * LaplPowers[k,:,:] for k in range(K+1))
            D_s_list.append(D_s)

        D_ = np.hstack(D_s_list)

        # Store the improvement
        impros[edge] = np.linalg.norm(Y - D @ X) - np.linalg.norm(Y - D_ @ X)

    return [edge for edge in edges if edge != max(impros, key = impros.get)]

In [17]:
def premultiplier(Xu, Xv):
    uu = np.linalg.inv(Xu @ Xu.T)
    uv = Xu @ Xv.T
    vv = np.linalg.inv(Xv @ Xv.T)
    vu = Xv @ Xu.T

    return (uu, uv, vv, vu)

def chi_u(uu, uv, vv, vu):

    return ((uu @ uv - np.eye(uu.shape[0])) @ vv @ np.linalg.inv(vu @ uu @ uv @ vv - np.eye(uu.shape[0])) @ vu - np.eye(uu.shape[0])) @ uu

def chi_v(uu, uv, vv, vu):

    return (uu @ uv - np.eye(uu.shape[0])) @ vv @ np.linalg.inv(vu @ uu @ uv @ vv - np.eye(uu.shape[0]))

In [18]:
def LaplacianUpdate(D, X, d, edges, N, Q):    
    T = 0
    Y = D @ X

    maps = {
        edge : {
            edge[0] : np.zeros((d,d)),
            edge[1] : np.zeros((d,d))
        }
    for edge in edges
    }

    for e in edges:
        u = e[0]
        v = e[1]

        X_u = Y[u*d:(u+1)*d,:]
        X_v = Y[v*d:(v+1)*d,:]
        uu, uv, vv, vu = premultiplier(X_u, X_v)

        maps[e][u] = chi_u(uu, uv, vv, vu)
        maps[e][v] = - chi_v(uu, uv, vv, vu)
        
        T += np.trace(maps[e][u]) + np.trace(maps[e][v])

    maps_ = {
        edge : {
            edge[0] : Q/T * maps[edge][edge[0]],
            edge[1] : Q/T * maps[edge][edge[1]]
        }
        for edge in edges
    }

    Lf = np.zeros((d*N, d*N))
    for edge in edges:

        u = edge[0] 
        v = edge[1] 

        Lf[u*d:(u+1)*d,u*d:(u+1)*d] += maps_[edge][u].T @ maps_[edge][u]
        Lf[v*d:(v+1)*d,v*d:(v+1)*d] += maps_[edge][v].T @ maps_[edge][v]
        Lf[u*d:(u+1)*d,v*d:(v+1)*d] = - maps_[edge][u].T @ maps_[edge][v]
        Lf[v*d:(v+1)*d,u*d:(u+1)*d] = - maps_[edge][v].T @ maps_[edge][u]

    return Lf

In [19]:
def LaplacianLearningPipeline(
        L,
        Y, 
        E0,
        T0, 
        K, 
        S, 
        d, 
        N, 
        c, 
        eps, 
        Q = 1,
        mu = 1, 
        MAX_ITER = 5):
    
    # Initialization - Full graph with all identity maps
    edges = list(combinations(range(N),2))
    B = np.zeros((d*len(edges), d*N))                        

    for i in range(len(edges)):

        # Main loop to populate the coboundary map
        edge = edges[i]

        u = edge[0] 
        v = edge[1] 

        B_u = np.eye(d)
        B_v = np.eye(d)

        B[i*d:(i+1)*d, u*d:(u+1)*d] = B_u           
        B[i*d:(i+1)*d, v*d:(v+1)*d] = - B_v

    Lf = B.T @ B
    Lf = fractional_matrix_power(np.diag(np.diagonal(Lf)), -0.5) @ Lf @ fractional_matrix_power(np.diag(np.diagonal(Lf)), -0.5)

    # Main loop - First try with a priori knowledge of the number of connections

    while len(edges) > E0:
        
        # Dictionary learning phase
        D, X, alpha, _ = ParametricDictionaryLearning(Lf, Y, T0, K, S, d, N, c, eps, mu, MAX_ITER)

        # Greedy deleting one edge from the graph
        edges = GreedySelection(alpha, edges, d, Lf, K, Y, D, X)

        # Rebuilding the sheaf laplacian on the new graph
        Lf = LaplacianUpdate(D, X, d, edges, N, Q)
        Lf = fractional_matrix_power(np.diag(np.diagonal(Lf)), -0.5) @ Lf @ fractional_matrix_power(np.diag(np.diagonal(Lf)), -0.5)
        print(np.linalg.norm(L - Lf) / L.size)
        
    return Lf, edges

In [20]:
E0 = len(edges)

In [None]:
L_hat, edges_ = LaplacianLearningPipeline(Lf, Y_train, E0, T0, K, S, d, N, c, eps, MAX_ITER=5)

In [22]:
L_hat

array([[ 1.00000000e+00, -1.23688641e-01, -2.12171637e-01, ...,
         2.11066775e-02, -3.50020302e-02,  6.58189308e-02],
       [-1.23688641e-01,  1.00000000e+00, -1.06094094e-01, ...,
         8.32318058e-03, -8.50513066e-03,  4.05869278e-04],
       [-2.12171637e-01, -1.06094094e-01,  1.00000000e+00, ...,
        -1.07284866e-01,  1.06047133e-02, -1.41085287e-02],
       ...,
       [ 2.11066775e-02,  8.32318058e-03, -1.07284866e-01, ...,
         1.00000000e+00,  5.02674574e-02, -2.17200542e-02],
       [-3.50020302e-02, -8.50513066e-03,  1.06047133e-02, ...,
         5.02674574e-02,  1.00000000e+00, -1.60690784e-01],
       [ 6.58189308e-02,  4.05869278e-04, -1.41085287e-02, ...,
        -2.17200542e-02, -1.60690784e-01,  1.00000000e+00]])

In [30]:
EDG = set(edges)
EDG_HAT = set(edges_)

In [32]:
len(EDG.intersection(EDG_HAT))/E0

0.5894736842105263

Big space for improvement but best result so far!