In [1]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import RGCNConv, global_mean_pool
from torch_geometric.data import Data
#from graph_builder import GraphBuilder  # <-- External builder
import pandas as pd
from torch.nn import Linear, ReLU, Sequential
from torch_geometric.data import Data
from torch_geometric.loader import DataLoader
from torch_geometric.nn import GCNConv, global_add_pool
from sklearn.model_selection import train_test_split
import ast
from torch_geometric.utils import degree


First we read the edges and coefficients of the csv files and save them in lists.

Here we read the files for 5 to 8 loops.

We aim to use 9 and 10 loop data for testing.

In [2]:
edges=[]
y=[]
for i in range(5, 9):
    filename = f'../Graph_Edge_Data/den_graph_data_{i}.csv'
    df = pd.read_csv(filename)
    edges += df['EDGES'].tolist()
    y += df['COEFFICIENTS'].tolist()

In [3]:
edges = [ast.literal_eval(e) for e in edges]

In [4]:
from torch_geometric.transforms import AddLaplacianEigenvectorPE
eigen_vec= AddLaplacianEigenvectorPE(k=3,attr_name=None)

We need to now translate the edges into dataset forms for training and testing.

In [34]:
class GraphBuilder:
    def __init__(self, solid_edges, coeff, node_labels=None):
        # Auto-infer node labels if not provided
        if node_labels is None:
            node_labels = sorted(set(u for e in solid_edges for u in e))
        self.node_labels = node_labels
        self.label2idx = {label: i for i, label in enumerate(node_labels)}

        self.solid_edges = solid_edges
        self.num_nodes = len(self.node_labels)
        self.y = torch.tensor(coeff, dtype=torch.long)  # Ensure y is a column vector

    def build(self, extra_node_features=None):
        edge_list = []

        for u, v in self.solid_edges:
            i, j = self.label2idx[u], self.label2idx[v]
            edge_list += [[i, j], [j, i]]  # bidirectional

        edge_index = torch.tensor(edge_list, dtype=torch.long).t().contiguous()

        # Basic node feature: degree
        degree_feat = degree(edge_index[0], num_nodes=self.num_nodes).view(-1, 1)

        # Combine degree with extra features if provided
        if extra_node_features is not None:
            assert extra_node_features.shape[0] == self.num_nodes, \
                "extra_node_features must match number of nodes"
            x = torch.cat([degree_feat, extra_node_features], dim=1)
        else:
            x = degree_feat
        return Data(x=x, edge_index=edge_index, num_nodes=self.num_nodes, coeff=self.y)


In [35]:
data=[GraphBuilder(solid_edges=x,coeff=y0).build() for x,y0 in zip(edges,y)]
data = [eigen_vec(d) for d in data]

In [36]:
train_data, test_data = train_test_split(data, test_size=0.2, random_state=42)

In [37]:
# Combine graph_list and y into a DataLoader where graph_list represents x and y represents y with train/test split
train_loader = DataLoader(train_data, batch_size=5, shuffle=True)
test_loader = DataLoader(test_data, batch_size=5, shuffle=False)

We are interested in graph classification of 0 and 1. We add two graph convolutional layers, making sure that the message passing is extended to two neighbours, and then add graph pooling to average over the whole graph.

In [38]:
class SimpleGNN(torch.nn.Module):
    def __init__(self, in_channels, hidden_channels):
        super().__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.lin = torch.nn.Linear(hidden_channels, 2)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        batch = torch.zeros(data.num_nodes, dtype=torch.long)  # single graph
        x = F.relu(self.conv1(x, edge_index))
        x = F.relu(self.conv2(x, edge_index))
        x = global_mean_pool(x, batch)
        return self.lin(x)

In [39]:
# Create the model object using CNN class
model = SimpleGNN(in_channels=4, hidden_channels=20)

In [40]:
# import cross entropy loss
import torch.nn as nn
criterion = nn.CrossEntropyLoss()
learning_rate = 0.1
optimizer = torch.optim.SGD(model.parameters(), lr = learning_rate)

In [48]:
def train_model(model,train_loader,test_loader,optimizer,n_epochs=4):
    
    #global variable 
    N_test=len(test_loader)
    accuracy_list=[]
    loss_list=[]
    for epoch in range(n_epochs):
        for x_train, y_train in train_loader:
            model.train()
            optimizer.zero_grad()
            z = model(x_train)
            loss = criterion(z, y_train)
            loss.backward()
            optimizer.step()
            loss_list.append(loss.data)

        correct=0
        #perform a prediction on the validation  data  
        for x_test, y_test in test_loader:
            model.eval()
            z = model(x_test)
            _, yhat = torch.max(z.data, 1)
            correct += (yhat == y_test).sum().item()
        accuracy = correct / N_test
        accuracy_list.append(accuracy)
     
    return accuracy_list, loss_list

In [50]:
train_model(model,train_loader,test_loader,optimizer,n_epochs=4)

ValueError: too many values to unpack (expected 2)