In [None]:
import math
import os.path as osp
from itertools import chain

import numpy as np
import torch
import torch.nn.functional as F
from scipy.sparse.csgraph import shortest_path
from sklearn.metrics import roc_auc_score,accuracy_score
from torch.nn import BCEWithLogitsLoss, Conv1d, MaxPool1d, ModuleList
from torch_geometric.data import Data, InMemoryDataset
from torch_geometric.datasets import Planetoid
from torch_geometric.loader import DataLoader
from torch_geometric.nn import MLP, GCNConv, global_sort_pool
from torch_geometric.transforms import RandomLinkSplit
from torch_geometric.utils import k_hop_subgraph, to_scipy_sparse_matrix, from_networkx,to_networkx
import matplotlib.pyplot as plt
import pickle
import networkx as nx
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
import warnings
warnings.filterwarnings('ignore')

In [None]:
class MyOwnDataset(InMemoryDataset):
    def __init__(self, root, transform=None, pre_transform=None, pre_filter=None):
        super().__init__(root, transform, pre_transform, pre_filter)
        self.data, self.slices = torch.load(self.processed_paths[0])

#     @property
#     def raw_file_names(self):
#         return ['ltspice_examples_torch.pt']


    @property
    def processed_file_names(self):
        return ['ltspice_examples_dataset_complete.pt']

#     def download(self):
#         # Download to `self.raw_dir`.
#         download_url(url, self.raw_dir)
#         ...

    def process(self):
        # Read data into huge `Data` list.
#         data_list = [...]
        print("processing data now!")
        data_list = [torch.load('data/ltspice_examples_torch_LP_complete.pt')]
        if self.pre_filter is not None:
            data_list = [data for data in data_list if self.pre_filter(data)]

        if self.pre_transform is not None:
            data_list = [self.pre_transform(data) for data in data_list]

        data, slices = self.collate(data_list)
        print("saving path:",self.processed_paths[0])
        torch.save((data, slices), self.processed_paths[0])

In [None]:
class SEALDataset(InMemoryDataset):
    def __init__(self, dataset, num_hops, split='train'):
        self.data = dataset[0]
        self.num_hops = num_hops
        super().__init__(dataset.root)
        index = ['train', 'val', 'test'].index(split)
        self.data, self.slices = torch.load(self.processed_paths[index])

    @property
    def processed_file_names(self):
        return ['SEAL_train_data_ltspice_examples_complete.pt', 'SEAL_val_data_lstspice_examples_complete.pt', 'SEAL_test_data_lstspice_examples_complete.pt']

    def process(self):
        transform = RandomLinkSplit(num_val=0.1, num_test=0.2,
                                    is_undirected=True, split_labels=True)
        train_data, val_data, test_data = transform(self.data)
        
        self._max_z = 0
        # Collect a list of subgraphs for training, validation and testing:
        train_pos_data_list = self.extract_enclosing_subgraphs(
            train_data.edge_index, train_data.pos_edge_label_index, 1)
        train_neg_data_list = self.extract_enclosing_subgraphs(
            train_data.edge_index, train_data.neg_edge_label_index, 0)

        val_pos_data_list = self.extract_enclosing_subgraphs(
            val_data.edge_index, val_data.pos_edge_label_index, 1)
        val_neg_data_list = self.extract_enclosing_subgraphs(
            val_data.edge_index, val_data.neg_edge_label_index, 0)

        test_pos_data_list = self.extract_enclosing_subgraphs(
            test_data.edge_index, test_data.pos_edge_label_index, 1)
        test_neg_data_list = self.extract_enclosing_subgraphs(
            test_data.edge_index, test_data.neg_edge_label_index, 0)

        # Convert node labeling to one-hot features.
        for data in chain(train_pos_data_list, train_neg_data_list,
                          val_pos_data_list, val_neg_data_list,
                          test_pos_data_list, test_neg_data_list):
            # We solely learn links from structure, dropping any node features:
            one_hot = F.one_hot(data.z, self._max_z + 1).to(torch.float)
            x = data.x
            data.x = torch.cat((one_hot,x),1) 
            

        torch.save(self.collate(train_pos_data_list + train_neg_data_list),
                   self.processed_paths[0])
        torch.save(self.collate(val_pos_data_list + val_neg_data_list),
                   self.processed_paths[1])
        torch.save(self.collate(test_pos_data_list + test_neg_data_list),
                   self.processed_paths[2])

    def extract_enclosing_subgraphs(self, edge_index, edge_label_index, y):
        data_list = []
        for src, dst in edge_label_index.t().tolist():
            sub_nodes, sub_edge_index, mapping, _ = k_hop_subgraph(
                [src, dst], self.num_hops, edge_index, relabel_nodes=True)
            src, dst = mapping.tolist()

            # Remove target link from the subgraph.
            mask1 = (sub_edge_index[0] != src) | (sub_edge_index[1] != dst)
            mask2 = (sub_edge_index[0] != dst) | (sub_edge_index[1] != src)
            sub_edge_index = sub_edge_index[:, mask1 & mask2]

            # Calculate node labeling.
            z = self.drnl_node_labeling(sub_edge_index, src, dst,
                                        num_nodes=sub_nodes.size(0))

            data = Data(x=self.data.x[sub_nodes], z=z,
                        edge_index=sub_edge_index, y=y)
            data_list.append(data)

        return data_list

    def drnl_node_labeling(self, edge_index, src, dst, num_nodes=None):
        # Double-radius node labeling (DRNL).
        src, dst = (dst, src) if src > dst else (src, dst)
        adj = to_scipy_sparse_matrix(edge_index, num_nodes=num_nodes).tocsr()

        idx = list(range(src)) + list(range(src + 1, adj.shape[0]))
        adj_wo_src = adj[idx, :][:, idx]

        idx = list(range(dst)) + list(range(dst + 1, adj.shape[0]))
        adj_wo_dst = adj[idx, :][:, idx]

        dist2src = shortest_path(adj_wo_dst, directed=False, unweighted=True,
                                 indices=src)
        dist2src = np.insert(dist2src, dst, 0, axis=0)
        dist2src = torch.from_numpy(dist2src)

        dist2dst = shortest_path(adj_wo_src, directed=False, unweighted=True,
                                 indices=dst - 1)
        dist2dst = np.insert(dist2dst, src, 0, axis=0)
        dist2dst = torch.from_numpy(dist2dst)

        dist = dist2src + dist2dst
        dist_over_2, dist_mod_2 = dist // 2, dist % 2

        z = 1 + torch.min(dist2src, dist2dst)
        z += dist_over_2 * (dist_over_2 + dist_mod_2 - 1)
        z[src] = 1.
        z[dst] = 1.
        z[torch.isnan(z)] = 0.

        self._max_z = max(int(z.max()), self._max_z)

        return z.to(torch.long)


