## Initialize the Graph, ImpactCalculator and GNNAnalyzer


In [1]:
from lib.graph.graph.graph import Graph
from lib.gnnanalyzer.gnnanalyzer.localgnnanalyzer import LocalGNNAnalyzer
from lib.gnnanalyzer.gnnanalyzer.localimpactcalculator import (
    LocalImpactCalculator,
    LocalImpactCalculationMethod,
)
from typing import List, Tuple, Dict, Set, Optional, Literal

In [2]:
NODE_IDX = 0  # Index for the identifier column in nodes
NODE_FEATURE_START_IDX = 1  # Start index for node features columns
NODE_FEATURE_END_IDX = 1433  # End index for node features columns
CLASS_IDX = 1434  # Index for the class column in nodes
SOURCE_IDX = 0  # Index for the source column in edges
TARGET_IDX = 1  # Index for the target column in edges

In [3]:
g = Graph()
g.import_edges_from_edge_list(
    data="./datasets/cora/cora.cites",
    source_target_col=(SOURCE_IDX, TARGET_IDX),
    override_data_file_extension=".txt",
    delimiter="	",
)
g.import_nodes_from_node_list(
    data="./datasets/cora/cora.content",
    node_identifier_col=NODE_IDX,
    features_cols=[i for i in range(NODE_FEATURE_START_IDX, NODE_FEATURE_END_IDX)],
    class_col=CLASS_IDX,
    override_data_file_extension=".txt",
    delimiter="	",
)

# To please PyLance
assert g.nodes is not None
assert g.edges is not None
assert g.nodes.columns is not None
assert g.edges.columns is not None

In [4]:
impact = LocalImpactCalculator(method=LocalImpactCalculationMethod.PROBABILITY_CHANGE)
analyzer = LocalGNNAnalyzer(graph=g, local_impact_calculator=impact)

## Initial testings


### Creation of a basic model


In [5]:
from lib.graph.graph.graph import ExportFileFormat
from torch_geometric.nn import MessagePassing
from torch_geometric.utils import add_self_loops, degree
from torch_geometric.data import Data
import torch
import torch.nn.functional as F
from torch.nn import Linear
from sklearn.preprocessing import LabelEncoder
import polars as pl
import numpy as np

We define a very basic GNN


In [6]:
class GCNConv(MessagePassing):
    def __init__(self, in_channels, out_channels):
        super().__init__(aggr="add")  # "Add" aggregation.
        self.lin = Linear(in_channels, out_channels, bias=False)
        self.bias = torch.nn.Parameter(torch.Tensor(out_channels))

        self.reset_parameters()

    def reset_parameters(self):
        self.lin.reset_parameters()
        torch.nn.init.zeros_(self.bias)

    def forward(self, x, edge_index):
        # Step 1: Add self-loops to the adjacency matrix.
        edge_index, _ = add_self_loops(edge_index, num_nodes=x.size(0))

        # Step 2: Linearly transform node feature matrix.
        x = self.lin(x)

        # Step 3: Compute normalization.
        row, col = edge_index
        deg = degree(col, x.size(0), dtype=x.dtype)
        deg_inv_sqrt = deg.pow(-0.5)
        deg_inv_sqrt[deg_inv_sqrt == float("inf")] = 0
        norm = deg_inv_sqrt[row] * deg_inv_sqrt[col]

        # Step 4-5: Start propagating messages.
        return self.propagate(edge_index, x=x, norm=norm)

    def message(self, x_j, norm):
        # Step 4: Normalize node features.
        return norm.view(-1, 1) * x_j

    def update(self, aggr_out):
        # Step 6: Apply a final bias vector.
        return aggr_out + self.bias

We define the device to be used


In [7]:
device_str: Literal["cuda", "mps", "cpu"] = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available() and torch.backends.mps.is_built()
    else "cpu"
)
device = torch.device(device_str)

The Graph Class allows us to get back the dataset in a multitude of formats; here we are getting a Polars DataFrame.


In [8]:
# Let's see how is our datasets once exported

