# Train a SCCNN

In this notebook, we will create and train a High Skip Network in the simplicial complex domain, as proposed in the paper by [Yang et. al : Convolutional Learning on Simplicial Complexes (2023)](https://arxiv.org/abs/2301.11163). 

We train the model to perform binary node classification using the KarateClub benchmark dataset. 

In [18]:
import torch
import numpy as np
from sklearn.model_selection import train_test_split



from toponetx import SimplicialComplex
import toponetx.datasets as datasets

from topomodelx.nn.simplicial.sccnn_layer import SCCNNLayer

# Pre-processing

## Import dataset ##

We must first lift our graph dataset into the simplicial complex domain.

In [19]:
shrec, _ = datasets.mesh.shrec_16(size="small")

shrec = {key: np.array(value) for key, value in shrec.items()}
 
x_0s = shrec["node_feat"]
x_1s = shrec["edge_feat"]
x_2s = shrec["face_feat"]

ys = shrec["label"]
simplexes = shrec["complexes"]

Loading shrec 16 small dataset...

done!


In [20]:
in_channels_0 = x_0s[-1].shape[1]
in_channels_1 = x_1s[-1].shape[1]
in_channels_2 = x_2s[-1].shape[1]

in_channels_all = (in_channels_0,in_channels_1,in_channels_2)
print(in_channels_all)

(6, 10, 7)


In [21]:
max_rank = 2 # the order of the SC is two 
incidence_1_list = []
incidence_2_list = []

laplacian_0_list = []
laplacian_down_1_list = []
laplacian_up_1_list = []
laplacian_2_list = []
 
for simplex in simplexes: 
    incidence_1 = simplex.incidence_matrix(rank=1)
    incidence_2 = simplex.incidence_matrix(rank=2)
    laplacian_0 = simplex.hodge_laplacian_matrix(rank=0,weight=True)
    laplacian_down_1 = simplex.down_laplacian_matrix(rank=1,weight=True)
    laplacian_up_1 = simplex.up_laplacian_matrix(rank=1,weight=True)
    laplacian_2 = simplex.hodge_laplacian_matrix(rank=2,weight=True)
    
    incidence_1 = torch.from_numpy(incidence_1.todense()).to_sparse()
    incidence_2 = torch.from_numpy(incidence_2.todense()).to_sparse()
    laplacian_0 = torch.from_numpy(laplacian_0.todense()).to_sparse()
    laplacian_down_1 = torch.from_numpy(laplacian_down_1.todense()).to_sparse()
    laplacian_up_1 = torch.from_numpy(laplacian_up_1.todense()).to_sparse()
    laplacian_2 = torch.from_numpy(laplacian_2.todense()).to_sparse()
    
    incidence_1_list.append(incidence_1)
    incidence_2_list.append(incidence_2)
    laplacian_0_list.append(laplacian_0)
    laplacian_down_1_list.append(laplacian_down_1)
    laplacian_up_1_list.append(laplacian_up_1)
    laplacian_2_list.append(laplacian_2)
    

    

# Create the SCCNN

In [22]:
class SCCNN(torch.nn.Module):
    """SCCNN implementation for binary node classification 
    Note: In this task, we direcly consider the finaly output on the nodes, which is passed by a linear layer, as the label output. 

    Parameters
    """
    def __init__(self, in_channels_all, intermediate_channels_all,out_channels_all, conv_order, sc_order, aggr_norm=False,update_func="sigmoid",n_layers=2):
        super().__init__()
        # first layer 
        # layers = [SCCNNLayer(in_channels=in_channels_all,out_channels=intermediate_channels_all,conv_order=conv_order,sc_order=sc_order,aggr_norm=aggr_norm,update_func=update_func)]
        self.in_linear_0 = torch.nn.Linear(in_channels_all[0],intermediate_channels_all[0])
        self.in_linear_1 = torch.nn.Linear(in_channels_all[1],intermediate_channels_all[1])
        self.in_linear_2 = torch.nn.Linear(in_channels_all[2],intermediate_channels_all[2])
        layers = []
        for _ in range(n_layers):
            layers.append(
                SCCNNLayer(in_channels=intermediate_channels_all,out_channels=out_channels_all,conv_order=conv_order,sc_order=sc_order,aggr_norm=aggr_norm,update_func=update_func)
            )
            
        self.layers = layers    
        out_channels_0, out_channels_1, out_channels_2 = out_channels_all
        self.out_linear_0 = torch.nn.Linear(out_channels_0,1)
        self.out_linear_1 = torch.nn.Linear(out_channels_1,1)
        self.out_linear_2 = torch.nn.Linear(out_channels_2,1)
        

    def forward(self,x_all,laplacian_all,incidence_all):
        """Forward computation. 
        
        Parameters
        ----------
        """
        x_0, x_1, x_2 = x_all
        in_x_0 = self.in_linear_0(x_0)
        in_x_1 = self.in_linear_1(x_1)
        in_x_2 = self.in_linear_2(x_2)
        x_all = (in_x_0,in_x_1,in_x_2)
        for layer in self.layers:
            x_all = layer(x_all,laplacian_all,incidence_all)
        
        """
        We pass the output on the nodes, edges and triangles to a pooling layer then a linear layer and use that for labels
        """
        x_0, x_1, x_2 = x_all 

        pooled_x_0 = torch.max(x_0,dim=0)[0]    
        pooled_x_1 = torch.max(x_1,dim=0)[0]
        pooled_x_2 = torch.max(x_2,dim=0)[0]    
        y_0 = torch.sigmoid(self.out_linear_0(pooled_x_0))[0]
        y_1 = torch.sigmoid(self.out_linear_1(pooled_x_1))[0]
        y_2 = torch.sigmoid(self.out_linear_2(pooled_x_2))[0]
        return y_0, y_1, y_2  

# Train the Neural Network

We specify the model with our pre-made neighborhood structures and specify an optimizer.

In [23]:

conv_order = 3
intermediate_channels_all = (4,4,4)
out_channels_all = intermediate_channels_all
num_layers = 2

model = SCCNN(in_channels_all=in_channels_all,intermediate_channels_all=intermediate_channels_all,out_channels_all=out_channels_all,conv_order=conv_order,sc_order=max_rank,n_layers=num_layers)

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
loss_fn = torch.nn.MSELoss()

In [24]:
test_size = 0.2
x_0_train, x_0_test = train_test_split(x_0s, test_size=test_size, shuffle=False)
x_1_train, x_1_test = train_test_split(x_1s, test_size=test_size, shuffle=False)
x_2_train, x_2_test = train_test_split(x_2s, test_size=test_size, shuffle=False)

incidence_1_train, incidence_1_test = train_test_split(
    incidence_1_list, test_size=test_size, shuffle=False
)
incidence_2_train, incidence_2_test = train_test_split(incidence_2_list, test_size=test_size, shuffle=False)
laplacian_0_train, laplacian_0_test = train_test_split(laplacian_0_list, test_size=test_size, shuffle=False)
laplacian_down_1_train, laplacian_down_1_test = train_test_split(laplacian_down_1_list, test_size=test_size, shuffle=False)
laplacian_up_1_train, laplacian_up_1_test = train_test_split(laplacian_up_1_list, test_size=test_size, shuffle=False)
laplacian_2_train, laplacian_2_test = train_test_split(laplacian_2_list, test_size=test_size, shuffle=False)

y_train, y_test = train_test_split(ys, test_size=test_size, shuffle=False)

In [25]:
test_interval = 1
num_epochs = 5

# select which feature to use for labeling
simplex_order_select = 1

for epoch_i in range(1, num_epochs + 1):
    epoch_loss = []
    model.train()
    for x_0, x_1, x_2, incidence_1, incidence_2, laplacian_0, laplacian_down_1, laplacian_up_1, laplacian_2, y in zip(x_0_train, x_1_train, x_2_train, incidence_1_train, incidence_2_train, laplacian_0_train, laplacian_down_1_train, laplacian_up_1_train, laplacian_2_train, y_train):
        
        x_0 = torch.tensor(x_0)
        x_1 = torch.tensor(x_1)
        x_2 = torch.tensor(x_2)
        y = torch.tensor(y,dtype=torch.float)
        optimizer.zero_grad()
        x_all = (x_0.float(),x_1.float(),x_2.float())
        laplacian_all = (laplacian_0, laplacian_down_1, laplacian_up_1, laplacian_2)
        incidence_all = (incidence_1, incidence_2)

        y_hat = model(x_all, laplacian_all, incidence_all)

        # print(y_hat.shape)
        loss = loss_fn(y_hat[simplex_order_select], y)

        epoch_loss.append(loss.item())
        loss.backward()
        optimizer.step()    

    print(
        f"Epoch: {epoch_i} loss: {np.mean(epoch_loss):.4f}",
        flush=True,
    )
    if epoch_i % test_interval == 0:
        with torch.no_grad():
            for x_0, x_1, x_2, incidence_1, incidence_2, laplacian_0, laplacian_down_1, laplacian_up_1, laplacian_2, y in zip(x_0_test, x_1_test, x_2_test, incidence_1_test, incidence_2_test, laplacian_0_test, laplacian_down_1_test, laplacian_up_1_test, laplacian_2_test, y_test):
    
                x_0 = torch.tensor(x_0)
                x_1 = torch.tensor(x_1)
                x_2 = torch.tensor(x_2)
                y = torch.tensor(y,dtype=torch.float)
                optimizer.zero_grad()
                x_all = (x_0.float(),x_1.float(),x_2.float())
                laplacian_all = (laplacian_0, laplacian_down_1, laplacian_up_1, laplacian_2)
                incidence_all = (incidence_1, incidence_2)

                y_hat = model(x_all, laplacian_all, incidence_all)

                    
                loss = loss_fn(y_hat[simplex_order_select], y)
            print(f"Test_loss: {loss:.4f}", flush=True)

Epoch: 1 loss: 277.6436
Test_loss: 530.5171
Epoch: 2 loss: 275.1914
Test_loss: 529.6067
Epoch: 3 loss: 274.8917
Test_loss: 529.3450
Epoch: 4 loss: 274.7841
Test_loss: 529.2281
Epoch: 5 loss: 274.7310
Test_loss: 529.1642