# path = osp.join(osp.dirname(osp.realpath("data/")), '..', 'data', 'Planetoid')
# print("path,",path)

In [None]:
# dataset = Planetoid("data/", name='Cora')
dataset = MyOwnDataset("data/")
train_dataset = SEALDataset(dataset, num_hops=2, split='train')
val_dataset = SEALDataset(dataset, num_hops=2, split='val')
test_dataset = SEALDataset(dataset, num_hops=2, split='test')

In [None]:
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=1)
test_loader = DataLoader(test_dataset, batch_size=1)

In [None]:

class DGCNN(torch.nn.Module):
    def __init__(self, hidden_channels, num_layers, GNN=GCNConv, k=0.6):
        super().__init__()

        if k < 1:  # Transform percentile to number.
            num_nodes = sorted([data.num_nodes for data in train_dataset])
            k = num_nodes[int(math.ceil(k * len(num_nodes))) - 1]
            k = max(10, k)
        self.k = int(k)

        self.convs = ModuleList()
        self.convs.append(GNN(train_dataset.num_features, hidden_channels))
        for i in range(0, num_layers - 1):
            self.convs.append(GNN(hidden_channels, hidden_channels))
        self.convs.append(GNN(hidden_channels, 1))

        conv1d_channels = [16, 32]
        total_latent_dim = hidden_channels * num_layers + 1
        conv1d_kws = [total_latent_dim, 5]
        self.conv1 = Conv1d(1, conv1d_channels[0], conv1d_kws[0],
                            conv1d_kws[0])
        self.maxpool1d = MaxPool1d(2, 2)
        self.conv2 = Conv1d(conv1d_channels[0], conv1d_channels[1],
                            conv1d_kws[1], 1)
        dense_dim = int((self.k - 2) / 2 + 1)
        dense_dim = (dense_dim - conv1d_kws[1] + 1) * conv1d_channels[1]
        self.mlp = MLP([dense_dim, 128, 1], dropout=0.5, batch_norm=False)

    def forward(self, x, edge_index, batch):
        xs = [x]
        for conv in self.convs:
            xs += [conv(xs[-1], edge_index).tanh()]
        x = torch.cat(xs[1:], dim=-1)

        # Global pooling.
        x = global_sort_pool(x, batch, self.k)
        
        x = x.unsqueeze(1)  # [num_graphs, 1, k * hidden]
        x = self.conv1(x).relu()
        x = self.maxpool1d(x)
        x = self.conv2(x).relu()
        x = x.view(x.size(0), -1)  # [num_graphs, dense_dim]
        return self.mlp(x)