nodes_df = g.export_nodes_as_node_list(file_format=ExportFileFormat.POLARS_DF)
edges_df = g.export_edges_as_edge_list(file_format=ExportFileFormat.POLARS_DF)

assert isinstance(nodes_df, pl.DataFrame)
assert isinstance(edges_df, pl.DataFrame)

print(nodes_df)
print(edges_df)

shape: (2_708, 1_435)
┌─────────┬───────────┬───────────┬───────────┬───┬────────────┬───────────┬───────────┬───────────┐
│ Node    ┆ Feature 0 ┆ Feature 1 ┆ Feature 2 ┆ … ┆ Feature    ┆ Feature   ┆ column_14 ┆ Class     │
│ ---     ┆ ---       ┆ ---       ┆ ---       ┆   ┆ 1430       ┆ 1431      ┆ 34        ┆ ---       │
│ i64     ┆ i64       ┆ i64       ┆ i64       ┆   ┆ ---        ┆ ---       ┆ ---       ┆ str       │
│         ┆           ┆           ┆           ┆   ┆ i64        ┆ i64       ┆ i64       ┆           │
╞═════════╪═══════════╪═══════════╪═══════════╪═══╪════════════╪═══════════╪═══════════╪═══════════╡
│ 31336   ┆ 0         ┆ 0         ┆ 0         ┆ … ┆ 0          ┆ 0         ┆ 0         ┆ Neural_Ne │
│         ┆           ┆           ┆           ┆   ┆            ┆           ┆           ┆ tworks    │
│ 1061127 ┆ 0         ┆ 0         ┆ 0         ┆ … ┆ 0          ┆ 0         ┆ 0         ┆ Rule_Lear │
│         ┆           ┆           ┆           ┆   ┆            ┆     

We need to map the name of the columns in the dataframe to the indices we defined ealier


In [9]:
# idx to column name
nodes_idx_to_col_name = {idx: col_name for idx, col_name in enumerate(nodes_df.columns)}  # type: ignore

edges_idx_to_col_name = {idx: col_name for idx, col_name in enumerate(edges_df.columns)}  # type: ignore

We also need a function that converts the Graph Class to a PyTorch Geometric Data object, as this will repeatedly be used in the analysis process


In [10]:
def create_torch_geometric_data(
    graph: Graph,
    class_idx: int,
    node_col_idx: int,
    node_feature_start_idx: int,
    node_feature_end_idx: int,
    source_idx: int,
    target_idx: int,
) -> Data:
    # Export the nodes and edges to the desired format
    nodes_df = graph.export_nodes_as_node_list(file_format=ExportFileFormat.POLARS_DF)
    edges_df = graph.export_edges_as_edge_list(file_format=ExportFileFormat.POLARS_DF)

    assert isinstance(nodes_df, pl.DataFrame)
    assert isinstance(edges_df, pl.DataFrame)

    # From nodes idx to column name
    nodes_col_node_name: str = nodes_idx_to_col_name[node_col_idx]
    nodes_col_class_name: str = nodes_idx_to_col_name[class_idx]
    node_col_features_name: list[str] = [
        nodes_idx_to_col_name[i]
        for i in range(node_feature_start_idx, node_feature_end_idx)
    ]

    # From edges idx to column name
    edges_col_source_name: str = edges_idx_to_col_name[source_idx]
    edges_col_target_name: str = edges_idx_to_col_name[target_idx]

    # Convert categorical class labels to numeric
    class_labels = nodes_df.select(nodes_col_class_name).to_numpy()
    encoder = LabelEncoder()
    Y = encoder.fit_transform(class_labels)

    print(f"Class labels: {Y[:10]}")  # type: ignore

    # Process the nodes DataFrame
    x = torch.tensor(
        nodes_df.select(node_col_features_name).to_numpy(),
        dtype=torch.float,
    )
    assert not (x == 0).all(dim=1).any(), "Found nodes with all-zero feature vectors"
    y = torch.tensor(Y, dtype=torch.long)

    # Process the edges DataFrame
    edge_index = (
        torch.tensor(
            data=edges_df.select(
                [edges_col_source_name, edges_col_target_name]
            ).to_numpy(),
            dtype=torch.long,
        )
        .t()
        .contiguous()
    )

    data = Data(x=x, edge_index=edge_index, y=y)
    return data

