In [None]:
import numpy as np
import pandas as pd
import scipy.sparse as sp
import torch
import os
from torch.utils import data
from torch.autograd import Variable

def normalize_features(mx):
    """Row-normalize sparse matrix"""
    rowsum = np.array(mx.sum(1))
    r_inv = np.power(rowsum, -1).flatten()
    r_inv[np.isinf(r_inv)] = 0.
    r_mat_inv = sp.diags(r_inv)
    mx = r_mat_inv.dot(mx)
    return mx

def normalize_adj(mx):
    """Row-normalize sparse matrix"""
    rowsum = np.array(mx.sum(1))
    r_inv_sqrt = np.power(rowsum, -0.5).flatten()
    r_inv_sqrt[np.isinf(r_inv_sqrt)] = 0.
    r_mat_inv_sqrt = sp.diags(r_inv_sqrt)
    return mx.dot(r_mat_inv_sqrt).transpose().dot(r_mat_inv_sqrt)

def accuracy(output, labels):
    probs = torch.exp(output)
    preds = torch.argmax(probs, dim = 1)
    correct = preds.eq(labels).double()
    correct = correct.sum()
    return correct / len(labels)



def load_multi_data(folder, att_file, edge_list_name):
    att = pd.read_csv(folder + att_file)
    edge_list = []
    for name in edge_list_name:
        edge_list.append(pd.read_csv(folder + name))

    #get y and x
    labels = np.array(att["# Insert outcome variable here"])
    features = sp.csr_matrix(att[["# Insert feature variables here"]])
    #features = normalize_features(features)

    #get adj mat
    adj_list = []
    for edge in edge_list:
        #get row col idx for adj matrix
        row_idx = []
        col_idx = []
        for i in range(edge.shape[0]):
            id_from = edge.iloc[i,0]
            id_to = edge.iloc[i,1]
            # uid as unique identifier
            if sum(att["uid"] == id_from)>0:
              row_id = att.index[att["uid"] == id_from]
              row_idx.append(row_id[0])

            if sum(att["uid"] == id_to)>0:
              col_id = att.index[att["uid"] == id_to]
              col_idx.append(col_id[0])


        adj = sp.coo_matrix((np.ones(edge.shape[0]), (row_idx, col_idx)), shape=(att.shape[0], att.shape[0]), dtype=np.float32)



        #make adj symmetric
        adj = adj + adj.T.multiply(adj.T > adj) - adj.multiply(adj.T > adj)
        #normaliza adj
        adj = normalize_adj(adj + sp.eye(adj.shape[0]))
        adj = torch.FloatTensor(np.array(adj.todense()))
        adj_list.append(adj)

    features = torch.FloatTensor(np.array(features.todense()))
    labels = torch.LongTensor(labels)

    dim = len(labels)
    idx_train = range(1915)
    idx_test = range(1915, dim)

    features, adj_list, labels = Variable(features), torch.stack(adj_list), Variable(labels)


    data = MyDataset(adj_list, features, labels, idx_train, idx_test)
    return data

class MyDataset(data.Dataset):
    def __init__(self, adj_ls, node_features, labels, idx_train, idx_test):
        self.adj_ls = adj_ls
        self.features = node_features
        self.train_mask = idx_train
        self.test_mask = idx_test
        self.labels = labels

    def __getitem__(self, index):
        node_features = self.features[index]
        labels = self.labels[index]
        return self.adj_ls, node_features, labels

    def __len__(self):
        return len(self.features)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F



class GAT(nn.Module):
    def __init__(self, nfeat, nhid, nclass, dropout, alpha, nheads):
        """Dense version of GAT."""
        super(GAT, self).__init__()
        self.dropout = dropout

        self.attentions = [GraphAttentionLayer(nfeat, nhid, dropout=dropout, alpha=alpha, concat=True) for _ in range(nheads)]
        for i, attention in enumerate(self.attentions):
            self.add_module('attention_{}'.format(i), attention)

        self.out_att = nn.Linear(nhid * nheads, nclass)

    def forward(self, x, adj):
        x = F.dropout(x, self.dropout, training=self.training)
        x = torch.cat([att(x, adj) for att in self.attentions], dim=1)
        x = F.dropout(x, self.dropout, training=self.training)
        x = F.elu(self.out_att(x))
        return F.log_softmax(x, dim=1)