In [None]:
def train():
    model.train()

    total_loss = 0
    for data in train_loader:
        data = data.to(device)
        optimizer.zero_grad()
        out = model(data.x, data.edge_index, data.batch)
        
        loss = criterion(out.view(-1), data.y.to(torch.float))
        loss.backward()
        optimizer.step()
        total_loss += float(loss) * data.num_graphs

    return total_loss / len(train_dataset)


@torch.no_grad()
def test(loader):
    model.eval()
    y_pred, y_true = [], []
    for data in loader:
        data = data.to(device)
        logits = model(data.x, data.edge_index, data.batch)
        y_pred.append(logits.view(-1).cpu())
        y_true.append(data.y.view(-1).cpu().to(torch.float))

    return roc_auc_score(torch.cat(y_true), torch.cat(y_pred))

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = DGCNN(hidden_channels=32, num_layers=3).to(device)
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.00001)
criterion = BCEWithLogitsLoss()
best_val_auc = test_auc = 0
loss_list, test_list,val_list = [],[],[]
for epoch in range(1, 50):
    loss = train()
    val_auc = test(val_loader)
    if val_auc > best_val_auc:
        best_val_auc = val_auc
        test_auc = test(test_loader)
    print(f'Epoch: {epoch:02d}, Loss: {loss:.4f}, Val: {val_auc:.4f}, '
          f'Test: {test_auc:.4f}')
    loss_list.append(loss)
    test_list.append(test_auc)
    val_list.append(val_auc)

In [None]:
# metrics_data = {'train_loss':loss_list, 'test_AUC': test_list,'val_AUC':val_list}
# with open("results/metrics_ltpsice_examples_complete.pkl","wb") as f:
#     pickle.dump(metrics_data,f)
# f.close()


In [None]:
#save the trained model
torch.save(model, "model-save/SEAL_circuit_model-1-50-ltpsice_examples_complete.pt")

In [None]:
def load_test_data():
    with open("data/ltspice_examples_LP_complete.pkl",'rb') as f:
        dataset = pickle.load(f)
        f.close()
    with open('data/ltspice_examples_label_mapping.pkl', 'rb') as f:
        mapping = pickle.load(f)
        f.close()
    test_data = dataset['test_x'] 
    return test_data, mapping
### create test objects one by one
def create_test_objects(test_data,mapping):
    all_test_graphs = []
    for g in test_data:
        X = []
        for i, n in enumerate(g.nodes()):
            feat = np.zeros((5,),dtype = np.float64)
            node_type = g.nodes[n]['type']
            index = mapping.get(node_type)
            feat[index] = 1.0
            X.append(feat)
        g_ = nx.Graph()
        g_.add_nodes_from(g.nodes())
        g_.add_edges_from(g.edges())
        d = from_networkx(g_)
        d.x = torch.from_numpy(np.array(X))
        all_test_graphs.append(d)
    return all_test_graphs
##creating one test graph
def create_one_test_graph(test_data, mapping):
    X = []
    all_graphs = []
    for g in test_data:
        for i, n in enumerate(g.nodes()):
            feat = np.zeros((5,),dtype = np.float64)
            node_type = g.nodes[n]['type']
            index = mapping.get(node_type)
            feat[index] = 1.0
            X.append(feat)
        g_ = nx.Graph()
        g_.add_nodes_from(g.nodes())
        g_.add_edges_from(g.edges())
        all_graphs.append(g_)
    graph = nx.disjoint_union_all(all_graphs)
    print("total number of nodes and edges:", graph.number_of_nodes(), graph.number_of_edges())
    test_data1 = from_networkx(graph,group_node_attrs=None,group_edge_attrs=None )
    test_data1.x = torch.from_numpy(np.array(X)).float()
    return test_data1
@torch.no_grad()
def test1(model, loader):
    model.eval()
    y_pred, y_true = [], []
    for data in loader:
        data = data.to(device)
        logits = model(data.x, data.edge_index, data.batch)
        y_pred.append(logits.view(-1).cpu())
        y_true.append(data.y.view(-1).cpu().to(torch.float))
        
    return roc_auc_score(torch.cat(y_true), torch.cat(y_pred)),torch.cat(y_true),torch.cat(y_pred)
#     return accuracy_score(torch.sigmoid(torch.cat(y_true)), torch.sigmoid(torch.cat(y_pred))),torch.cat(y_true),torch.cat(y_pred)
class SEAL_graph():
    def __init__(self, data):
        self.data = data