In [11]:
def create_masks(
    num_nodes: int, train_percent: float, val_percent: float
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
    indices = np.random.permutation(num_nodes)
    train_size = int(train_percent * num_nodes)
    val_size = int(val_percent * num_nodes)

    train_mask = torch.zeros(num_nodes, dtype=torch.bool)
    val_mask = torch.zeros(num_nodes, dtype=torch.bool)
    test_mask = torch.zeros(num_nodes, dtype=torch.bool)

    train_mask[indices[:train_size]] = True
    val_mask[indices[train_size : train_size + val_size]] = True
    test_mask[indices[train_size + val_size :]] = True

    return train_mask, val_mask, test_mask

### Training and testing the model


In [12]:
def train(model, data, optimizer, criterion):
    model.train()
    optimizer.zero_grad()  # Clear gradients.
    out = model(data.x, data.edge_index)  # Forward pass.
    loss = criterion(
        out[data.train_mask], data.y[data.train_mask]
    )  # Compute the loss solely based on the training nodes.
    loss.backward()  # Derive gradients.
    optimizer.step()  # Update parameters based on gradients.
    return loss.item()


def evaluate(model, data):
    model.eval()
    with torch.no_grad():  # Inference without gradient tracking.
        logits = model(data.x, data.edge_index)  # Forward pass.
        preds = logits.argmax(dim=1)  # Get the index of the max log-probability.

        # For training set.
        train_correct = preds[data.train_mask] == data.y[data.train_mask]
        train_acc = int(train_correct.sum()) / int(data.train_mask.sum())

        # For validation set.
        val_correct = preds[data.val_mask] == data.y[data.val_mask]
        val_acc = int(val_correct.sum()) / int(data.val_mask.sum())

        # For test set.
        test_correct = preds[data.test_mask] == data.y[data.test_mask]
        test_acc = int(test_correct.sum()) / int(data.test_mask.sum())

    return train_acc, val_acc, test_acc

In [13]:
# Convert the graph to a PyTorch Geometric Data object
torch_geo_data = create_torch_geometric_data(
    graph=g,
    class_idx=CLASS_IDX,
    node_col_idx=NODE_IDX,
    node_feature_start_idx=NODE_FEATURE_START_IDX,
    node_feature_end_idx=NODE_FEATURE_END_IDX,
    source_idx=SOURCE_IDX,
    target_idx=TARGET_IDX,
)
num_nodes: int | None = torch_geo_data.num_nodes
assert num_nodes is not None

train_mask, val_mask, test_mask = create_masks(
    num_nodes, train_percent=0.6, val_percent=0.2
)

# Add the masks to your torch_geo_data object
torch_geo_data.train_mask = train_mask
torch_geo_data.val_mask = val_mask
torch_geo_data.test_mask = test_mask

# Define some parameters for the GNN model
input_dim: int = torch_geo_data.num_node_features
hidden_dim: int = 64  # Example hidden dimension size
output_dim: int = torch_geo_data.y.max().item() + 1  # Number of classes

model = GCNConv(input_dim, output_dim).to(device=device)

torch_geo_data = torch_geo_data.to(device=device_str)
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.01, weight_decay=5e-4)
criterion = torch.nn.CrossEntropyLoss()  # Criterion assumes model returns raw logits

# Training loop
for epoch in range(1, 200):
    loss = train(model, torch_geo_data, optimizer, criterion)
    print(f"Epoch: {epoch:03d}, Loss: {loss:.4f}")

# Evaluation on test set
train_acc, val_acc, test_acc = evaluate(model, torch_geo_data)
print(f"Test Accuracy: {test_acc:.4f}")