class FusionGAT(nn.Module):
    def __init__(self, nfeat, nhid, nclass, dropout, alpha, adj_list, nheads):
        super(FusionGAT, self).__init__()

        self.dropout = dropout
        self.nheads = nheads
        self.adj_list = adj_list

        # Define list of GAT layers for each adjacency matrix
        self.attentions = nn.ModuleList()
        for i in range(len(adj_list)):
            att_list = nn.ModuleList([GraphAttentionLayer(nfeat, nhid, dropout=dropout, alpha=alpha, concat=True) for _ in range(nheads)])
            self.attentions.append(att_list)
            for k, attention in enumerate(self.attentions):
                self.add_module('adj{}, attention_{}'.format(i, k), attention)

        # Define linear layer for integration with L1 regularization
        self.integration_att = nn.Linear(nhid * nheads, nclass)
        self.fusion = nn.Linear(nclass * len(adj_list), nclass)
        self.l1_reg = nn.L1Loss(reduction='mean')

    def forward(self, x, adj_list):
        x = F.dropout(x, self.dropout, training=self.training)
        # Compute output for each adjacency matrix using GAT layers
        output_list = []
        for i, adj in enumerate(adj_list):
            x_i = torch.cat([att(x, adj) for att in self.attentions[i]], dim=1)
            x_i = F.dropout(x_i, self.dropout, training=self.training)
            x_i = F.elu(self.integration_att(x_i))
            output_list.append(x_i)
        output = torch.cat(output_list, dim=1)

        # Apply linear layer for integration with L1 regularization
        output = F.dropout(output, self.dropout, training=self.training)
        output = self.fusion(output.view(output.size(0), -1))
        l1_loss = self.l1_reg(self.fusion.weight, torch.zeros_like(self.fusion.weight))
        return F.log_softmax(output, dim=1), l1_loss



class FusionGAT2(nn.Module):
    def __init__(self, nfeat, nhid1, nhid2, nclass, dropout, alpha, adj_list, nheads):
        super(FusionGAT2, self).__init__()

        self.dropout = dropout
        self.nheads = nheads
        self.adj_list = adj_list

        # Define list of GAT layers for each adjacency matrix in attention 1
        self.attentions1 = nn.ModuleList()
        for i in range(len(adj_list)):
            att_list = nn.ModuleList([GraphAttentionLayer(nfeat, nhid1, dropout=dropout, alpha=alpha, concat=True) for _ in range(nheads)])
            self.attentions1.append(att_list)
            for k, attention in enumerate(self.attentions1):
                self.add_module('adj{}, attention_layer1_{}'.format(i, k), attention)

        # Define linear layer for integration of multihead attention1
        self.integration_att1 = nn.Linear(nhid1 * nheads, nhid1)

        self.attentions2 = nn.ModuleList()
        for i in range(len(adj_list)):
            att_list2 = nn.ModuleList([GraphAttentionLayer(nhid1, nhid2, dropout=dropout, alpha=alpha, concat=True) for _ in range(nheads)])
            self.attentions2.append(att_list2)
            for k, attention in enumerate(self.attentions2):
                self.add_module('adj{}, attention_layer2_{}'.format(i, k), attention)

        # Define linear layer for integration of multihead attention2
        self.integration_att2 = nn.Linear(nhid2 * nheads, nclass)

        #fusion layer with l1 penalty
        self.fusion_att = nn.Linear(nclass * len(adj_list), nclass)
        self.l1_reg = nn.L1Loss(reduction='mean')

    def forward(self, x, adj_list):
        x = F.dropout(x, self.dropout, training=self.training)
        # Compute output for each adjacency matrix using GAT layers
        output_list = []
        for i, adj in enumerate(adj_list):
            #attention layer 1
            x_i = torch.cat([att(x, adj) for att in self.attentions1[i]], dim=1)
            x_i = F.dropout(x_i, self.dropout, training=self.training)
            x_i = F.elu(self.integration_att1(x_i))

            #attention layer 2
            x_i = torch.cat([att(x_i, adj) for att in self.attentions2[i]], dim=1)
            x_i = F.dropout(x_i, self.dropout, training=self.training)
            x_i = F.elu(self.integration_att2(x_i))
            output_list.append(x_i)

        output = torch.cat(output_list, dim=1)
        output = F.dropout(output, self.dropout, training=self.training)
        output = self.fusion_att(output.view(output.size(0), -1))
        l1_loss = self.l1_reg(self.fusion_att.weight, torch.zeros_like(self.fusion_att.weight))
        return F.log_softmax(output, dim=1), l1_loss