#         self.z = 28
#         print("num of nodes in the graph:", self.data.num_nodes)
    
    def extract_enclosing_subgraphs(self,edge_index, edge_label_index, y):
        data_list = []
        z_max = index = 0
        
        for src, dst in edge_label_index.t().tolist():
            try:
                index +=1
                sub_nodes, sub_edge_index, mapping, _ = k_hop_subgraph(
                    [src, dst], 2, edge_index, relabel_nodes=True)
                src, dst = mapping.tolist()

                # Remove target link from the subgraph.
                mask1 = (sub_edge_index[0] != src) | (sub_edge_index[1] != dst)
                mask2 = (sub_edge_index[0] != dst) | (sub_edge_index[1] != src)
                sub_edge_index = sub_edge_index[:, mask1 & mask2]

                # Calculate node labeling.
                z,_max_z = self.drnl_node_labeling(sub_edge_index, src, dst,z_max, num_nodes=sub_nodes.size(0))
                
                z_max = _max_z
                data = Data(x=self.data.x[sub_nodes], z=z,
                            edge_index=sub_edge_index, y=y[index])
                data_list.append(data)
            except:
                continue

        return data_list

    def drnl_node_labeling(self,edge_index, src, dst,z_max,num_nodes=None):
        # Double-radius node labeling (DRNL).
        src, dst = (dst, src) if src > dst else (src, dst)
        adj = to_scipy_sparse_matrix(edge_index, num_nodes=num_nodes).tocsr()

        idx = list(range(src)) + list(range(src + 1, adj.shape[0]))
        adj_wo_src = adj[idx, :][:, idx]

        idx = list(range(dst)) + list(range(dst + 1, adj.shape[0]))
        adj_wo_dst = adj[idx, :][:, idx]

        dist2src = shortest_path(adj_wo_dst, directed=False, unweighted=True,
                                 indices=src)
        dist2src = np.insert(dist2src, dst, 0, axis=0)
        dist2src = torch.from_numpy(dist2src)

        dist2dst = shortest_path(adj_wo_src, directed=False, unweighted=True,
                                 indices=dst - 1)
        dist2dst = np.insert(dist2dst, src, 0, axis=0)
        dist2dst = torch.from_numpy(dist2dst)

        dist = dist2src + dist2dst
        dist_over_2, dist_mod_2 = dist // 2, dist % 2

        z = 1 + torch.min(dist2src, dist2dst)
        z += dist_over_2 * (dist_over_2 + dist_mod_2 - 1)
        z[src] = 1.
        z[dst] = 1.
        z[torch.isnan(z)] = 0.
        _max_z = max(int(z.max()), z_max) 
        return z.to(torch.long),_max_z

In [None]:

# model = torch.load("model-save/SEAL_circuit_model-1-50-ltpsice_examples_complete.pt")


In [None]:
##testing on ltspice_examples test set
test_data, mapping = load_test_data()


test_graphs = create_test_objects(test_data,mapping)

auc_per_graph = []
for index in range(len(test_graphs)):
    tst_graph = test_graphs[index]
    tst_graph.x = tst_graph.x.to(torch.float)
    last_node = tst_graph.num_nodes-1
    e_index = tst_graph.edge_index.numpy()
    indices = np.where(e_index==last_node)[1]
    if indices.shape[0]==0:continue
    positive_edges = e_index[:,indices]
    e_index = np.delete(e_index,np.where(e_index==last_node)[1], axis = 1)
    self_loop = np.array([[last_node],[last_node]])
    e_index = np.concatenate((e_index, self_loop), axis = 1)
    neg_edges = np.zeros((2,int(last_node-positive_edges.shape[1]/2)),dtype = int)
    n = 0
    for i in range(last_node):
        if i not in positive_edges:
            neg_edges[0,n] = i
            neg_edges[1,n] = last_node
            n +=1
    ### ADDING NEGATIVE EDGES TO THE GRAPH FOR CLASSIFICATION
    tst_graph.edge_index = torch.tensor(e_index)
    seal_data = SEAL_graph(tst_graph)
    pos_edge_index = np.concatenate((neg_edges,positive_edges),axis = 1)
    labels = np.zeros((pos_edge_index.shape[1]),dtype = int)
    labels[-int(positive_edges.shape[1]/2):] =1
    test_pos_data_list = seal_data.extract_enclosing_subgraphs(tst_graph.edge_index, torch.tensor(pos_edge_index), torch.tensor(labels))
    z_max = 28
    # # Convert node labeling to one-hot features.
    for data in test_pos_data_list:
        data.z[data.z>z_max] = z_max
#             data.x = F.one_hot(data.z, z_max + 1).to(torch.float)
        one_hot = F.one_hot(data.z, z_max + 1).to(torch.float)
        x = data.x
        data.x = torch.cat((one_hot,x),1)
    test_loader = DataLoader(test_pos_data_list, batch_size=1)
    test_auc,true, pred = test1(model,test_loader)
    auc_per_graph.append(test_auc);


print("average AUC: {}".format(np.mean(auc_per_graph)))