# Predictions for a specific node
model.eval()
with torch.no_grad():
    logits = model(torch_geo_data.x, torch_geo_data.edge_index)
    probs = logits.softmax(dim=1)
    predicted_class = (
        probs[0].argmax().item()
    )  # Get the predicted class for node at index 0

print(f"Predicted class for the node at index 0: {predicted_class}")

Class labels: [2 5 4 4 3 3 6 2 2 6]


  y = column_or_1d(y, warn=True)


Epoch: 001, Loss: 1.9470
Epoch: 002, Loss: 1.8176
Epoch: 003, Loss: 1.7003
Epoch: 004, Loss: 1.5942
Epoch: 005, Loss: 1.4980
Epoch: 006, Loss: 1.4099
Epoch: 007, Loss: 1.3282
Epoch: 008, Loss: 1.2518
Epoch: 009, Loss: 1.1800
Epoch: 010, Loss: 1.1124
Epoch: 011, Loss: 1.0490
Epoch: 012, Loss: 0.9898
Epoch: 013, Loss: 0.9349
Epoch: 014, Loss: 0.8841
Epoch: 015, Loss: 0.8374
Epoch: 016, Loss: 0.7944
Epoch: 017, Loss: 0.7550
Epoch: 018, Loss: 0.7189
Epoch: 019, Loss: 0.6856
Epoch: 020, Loss: 0.6549
Epoch: 021, Loss: 0.6265
Epoch: 022, Loss: 0.6002
Epoch: 023, Loss: 0.5757
Epoch: 024, Loss: 0.5528
Epoch: 025, Loss: 0.5315
Epoch: 026, Loss: 0.5115
Epoch: 027, Loss: 0.4929
Epoch: 028, Loss: 0.4755
Epoch: 029, Loss: 0.4592
Epoch: 030, Loss: 0.4440
Epoch: 031, Loss: 0.4299
Epoch: 032, Loss: 0.4167
Epoch: 033, Loss: 0.4044
Epoch: 034, Loss: 0.3929
Epoch: 035, Loss: 0.3822
Epoch: 036, Loss: 0.3722
Epoch: 037, Loss: 0.3628
Epoch: 038, Loss: 0.3540
Epoch: 039, Loss: 0.3457
Epoch: 040, Loss: 0.3380


In [14]:
# Get node with the most connected edges
node_with_most_edges = g.get_node_with_most_neighbors()
node_with_most_edges

190698

In [15]:
node_identifier_to_index = {
    str(identifier): index for index, identifier in enumerate(nodes_df["Node"])
}

# Starting node identifier.
starting_node_id = node_with_most_edges
starting_node_index = node_identifier_to_index[str(starting_node_id)]

In [16]:
def predict_fn(model, x, edge_index, node_index):
    model.eval()
    with torch.no_grad():
        out = model(x, edge_index)
        prediction = out.softmax(dim=1)[
            node_index
        ]  # Retrieve the prediction for the node of interest
    return prediction

In [18]:
# Integrate the trained model with the LocalGNNAnalyzer
analyzer.prepare_ablation_plan(starting_node=starting_node_id, max_depth=3)

print("Ablation plan:")
print(analyzer.ablation_plan)

while analyzer.has_next_step():
    current_prediction = predict_fn(
        model, torch_geo_data.x, torch_geo_data.edge_index, starting_node_index
    )
    print(f"Current prediction: {current_prediction}")
    analyzer.execute_ablation_step(prev_gnn_prediction=current_prediction.tolist())
    torch_geo_data = create_torch_geometric_data(
        graph=analyzer.graph,
        class_idx=CLASS_IDX,
        node_col_idx=NODE_IDX,
        node_feature_start_idx=NODE_FEATURE_START_IDX,
        node_feature_end_idx=NODE_FEATURE_END_IDX,
        source_idx=SOURCE_IDX,
        target_idx=TARGET_IDX,
    )

# Get the interpretation of the GNN's predictions
interpretation = analyzer.get_interpretation()
print(interpretation)

Ablation plan:
[[[190698, 190697]]]
[]
[]
