# Train a Cellular Attention Network

We create our implementation of a Cellular Attention Network (CAN) and test it on a toy cellular complex to see whether it reads the geometry correctly.

We create and train a simplified version of the CCXN originally proposed in [Hajij et al.: Cell Complex Neural Networks (2020)](https://arxiv.org/pdf/2010.00743.pdf).

### The Neural Network:

The equations of one layer of this neural network are given by:

1. A convolution from nodes to nodes using an adjacency message passing scheme (AMPS):

🟥 $\quad m_{y \rightarrow \{z\} \rightarrow x}^{(0 \rightarrow 1 \rightarrow 0)} = M_{\mathcal{L}_\uparrow}^t(h_x^{t,(0)}, h_y^{t,(0)}, \Theta^{t,(y \rightarrow x)})$ 

🟧 $\quad m_x^{(0 \rightarrow 1 \rightarrow 0)} = AGG_{y \in \mathcal{L}_\uparrow(x)}(m_{y \rightarrow \{z\} \rightarrow x}^{(0 \rightarrow 1 \rightarrow 0)})$ 

🟩 $\quad m_x^{(0)} = m_x^{(0 \rightarrow 1 \rightarrow 0)}$ 

🟦 $\quad h_x^{t+1,(0)} = U^{t}(h_x^{t,(0)}, m_x^{(0)})$
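To make the four steps concrete, here is a minimal NumPy sketch of one node-to-node AMPS update. It assumes a linear message that depends only on the sender, sum aggregation, and a ReLU update; these choices and the function name `amps_node_update` are illustrative and are not taken from the paper or from TopoModelX.

```python
import numpy as np


def amps_node_update(h0, adj_up, theta):
    """One node-to-node AMPS step (illustrative choices, not the paper's).

    h0:     [n_nodes, c] node features h^{t,(0)}
    adj_up: [n_nodes, n_nodes] upper adjacency: 1 where two nodes bound a common edge
    theta:  [c, c] weight matrix Theta^t, shared by all pairs (y -> x)
    """
    # Message M(h_x, h_y, Theta) = h_y @ Theta (this sketch ignores h_x).
    messages = h0 @ theta
    # AGG = sum over the upper-adjacent neighbors y of each node x.
    m0 = adj_up @ messages
    # Update U(h_x, m_x) = ReLU(h_x + m_x).
    return np.maximum(h0 + m0, 0.0)


# Path graph 0 - 1 - 2 with two feature channels and identity weights.
adj_up = np.array([[0, 1, 0], [1, 0, 1], [0, 1, 0]], dtype=float)
h0 = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
h0_next = amps_node_update(h0, adj_up, np.eye(2))
print(h0_next)
```

Node 1 receives messages from both of its neighbors, so its updated features are the largest.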

2. A convolution from edges to faces using a cohomology message passing scheme:

🟥 $\quad m_{y \rightarrow x}^{(r' \rightarrow r)} = M^t_{\mathcal{C}}(h_{x}^{t,(r)}, h_y^{t,(r')}, x, y)$ 

🟧 $\quad m_x^{(r' \rightarrow r)}  = AGG_{y \in \mathcal{C}(x)} m_{y \rightarrow x}^{(r' \rightarrow r)}$ 

🟩 $\quad m_x^{(r)} = m_x^{(r' \rightarrow r)}$ 

🟦 $\quad h_{x}^{t+1,(r)} = U^{t,(r)}(h_{x}^{t,(r)}, m_{x}^{(r)})$
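For the edge-to-face case ($r' = 1$, $r = 2$), each face aggregates the messages of the edges on its boundary, which is a product with $B_2^T$. The sketch below makes the same illustrative assumptions as for AMPS (linear message, sum aggregation, ReLU update) and uses an unsigned incidence matrix, i.e. it ignores orientation; the function name is hypothetical.

```python
import numpy as np


def cmps_edge_to_face(h1, h2, b2, theta):
    """One edge-to-face step using the coboundary map B_2^T (illustrative choices).

    h1:    [n_edges, c] edge features h^{t,(1)}
    h2:    [n_faces, c] face features h^{t,(2)}
    b2:    [n_edges, n_faces] incidence matrix B_2 (unsigned in this sketch)
    theta: [c, c] weight matrix
    """
    messages = h1 @ theta  # M depends only on the sending edge y here
    m2 = b2.T @ messages  # AGG = sum over the edges on the boundary of each face
    return np.maximum(h2 + m2, 0.0)  # U(h_x, m_x) = ReLU(h_x + m_x)


# One triangular face whose boundary is the three edges of the complex.
b2 = np.ones((3, 1))
h1 = np.array([[1.0], [2.0], [3.0]])
h2 = np.zeros((1, 1))
h2_next = cmps_edge_to_face(h1, h2, b2, np.eye(1))
print(h2_next)
```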

The notation follows [Papillon et al.: Architectures of Topological Deep Learning: A Survey of Topological Neural Networks (2023)](https://arxiv.org/abs/2304.10031).

### The Task:

We train this model to perform whole-complex classification on [`MUTAG` from the TUDataset](https://paperswithcode.com/dataset/mutag). This dataset contains:
- 188 samples of chemical compounds represented as graphs,
- with 7 discrete node features.

The task is to predict the mutagenicity of each compound on Salmonella typhimurium.

# Set-up


In [29]:
import numpy as np
import torch
from sklearn.model_selection import train_test_split
from toponetx import CellComplex
from torch_geometric.datasets import TUDataset
from torch_geometric.utils.convert import to_networkx

from topomodelx.nn.cell.can_layer import CANLayer

# Short alias used when building the toy complex below.
Cc = CellComplex

If GPUs are available, we use them; otherwise, this runs on the CPU.

In [13]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cpu


# Pre-processing

## Creating a Toy Cellular Complex ##

We create a toy cellular complex of rank 2 on which to test our CAN implementation.

In [32]:
toy = Cc()
toy.add_cells_from([[1, 2, 3], [2, 4, 5, 3]], rank=2)
print(toy)

Cell Complex with 5 nodes, 6 edges  and 2 2-cells 
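To see which neighborhood matrices this toy complex defines, the NumPy sketch below builds its signed incidence matrices $B_1$ (nodes × edges) and $B_2$ (edges × faces) by hand and checks that $B_1 B_2 = 0$. Orienting each edge from the smaller to the larger node label is an arbitrary convention chosen here; toponetx may orient and order the cells differently.

```python
import numpy as np

# Faces of the toy complex, each given as a cyclic list of nodes.
faces = [[1, 2, 3], [2, 4, 5, 3]]
nodes = sorted({v for face in faces for v in face})
# Orient every edge from the smaller to the larger node label.
edges = sorted(
    {tuple(sorted((f[i], f[(i + 1) % len(f)]))) for f in faces for i in range(len(f))}
)

node_idx = {v: i for i, v in enumerate(nodes)}
edge_idx = {e: j for j, e in enumerate(edges)}

# B_1: node-edge incidence, -1 at the tail and +1 at the head of each edge.
b1 = np.zeros((len(nodes), len(edges)))
for (u, v), j in edge_idx.items():
    b1[node_idx[u], j] = -1.0
    b1[node_idx[v], j] = 1.0

# B_2: edge-face incidence, +1 if the face traverses the edge along its orientation, else -1.
b2 = np.zeros((len(edges), len(faces)))
for k, f in enumerate(faces):
    for i in range(len(f)):
        u, v = f[i], f[(i + 1) % len(f)]
        b2[edge_idx[tuple(sorted((u, v)))], k] = 1.0 if u < v else -1.0

print(len(nodes), len(edges), len(faces))  # 5 6 2, matching the toponetx summary above
print(np.abs(b1 @ b2).max())  # 0.0: the boundary of a boundary vanishes
```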


## Import data ##

We import a subset of MUTAG (its first 100 graphs), a benchmark dataset for graph classification.

We then lift each graph into our topological domain of choice, here: a cell complex.

We also retrieve:
- input signals `x_0` and `x_1` on the nodes (0-cells) and edges (1-cells) for each complex: these will be the model's inputs,
- a binary classification label `y` associated with the cell complex.
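PyG stores each undirected bond as two directed edges, so `graph.edge_attr` holds one feature row per direction, which is consistent with the even count of 38 edge-feature rows reported for the first complex. If one row per undirected edge is wanted, a minimal NumPy sketch, assuming every edge appears in both directions with identical features, keeps only the copy whose source index is smaller. The helper `undirected_edge_features` is hypothetical, and its output order is not guaranteed to match the order in which toponetx lists the complex's edges.

```python
import numpy as np


def undirected_edge_features(edge_index, edge_attr):
    """Keep one feature row per undirected edge (hypothetical helper).

    edge_index: [2, n_directed] array of (source, target) node indices
    edge_attr:  [n_directed, d] features, assumed identical for (u, v) and (v, u)
    """
    src, dst = edge_index
    keep = src < dst  # keep the (u, v) copy with u < v
    edges = np.stack([src[keep], dst[keep]], axis=1)
    return edges, edge_attr[keep]


# Two undirected edges, 0-1 and 1-2, each stored in both directions.
edge_index = np.array([[0, 1, 1, 2], [1, 0, 2, 1]])
edge_attr = np.array([[1, 0], [1, 0], [0, 1], [0, 1]])
edges, feats = undirected_edge_features(edge_index, edge_attr)
print(edges.tolist(), feats.tolist())  # [[0, 1], [1, 2]] [[1, 0], [0, 1]]
```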

In [9]:
dataset = TUDataset(
    root="/tmp/MUTAG", name="MUTAG", use_edge_attr=True, use_node_attr=True
)
dataset = dataset[:100]
cc_list = []
x_0_list = []
x_1_list = []
y_list = []
for graph in dataset:
    cell_complex = CellComplex(to_networkx(graph))
    cc_list.append(cell_complex)
    x_0_list.append(graph.x)
    x_1_list.append(graph.edge_attr)
    y_list.append(int(graph.y))

i_cc = 0
print(f"Features on nodes for the {i_cc}th cell complex: {x_0_list[i_cc].shape}.")
print(f"Features on edges for the {i_cc}th cell complex: {x_1_list[i_cc].shape}.")
print(f"Label of {i_cc}th cell complex: {y_list[i_cc]}.")

cc_list[0].skeleton(rank=2)

Features on nodes for the 0th cell complex: torch.Size([17, 7]).
Features on edges for the 0th cell complex: torch.Size([38, 4]).
Label of 0th cell complex: 1.


CellView([])
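The empty `CellView([])` shows that `CellComplex(to_networkx(graph))` adds nodes and edges but no 2-cells, which is why the rank-1 upper Laplacian has nothing to work with. One common way to lift a graph to a cell complex with faces, sketched here with networkx and not used elsewhere in this notebook, is to attach one 2-cell along each cycle of a cycle basis. The helper name is hypothetical; the resulting cycles could then be passed to `add_cells_from(..., rank=2)` as in the toy example.

```python
import networkx as nx


def cycle_two_cells(graph):
    """Candidate 2-cells for a graph: one node cycle per element of a cycle basis.

    The graph is converted to undirected first, since nx.cycle_basis requires it.
    """
    return nx.cycle_basis(nx.Graph(graph))


# The 1-skeleton of the toy complex: a triangle and a square sharing edge 2-3.
g = nx.Graph([(1, 2), (2, 3), (1, 3), (2, 4), (4, 5), (5, 3)])
cycles = cycle_two_cells(g)
# A connected graph has |E| - |V| + 1 independent cycles: here 6 - 5 + 1 = 2.
print(len(cycles))
```

Which cycles the basis returns depends on the spanning tree networkx picks, so a basis element may be the outer pentagon rather than the square; for chemistry data, restricting to small rings is a separate design choice.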

## Define neighborhood structures ##

Implementing the CAN architecture requires performing message passing along the neighborhood structures of the cell complexes.

We therefore retrieve these neighborhood structures (i.e., their representative matrices), which we use to send messages.

For the CAN, we need the lower and upper Laplacians of rank 1, $L_{\downarrow,1} = B_1^T B_1$ and $L_{\uparrow,1} = B_2 B_2^T$, of each cell complex.
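Both rank-1 Laplacians are products of incidence matrices. When a complex has no 2-cells, $B_2$ has zero columns and $B_2 B_2^T$ is the all-zero matrix, which is the natural value to use when toponetx refuses to build the upper Laplacian. A NumPy sketch on a three-node path graph, with a hypothetical helper name:

```python
import numpy as np


def rank1_laplacians(b1, b2):
    """Return (L_down, L_up) of rank 1 from signed incidence matrices B_1 and B_2."""
    l_down = b1.T @ b1  # nonzero off-diagonal entries: edges sharing a node
    l_up = b2 @ b2.T  # nonzero off-diagonal entries: edges sharing a face
    return l_down, l_up


# Path graph 0 - 1 - 2: two edges, (0, 1) and (1, 2), and no faces.
b1 = np.array([[-1.0, 0.0], [1.0, -1.0], [0.0, 1.0]])
b2 = np.zeros((2, 0))  # no 2-cells: B_2 has zero columns
l_down, l_up = rank1_laplacians(b1, b2)
print(l_down)  # l_down == [[2, -1], [-1, 2]]
print(l_up)  # all zeros, shape (2, 2)
```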

In [4]:
up_laplacian_list = []
down_laplacian_list = []
for cell_complex in cc_list:
    # Lower Laplacian of rank 1: L_down = B_1^T B_1 (edges that share a node).
    down_laplacian = cell_complex.down_laplacian_matrix(rank=1)
    down_laplacian = torch.from_numpy(down_laplacian.todense()).to_sparse()
    # Upper Laplacian of rank 1: L_up = B_2 B_2^T (edges that share a face).
    # The lifted MUTAG complexes have no 2-cells, in which case toponetx raises a
    # ValueError; B_2 is then empty and L_up is the all-zero matrix.
    try:
        up_laplacian = cell_complex.up_laplacian_matrix(rank=1)
        up_laplacian = torch.from_numpy(up_laplacian.todense()).to_sparse()
    except ValueError:
        up_laplacian = torch.zeros(
            down_laplacian.shape, dtype=down_laplacian.dtype
        ).to_sparse()
    up_laplacian_list.append(up_laplacian)
    down_laplacian_list.append(down_laplacian)

i_cc = 0
print(f"Up Laplacian of the {i_cc}-th complex: {up_laplacian_list[i_cc].shape}.")
print(f"Down Laplacian of the {i_cc}-th complex: {down_laplacian_list[i_cc].shape}.")

# Create the Neural Network

Using the `CANLayer` class, we create a neural network with stacked layers.

In [6]:
in_channels_0 = x_0_list[0].shape[-1]
in_channels_1 = x_1_list[0].shape[-1]
in_channels_2 = 5
print(
    f"The dimension of input features on nodes, edges and faces are: {in_channels_0}, {in_channels_1} and {in_channels_2}."
)

The dimension of input features on nodes, edges and faces are: 7, 4 and 5.
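The `CANLayer` used below comes from TopoModelX, and its internals are not shown in this notebook. For intuition only, the following NumPy sketch shows one way a cellular attention layer on edges can work: GAT-style masked attention computed separately over the lower neighborhood (edges sharing a node) and the upper neighborhood (edges sharing a face), with the two messages summed. The scoring function, the LeakyReLU slope of 0.2, the exclusion of self-loops, the ReLU update and all names are assumptions; the TopoModelX layer may differ, for example in multi-head attention, skip connections or normalization.

```python
import numpy as np


def masked_softmax(scores, mask):
    """Row-wise softmax over entries where mask is True; rows without neighbors give zeros."""
    scores = np.where(mask, scores, -np.inf)
    row_max = scores.max(axis=1, keepdims=True)
    row_max = np.where(np.isfinite(row_max), row_max, 0.0)
    weights = np.exp(scores - row_max)  # exp(-inf) = 0 for masked entries
    denom = weights.sum(axis=1, keepdims=True)
    return np.divide(weights, denom, out=np.zeros_like(weights), where=denom > 0)


def attention_aggregate(h, neighborhood, w, a, slope=0.2):
    """GAT-style attention of each edge over its neighbors in one neighborhood matrix."""
    z = h @ w  # [n_edges, c_out]
    c = z.shape[1]
    # score(x, y) = LeakyReLU(a[:c] . z_x + a[c:] . z_y)
    s = (z @ a[:c])[:, None] + (z @ a[c:])[None, :]
    s = np.where(s > 0, s, slope * s)
    # Neighbors are the nonzero off-diagonal entries of the Laplacian.
    mask = (neighborhood != 0) & ~np.eye(len(h), dtype=bool)
    return masked_softmax(s, mask) @ z


def can_layer_sketch(h, down_laplacian, up_laplacian, p):
    """Sum of lower- and upper-neighborhood attention messages, then ReLU."""
    m_down = attention_aggregate(h, down_laplacian, p["w_down"], p["a_down"])
    m_up = attention_aggregate(h, up_laplacian, p["w_up"], p["a_up"])
    return np.maximum(m_down + m_up, 0.0)


# Three edges in a path (e0 ~ e1 ~ e2 share nodes) and no faces.
down = np.array([[2.0, -1.0, 0.0], [-1.0, 2.0, 1.0], [0.0, 1.0, 2.0]])
up = np.zeros((3, 3))
h = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
# Identity weights and zero attention vectors give uniform attention over neighbors.
p = {"w_down": np.eye(2), "a_down": np.zeros(4), "w_up": np.eye(2), "a_up": np.zeros(4)}
out = can_layer_sketch(h, down, up, p)
print(out)
```

With an all-zero upper Laplacian, as for the MUTAG complexes here, the upper branch contributes nothing and only the lower neighborhood drives the update.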


In [7]:
class CAN(torch.nn.Module):
    """CCXN.

    Parameters
    ----------
    in_channels_0 : int
        Dimension of input features on nodes.
    in_channels_1 : int
        Dimension of input features on edges.
    in_channels_2 : int
        Dimension of input features on faces.
    num_classes : int
        Number of classes.
    n_layers : int
        Number of CCXN layers.
    att : bool
        Whether to use attention.
    """

    def __init__(
        self,
        in_channels_0,
        in_channels_1,
        num_classes,
        n_layers=2,
    ):
        super().__init__()
        layers = []
        for _ in range(n_layers):
            layers.append(CANLayer(channels=in_channels_1))
        self.layers = layers
        self.lin_0 = torch.nn.Linear(in_channels_0, num_classes)
        self.lin_1 = torch.nn.Linear(in_channels_1, num_classes)

    def forward(self, x_0, x_1, down_laplacian, up_laplacian):
        """Forward computation through layers, then linear layers, then avg pooling.

        Parameters
        ----------
        x_0 : torch.Tensor, shape = [n_nodes, in_channels_0]
            Input features on the nodes (0-cells).
        x_1 : torch.Tensor, shape = [n_edges, in_channels_1]
            Input features on the edges (1-cells).
        neighborhood_0_to_0 : tensor, shape = [n_nodes, n_nodes]
            Adjacency matrix of rank 0 (up).
        neighborhood_1_to_2 : tensor, shape = [n_faces, n_edges]
            Transpose of boundary matrix of rank 2.
        x_2 : torch.Tensor, shape = [n_faces, in_channels_2]
            Input features on the faces (2-cells).
            Optional. Use for attention mechanism between edges and faces.

        Returns
        -------
        _ : tensor, shape = [1]
            Label assigned to whole complex.
        """
        for layer in self.layers:
            x_1 = layer(x_1, down_laplacian, up_laplacian)
        x_0 = self.lin_0(x_0)
        x_1 = self.lin_1(x_1)
        return torch.mean(x_1, dim=0) + torch.mean(x_0, dim=0)
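The readout applies `lin_0` and `lin_1` to every cell and then averages over cells. Because a linear layer is affine, averaging first and then applying the layer gives the same logits with fewer multiplications. The NumPy check below illustrates this with random weights standing in for `lin_0`; the shapes (17 nodes, 7 features, 2 classes) mirror the first MUTAG complex, but the values are arbitrary.

```python
import numpy as np

rng = np.random.default_rng(0)
x_0 = rng.normal(size=(17, 7))  # node features: 17 nodes, 7 channels
weight = rng.normal(size=(2, 7))  # stand-in for lin_0.weight (num_classes x in_channels_0)
bias = rng.normal(size=2)  # stand-in for lin_0.bias

# Order used in CAN.forward: linear layer on every node, then mean over nodes.
per_cell_then_mean = (x_0 @ weight.T + bias).mean(axis=0)
# Equivalent order: mean over nodes first, then a single linear map.
mean_then_linear = x_0.mean(axis=0) @ weight.T + bias

print(np.allclose(per_cell_then_mean, mean_then_linear))
```

The identity holds only because nothing nonlinear sits between the linear layer and the mean.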

# Train the Neural Network

We specify the model, initialize the loss, and choose an optimizer. We first try it without any attention mechanism.
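For a single complex, the model returns a vector of `num_classes` logits, and `torch.nn.CrossEntropyLoss` compares it with an integer label. For one unbatched sample with default settings this is the negative log-softmax of the true class, as in this NumPy sketch (the helper name is hypothetical):

```python
import numpy as np


def cross_entropy_single(logits, label):
    """Cross-entropy of one unbatched sample: -log softmax(logits)[label]."""
    logits = np.asarray(logits, dtype=float)
    shift = logits.max()  # subtract the max logit for numerical stability
    log_sum_exp = shift + np.log(np.exp(logits - shift).sum())
    return log_sum_exp - logits[label]


print(cross_entropy_single([0.0, 0.0], 1))  # log(2), about 0.6931
print(cross_entropy_single([2.0, 0.0], 0))  # log(1 + e^-2), about 0.1269
print(cross_entropy_single([np.nan, 0.0], 0))  # nan
```

With finite logits this quantity is always finite, so a `nan` loss in the training logs points to non-finite values in the model's output.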

In [8]:
model = CAN(in_channels_0, in_channels_1, num_classes=2, n_layers=2)
model = model.to(device)
crit = torch.nn.CrossEntropyLoss()
opt = torch.optim.Adam(model.parameters(), lr=0.1)

We split the dataset into train and test sets.
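`train_test_split` accepts several sequences at once and splits them with the same indices, so the per-list calls can be collapsed into a single call. A small sketch with hypothetical stand-in lists for five complexes:

```python
from sklearn.model_selection import train_test_split

# Hypothetical stand-ins for the per-complex lists.
x_0_list = [f"x0_{i}" for i in range(5)]
y_list = [0, 1, 0, 1, 1]

# Outputs alternate train/test for each input sequence, in the order given.
x_0_train, x_0_test, y_train, y_test = train_test_split(
    x_0_list, y_list, test_size=0.2, shuffle=False
)
print(x_0_train, x_0_test, y_train, y_test)
```

With `shuffle=False` the last 20% of each list becomes the test set. Passing `shuffle=True`, `stratify=y_list` and a fixed `random_state` instead keeps the class ratio the same in both splits.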

In [9]:
test_size = 0.2
x_0_train, x_0_test = train_test_split(x_0_list, test_size=test_size, shuffle=False)
x_1_train, x_1_test = train_test_split(x_1_list, test_size=test_size, shuffle=False)
up_laplacian_train, up_laplacian_test = train_test_split(
    up_laplacian_list, test_size=test_size, shuffle=False
)
down_laplacian_train, down_laplacian_test = train_test_split(
    down_laplacian_list, test_size=test_size, shuffle=False
)
y_train, y_test = train_test_split(y_list, test_size=test_size, shuffle=False)

We train the CAN for only a few epochs: training is kept minimal for the purpose of rapid testing.

In [10]:
test_interval = 2
for epoch_i in range(1, 5):
    epoch_loss = []
    num_samples = 0
    correct = 0
    model.train()
    for x_0, x_1, up_laplacian, down_laplacian, y in zip(
        x_0_train, x_1_train, up_laplacian_train, down_laplacian_train, y_train
    ):
        opt.zero_grad()

        # Move every input to the model's device so the loop also runs on GPU.
        y_hat = model(
            x_0.float().to(device),
            x_1.float().to(device),
            down_laplacian.float().to(device),
            up_laplacian.float().to(device),
        )
        y = torch.tensor(y, device=device).long()
        loss = crit(y_hat, y)
        correct += (y_hat.argmax() == y).sum().item()
        num_samples += 1
        loss.backward()
        opt.step()
        epoch_loss.append(loss.item())
    train_acc = correct / num_samples
    print(
        f"Epoch: {epoch_i} loss: {np.mean(epoch_loss):.4f} Train_acc: {train_acc:.4f}",
        flush=True,
    )
    if epoch_i % test_interval == 0:
        model.eval()  # disable training-only behavior (e.g. dropout) for evaluation
        with torch.no_grad():
            num_samples = 0
            correct = 0
            for x_0, x_1, down_laplacian, up_laplacian, y in zip(
                x_0_test, x_1_test, down_laplacian_test, up_laplacian_test, y_test
            ):
                y = torch.tensor(y, device=device).long()
                y_hat = model(
                    x_0.float().to(device),
                    x_1.float().to(device),
                    down_laplacian.float().to(device),
                    up_laplacian.float().to(device),
                )

                correct += (y_hat.argmax() == y).sum().item()
                num_samples += 1
            test_acc = correct / num_samples
            print(f"Test_acc: {test_acc:.4f}", flush=True)

Epoch: 1 loss: nan Train_acc: 0.3750
Epoch: 2 loss: nan Train_acc: 0.3750
Test_acc: 0.5000
Epoch: 3 loss: nan Train_acc: 0.3750
Epoch: 4 loss: nan Train_acc: 0.3750
Test_acc: 0.5000


# Train the Neural Network with Attention


Now we create a new neural network that uses the attention mechanism.

In [11]:
model = CAN(in_channels_0, in_channels_1, num_classes=2, n_layers=2)
model = model.to(device)
crit = torch.nn.CrossEntropyLoss()
opt = torch.optim.Adam(model.parameters(), lr=0.1)

We run the training for this neural network:

In [12]:
test_interval = 2
for epoch_i in range(1, 5):
    epoch_loss = []
    num_samples = 0
    correct = 0
    model.train()
    for x_0, x_1, up_laplacian, down_laplacian, y in zip(
        x_0_train, x_1_train, up_laplacian_train, down_laplacian_train, y_train
    ):
        opt.zero_grad()

        # Move every input to the model's device so the loop also runs on GPU.
        y_hat = model(
            x_0.float().to(device),
            x_1.float().to(device),
            down_laplacian.float().to(device),
            up_laplacian.float().to(device),
        )
        y = torch.tensor(y, device=device).long()
        loss = crit(y_hat, y)
        correct += (y_hat.argmax() == y).sum().item()
        num_samples += 1
        loss.backward()
        opt.step()
        epoch_loss.append(loss.item())
    train_acc = correct / num_samples
    print(
        f"Epoch: {epoch_i} loss: {np.mean(epoch_loss):.4f} Train_acc: {train_acc:.4f}",
        flush=True,
    )
    if epoch_i % test_interval == 0:
        model.eval()  # disable training-only behavior (e.g. dropout) for evaluation
        with torch.no_grad():
            num_samples = 0
            correct = 0
            for x_0, x_1, down_laplacian, up_laplacian, y in zip(
                x_0_test, x_1_test, down_laplacian_test, up_laplacian_test, y_test
            ):
                y = torch.tensor(y, device=device).long()
                y_hat = model(
                    x_0.float().to(device),
                    x_1.float().to(device),
                    down_laplacian.float().to(device),
                    up_laplacian.float().to(device),
                )

                correct += (y_hat.argmax() == y).sum().item()
                num_samples += 1
            test_acc = correct / num_samples
            print(f"Test_acc: {test_acc:.4f}", flush=True)

Epoch: 1 loss: nan Train_acc: 0.3750
Epoch: 2 loss: nan Train_acc: 0.3750
Test_acc: 0.5000
Epoch: 3 loss: nan Train_acc: 0.3750
Epoch: 4 loss: nan Train_acc: 0.3750
Test_acc: 0.5000
