In [1]:
import os, torch
from sklearn.model_selection import train_test_split
import pickle
import torch_geometric.transforms as T
import numpy as np
from torch_geometric.nn.models import Node2Vec
from torch_geometric.data import DataLoader
from torch_geometric.nn import MessagePassing
from torch_geometric.data import Data
from torch.nn import Linear
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, GATConv
GCNConv._orig_propagate = GCNConv.propagate

import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from torch_geometric.explain import GNNExplainer, Explainer


epochs = int(os.getenv("EPOCHS", 10))  # Default to 10 if not provided
learning_rate = float(os.getenv("LEARNING_RATE", 0.001))  # Default to 0.001
hidden_c = int(os.getenv("HIDDEN_C", 16))  # Default to 16
random_seed = int(os.getenv("RANDOM_SEED", 42))  # Default to 42
bins = [int(i) for i in os.getenv("BINS", "3000").split(' ')]  # Default to [1000, 3000, 5000]
num_layers = int(os.getenv("NUM_LAYERS", 5))  # Default to 5
nh = int(os.getenv("NUM_HEADS", 10))
gat = int(os.getenv("GAT", 0))
api_key = os.getenv("API_KEY", None)
graph_num = os.getenv("GRAPH_NUM", 2)
dropout_p = float(os.getenv("DROPOUT", 0.5))

graph_num = 20

model_name = 'sweet-elevator-279'  # Replace with your model name
weight_prefix = 'best_accuracy'  # Replace with your weight prefix

if torch.cuda.is_available():
    device = torch.device('cuda')
    print(f"Using CUDA device: {torch.cuda.get_device_name(0)}", flush = True)
else:
    device = torch.device('cpu')
    print("Using CPU", flush = True)

with open(f'../data/graphs/{graph_num}/linegraph_tg.pkl', 'rb') as f:
    data = pickle.load(f)

data.edge_index = data.edge_index.contiguous()
data.x = data.x.contiguous()
data.y = data.y.contiguous()

# Define or import the GCN class
class GCN(torch.nn.Module):
    def __init__(self, hidden_channels):
        super().__init__()
        torch.manual_seed(random_seed)
        self.conv1 = GCNConv(data.num_features, hidden_channels, improved = True, cached = True)
        conv2_list = []
        hc = hidden_channels
        # for _ in range(num_layers):
        #     conv2_list.append(
        #         GCNConv(hc, hc)
        #     )
            # hc //= 2
        # self.conv2 = torch.nn.ModuleList(conv2_list)
        self.conv3 = GCNConv(hc, len(bins) + 1, cached = True)

    def forward(self, x, edge_index):
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, p=dropout_p, training=self.training)
        # for conv in self.conv2:
        #     x = conv(x, edge_index)
        #     x = F.relu(x)
        #     x = F.dropout(x, p=dropout_p, training=self.training)
        x = self.conv3(x, edge_index)
        return x


# Load the model with the GCN class
model = torch.load(f'../data/graphs/{graph_num}/models/{model_name}.pt', map_location=device)
model = model.to(device)

model.load_state_dict(torch.load(f'../data/graphs/{graph_num}/models/{model_name}_{weight_prefix}.pt', map_location=device))


Using CPU


<All keys matched successfully>

In [2]:
from torch_geometric.explain import GNNExplainer, Explainer

explainer = Explainer(
    model=model,
    algorithm=GNNExplainer(epochs=1),
    explanation_type='model',
    node_mask_type='attributes',
    edge_mask_type='object',
    model_config=dict(
        mode='multiclass_classification',
        task_level='node',
        return_type='log_probs',
    ),
)


In [3]:
# !pip install --upgrade torch-scatter -f https://data.pyg.org/whl/torch-2.1.0+cu117.html
# !pip install --upgrade torch-sparse -f https://data.pyg.org/whl/torch-2.1.0+cu117.html
# !pip install --upgrade torch-geometric


In [None]:
node_idx = 42  # or whatever node you want to explain

# Input data must include x and edge_index, and optionally y
explanation = explainer(data.x.to(device), data.edge_index.to(device), index=node_idx)


In [None]:
explanation


Explanation(node_mask=[78168, 695], edge_mask=[152596], prediction=[78168, 2], target=[78168], index=[1], x=[78168, 695], edge_index=[2, 152596])

In [None]:
path = 'feature_importance.png'
explanation.visualize_feature_importance(path, top_k=10)
print(f"Feature importance plot has been saved to '{path}'")

path = 'subgraph.pdf'
explanation.visualize_graph(path)
print(f"Subgraph visualization plot has been saved to '{path}'")


TypeError: Cannot convert numpy.ndarray to numpy.ndarray

In [12]:
from torch_geometric.explain.explanation import _visualize_score

# Ensure node_mask is 2D
node_mask = explanation.node_mask
if node_mask.dim() == 1:
    node_mask = node_mask.unsqueeze(0)
elif node_mask.dim() == 3:
    node_mask = node_mask.squeeze(0)

# Sum across nodes (or use first if only one node)
score = node_mask.sum(dim=0).detach().cpu().numpy()
score = score.flatten()  # ensure 1D

# Ensure labels are native Python list (not np array or tensor)
feat_labels = [f"feat_{i}" for i in range(score.shape[0])]

# One last sanity check
assert len(feat_labels) == score.shape[0], f"Mismatch: labels={len(feat_labels)} vs score={score.shape}"

# Now safe to call
_visualize_score(torch.tensor(score), feat_labels, path="feature_importance.png", top_k=10)
print("✅ Feature importance saved to 'feature_importance.png'")


TypeError: Cannot convert numpy.ndarray to numpy.ndarray