class FusionGAT3(nn.Module):
    def __init__(self, nfeat, nhid1, nhid2, fusion1_dim, nclass, dropout, alpha, adj_list, nheads):
        super(FusionGAT3, self).__init__()

        self.dropout = dropout
        self.nheads = nheads
        self.adj_list = adj_list

        # Define list of GAT layers for each adjacency matrix
        #att 1
        self.attentions1 = nn.ModuleList()
        for i in range(len(adj_list)):
            att_list = nn.ModuleList([GraphAttentionLayer(nfeat, nhid1, dropout=dropout, alpha=alpha, concat=True) for _ in range(nheads)])
            self.attentions1.append(att_list)
            for k, attention in enumerate(self.attentions1):
                self.add_module('adj{}, attention_layer1_{}'.format(i, k), attention)

        # fusion1
        self.integration_att1 = nn.Linear(nhid1 * nheads, fusion1_dim)
        self.fusion_att1 = nn.Linear(fusion1_dim * len(adj_list), fusion1_dim)
        self.l1_reg1 = nn.L1Loss(reduction='mean')

        #att 2
        self.attentions2 = nn.ModuleList()
        for i in range(len(adj_list)):
            att_list = nn.ModuleList([GraphAttentionLayer(fusion1_dim, nhid2, dropout=dropout, alpha=alpha, concat=True) for _ in range(nheads)])
            self.attentions2.append(att_list)
            for k, attention in enumerate(self.attentions2):
                self.add_module('adj{}, attention_layer2_{}'.format(i, k), attention)

        #fusion2
        self.integration_att2 = nn.Linear(nhid2 * nheads, nclass)
        self.fusion_att2 = nn.Linear(nclass * len(adj_list), nclass)
        self.l1_reg2 = nn.L1Loss(reduction='mean')


    def forward(self, x, adj_list):
        x = F.dropout(x, self.dropout, training=self.training)

        # Compute output for each adjacency matrix using GAT layers
        output_list = []
        for i, adj in enumerate(adj_list):
            x_i = x
            x_i = torch.cat([att(x, adj) for att in self.attentions1[i]], dim=1)
            x_i = F.dropout(x_i, self.dropout, training=self.training)
            x_i = F.elu(self.integration_att1(x_i))
            output_list.append(x_i)
        output = torch.cat(output_list, dim=1)

        # Apply linear layer for integration with L1 regularization
        output = F.dropout(output, self.dropout, training=self.training)
        output = self.fusion_att1(output.view(output.size(0), -1))
        l1_loss1 = self.l1_reg1(self.fusion_att1.weight, torch.zeros_like(self.fusion_att1.weight))


        output_list2 = []
        for i, adj in enumerate(adj_list):
            x_i = torch.cat([att(output, adj) for att in self.attentions2[i]], dim=1)
            x_i = F.dropout(x_i, self.dropout, training=self.training)
            x_i = F.elu(self.integration_att2(x_i))
            output_list2.append(x_i)
        output2 = torch.cat(output_list2, dim=1)

        # Apply linear layer for integration with L1 regularization
        output2 = F.dropout(output2, self.dropout, training=self.training)
        output2 = self.fusion_att2(output2.view(output.size(0), -1))
        l1_loss2 = self.l1_reg2(self.fusion_att2.weight, torch.zeros_like(self.fusion_att2.weight))

        return F.log_softmax(output2, dim=1), l1_loss1 + l1_loss2

class GCN(nn.Module):
    def __init__(self, nfeat, nhid, nclass, dropout):
        super(GCN, self).__init__()

        self.gc1 = GraphConvolution(nfeat, nhid)
        self.gc2 = GraphConvolution(nhid, nclass)
        self.dropout = dropout

    def forward(self, x, adj):
        x = F.relu(self.gc1(x, adj))
        x = F.dropout(x, self.dropout, training=self.training)
        x = self.gc2(x, adj)
        return F.log_softmax(x, dim=1)

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.parameter import Parameter
from torch.nn.modules.module import Module

import math

class GraphAttentionLayer(nn.Module):
    """
    Simple GAT layer, similar to https://arxiv.org/abs/1710.10903
    """
    def __init__(self, in_features, out_features, dropout, alpha, concat=True):
        super(GraphAttentionLayer, self).__init__()
        self.dropout = dropout
        self.in_features = in_features
        self.out_features = out_features
        self.alpha = alpha
        self.concat = concat

        self.W = nn.Parameter(torch.empty(size=(in_features, out_features)))
        nn.init.xavier_uniform_(self.W.data, gain=1.414)
        self.a = nn.Parameter(torch.empty(size=(2*out_features, 1)))
        nn.init.xavier_uniform_(self.a.data, gain=1.414)

        self.leakyrelu = nn.LeakyReLU(self.alpha)

    def forward(self, h, adj):
        Wh = torch.mm(h, self.W) # h.shape: (N, in_features), Wh.shape: (N, out_features)
        e = self._prepare_attentional_mechanism_input(Wh)

        zero_vec = -9e15*torch.ones_like(e)
        attention = torch.where(adj > 0, e, zero_vec)
        attention = F.softmax(attention, dim=1)
        attention = F.dropout(attention, self.dropout, training=self.training)
        h_prime = torch.matmul(attention, Wh)

        if self.concat:
            return F.elu(h_prime)
        else:
            return h_prime

    def _prepare_attentional_mechanism_input(self, Wh):
        # Wh.shape (N, out_feature)
        # self.a.shape (2 * out_feature, 1)
        # Wh1&2.shape (N, 1)
        # e.shape (N, N)
        Wh1 = torch.matmul(Wh, self.a[:self.out_features, :])
        Wh2 = torch.matmul(Wh, self.a[self.out_features:, :])
        # broadcast add
        e = Wh1 + Wh2.T
        return self.leakyrelu(e)

    def __repr__(self):
        return self.__class__.__name__ + ' (' + str(self.in_features) + ' -> ' + str(self.out_features) + ')'


class GraphConvolution(Module):
    """
    Simple GCN layer, similar to https://arxiv.org/abs/1609.02907
    """

    def __init__(self, in_features, out_features, bias=True):
        super(GraphConvolution, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.weight = Parameter(torch.FloatTensor(in_features, out_features))
        if bias:
            self.bias = Parameter(torch.FloatTensor(out_features))
        else:
            self.register_parameter('bias', None)
        self.reset_parameters()

    def reset_parameters(self):
        stdv = 1. / math.sqrt(self.weight.size(1))
        self.weight.data.uniform_(-stdv, stdv)
        if self.bias is not None:
            self.bias.data.uniform_(-stdv, stdv)

    def forward(self, input, adj):
        support = torch.mm(input, self.weight)
        output = torch.spmm(adj, support)
        if self.bias is not None:
            return output + self.bias
        else:
            return output

    def __repr__(self):
        return self.__class__.__name__ + ' (' \
               + str(self.in_features) + ' -> ' \
               + str(self.out_features) + ')'

In [None]:
import os
import random
import numpy as np
import torch
from torch import Tensor
import torch_geometric
from torch_geometric.explain.algorithm.utils import clear_masks, set_masks
from torch_geometric.explain.config import ExplanationType, ModelMode, ModelTaskLevel
from typing import Optional, Tuple, Union

from torch_geometric.explain import Explainer, GNNExplainer, PGExplainer

from torch_geometric.explain.config import ModelMode, ModelReturnType
from torch_geometric.utils import get_embeddings

seed = 72
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# Parameter settings
epochs = 500
lr = 0.0005
weight_decay = 5e-5
dropout = 0.3
hidden_dims1 = 64
hidden_dims2 = 64
fusion1_dim = 4
nb_head = 12
alpha = 0.2  # Alpha for the leaky_relu
lambda_l1 =  0.0001
patience = 50


sv_path = ""
folder = "# Insert data folder pathway here"
att_name = "# Insert node attribute dataset name here"

edge_list_name = ["# Insert sequence edge list dataset name here", 
                  "# Insert cl edge list dataset name here"]

data = load_multi_data(folder, att_name, edge_list_name)
idx_train, idx_test = data.train_mask, data.test_mask

model = FusionGAT3(
    nfeat=data.features.shape[1],
    nhid1=hidden_dims1,
    nhid2=hidden_dims2,
    fusion1_dim=fusion1_dim,
    nclass=int(data.labels.max()) + 1,
    dropout=dropout,
    alpha=alpha,
    adj_list=data.adj_ls,
    nheads=nb_head
)


model_name = "216.pkl"

# Restore best model
print('Loading model')
model.load_state_dict(torch.load(sv_path+ model_name))



from torch_geometric.explain import Explainer, GNNExplainer

class ModifiedExplainer(Explainer):
    def get_target(self, prediction):
        # Modify the get_target method here
        # You can access the prediction variable directly

        # Perform necessary changes or customizations
        if self.model_config.mode == ModelMode.binary_classification:
            # TODO: Allow customization of the thresholds used below.
            if self.model_config.return_type == ModelReturnType.raw:
                return (prediction > 0).long().view(-1)
            if self.model_config.return_type == ModelReturnType.probs:
                return (prediction > 0.5).long().view(-1)
            assert False

        if self.model_config.mode == ModelMode.multiclass_classification:
            return prediction[0].argmax(dim=-1)

        return prediction


class ModifiedGNNExplainer(GNNExplainer):
    def _train(
        self,
        model: torch.nn.Module,
        x: Tensor,
        edge_index: Tensor,
        *,
        target: Tensor,
        index: Optional[Union[int, Tensor]] = None,
        **kwargs,
    ):
        self._initialize_masks(x, edge_index)

        parameters = []
        if self.node_mask is not None:
            parameters.append(self.node_mask)
        if self.edge_mask is not None:
            set_masks(model, self.edge_mask, edge_index, apply_sigmoid=True)
            parameters.append(self.edge_mask)

        optimizer = torch.optim.Adam(parameters, lr=self.lr)

        for i in range(self.epochs):
            optimizer.zero_grad()

            h = x if self.node_mask is None else x * self.node_mask.sigmoid()
            y_hat, y = model(h, edge_index, **kwargs)[0], target

            if index is not None:
                y_hat, y = y_hat[index], y[index]

            loss = self._loss(y_hat, y)

            loss.backward()
            optimizer.step()

            # In the first iteration, we collect the nodes and edges that are
            # involved into making the prediction. These are all the nodes and
            # edges with gradient != 0 (without regularization applied).
            if i == 0 and self.node_mask is not None:
                if self.node_mask.grad is None:
                    raise ValueError("Could not compute gradients for node "
                                     "features. Please make sure that node "
                                     "features are used inside the model or "
                                     "disable it via `node_mask_type=None`.")
                self.hard_node_mask = self.node_mask.grad != 0.0
            if i == 0 and self.edge_mask is not None:
                if self.edge_mask.grad is None:
                    raise ValueError("Could not compute gradients for edges. "
                                     "Please make sure that edges are used "
                                     "via message passing inside the model or "
                                     "disable it via `edge_mask_type=None`.")
                self.hard_edge_mask = self.edge_mask.grad != 0.0



##add explainer
explainer = ModifiedExplainer(
    model=model,
    algorithm=ModifiedGNNExplainer(epochs=200),
    explanation_type="model",
    node_mask_type='attributes',
    model_config=dict(
        mode='multiclass_classification',
        task_level='node',
        return_type='log_probs',
    ),
)


Loading model


In [None]:

#target_label = data.labels
explanation = explainer(data.features, data.adj_ls)#, target = target_label )
print(f'Generated explanations in {explanation.available_explanations}')

Generated explanations in ['node_mask']


In [None]:
node_mask = explanation.node_mask
np.save(sv_path+f"node_mask_all.npy", node_mask)

In [None]:
path = f'feature_importance_top22.png'
feat_label = ['# Insert feature variables here']
explanation.visualize_feature_importance(sv_path+path, feat_labels = feat_label, top_k=22)
print(f"Feature importance plot has been saved to '{path}'")

Feature importance plot has been saved to 'feature_importance_top22.png'
