# Technical Details

In [16]:
# Dependencies
from typing import List, Tuple, Dict
import numpy as np 

import torch 
import torch.nn as nn
import torch.nn.functional as F

import torch_geometric as pyg

# Check GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

Using device: cuda


In [2]:
# Reproducibility
SEED = 123
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = True 
np.random.seed(SEED)

In [3]:
# Meta Data

## Explainee Model

Reference: https://colab.research.google.com/drive/1I8a0DfQ3fI7Njc62__mVXUlcAleUclnb?usp=sharing

In [7]:
# Download data. 
from torch_geometric.datasets import TUDataset
data_raw = TUDataset(root='data/TUDataset', name='MUTAG')

# Shuffle.
data_raw = data_raw.shuffle()

# Split.
train_data = data_raw[:150]
test_data = data_raw[150:]

# Create data lists.
train_data_list = []
train_data_list_0 = []
train_data_list_1 = []
test_data_list = []
test_data_list_0 = []
test_data_list_1 = []

for graph in train_data:
    train_data_list.append(graph)

    if graph.y.item() == 0: 
        train_data_list_0.append(graph)

    elif graph.y.item() == 1: 
        train_data_list_1.append(graph)

for graph in test_data:
    test_data_list.append(graph)

    if graph.y.item() == 0: 
        test_data_list_0.append(graph)

    elif graph.y.item() == 1: 
        test_data_list_1.append(graph)

# Create data loaders.
train_loader = pyg.loader.DataLoader(train_data_list, batch_size=64, shuffle=True)
test_loader = pyg.loader.DataLoader(test_data_list, batch_size=64, shuffle=True)

In [176]:
# Padded dataset. 
max_num_nodes = 30 # 28 in the full dataset.  

# ChatGPT: zero pad torch geometric data so every graph has the same number 
#          of nodes.
# Edited
def pad_graph(data, max_num_nodes):
    num_nodes = data.num_nodes
    
    # Pad node features
    padded_x = torch.zeros((max_num_nodes, data.x.size(1)))
    padded_x[:num_nodes] = data.x
    
    # Create padded adjacency matrix
    padded_adj = torch.zeros((max_num_nodes, max_num_nodes))
    padded_adj[:num_nodes, :num_nodes] = (
        pyg.utils.to_dense_adj(data.edge_index).squeeze(0)
    )
    
    # Create edge index from padded adjacency matrix
    padded_edge_index = pyg.utils.dense_to_sparse(padded_adj)[0]
    
    # Create new data object with padded features
    padded_data = pyg.data.Data(x=padded_x, edge_index=padded_edge_index,
                                y=data.y)
    
    return padded_data

# Create padded data lists.
train_data_list_padded = []
train_data_list_0_padded = []
train_data_list_1_padded = []
test_data_list_padded= []
test_data_list_0_padded = []
test_data_list_1_padded = []

for graph in train_data:
    train_data_list.append(pad_graph(graph, max_num_nodes))

    if graph.y.item() == 0: 
        train_data_list_0.append(pad_graph(graph, max_num_nodes))

    elif graph.y.item() == 1: 
        train_data_list_1.append(pad_graph(graph, max_num_nodes))

for graph in test_data:
    test_data_list.append(pad_graph(graph, max_num_nodes))

    if graph.y.item() == 0: 
        test_data_list_0.append(pad_graph(graph, max_num_nodes))

    elif graph.y.item() == 1: 
        test_data_list_1.append(pad_graph(graph, max_num_nodes))

In [8]:
# Explainee GNN Model 
class GCN(nn.Module):
    def __init__(self, hidden_channels):
        super(GCN, self).__init__()
        self.conv1 = pyg.nn.GCNConv(7, hidden_channels) # 7 node features.
        self.conv2 = pyg.nn.GCNConv(hidden_channels, hidden_channels)
        self.conv3 = pyg.nn.GCNConv(hidden_channels, hidden_channels)
        self.lin = nn.Linear(hidden_channels, 2) # 2 classes.

    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch

        # 1. Node embeddings.
        x = self.conv1(x, edge_index)
        x = x.relu()
        x = self.conv2(x, edge_index)
        x = x.relu()
        x = self.conv3(x, edge_index)

        # 2. Pooling.
        x = pyg.nn.global_mean_pool(x, batch)

        # 3. Prediction.
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.lin(x)

        return x

In [9]:
# Training Explainee.
model = GCN(hidden_channels=64)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = torch.nn.CrossEntropyLoss()

def train(data_loader): 
    model.train()

    for batch in data_loader: 
        out = model(batch)
        loss = criterion(out, batch.y)
        loss.backward()

        optimizer.step()
        optimizer.zero_grad()

def model_accuracy(data_loader):
    model.eval()

    correct = 0
    for batch in data_loader: 
        out = model(batch)
        pred = out.argmax(dim=1)
        correct += int((pred == batch.y).sum())

    return correct / len(data_loader.dataset)

for epoch in range(1, 171): 
    train(train_loader)
    train_accuracy = model_accuracy(train_loader)
    test_accuracy = model_accuracy(test_loader)

    print(f"Epoch: {epoch} Train Accuracy: {train_accuracy} " + 
          f"Test Accuracy: {test_accuracy}")

Epoch: 1 Train Accuracy: 0.7 Test Accuracy: 0.5263157894736842
Epoch: 2 Train Accuracy: 0.7 Test Accuracy: 0.5263157894736842
Epoch: 3 Train Accuracy: 0.7 Test Accuracy: 0.5263157894736842
Epoch: 4 Train Accuracy: 0.7 Test Accuracy: 0.5263157894736842
Epoch: 5 Train Accuracy: 0.7 Test Accuracy: 0.5263157894736842
Epoch: 6 Train Accuracy: 0.7133333333333334 Test Accuracy: 0.5263157894736842
Epoch: 7 Train Accuracy: 0.7466666666666667 Test Accuracy: 0.6052631578947368
Epoch: 8 Train Accuracy: 0.7533333333333333 Test Accuracy: 0.6052631578947368
Epoch: 9 Train Accuracy: 0.7466666666666667 Test Accuracy: 0.6052631578947368
Epoch: 10 Train Accuracy: 0.7466666666666667 Test Accuracy: 0.7105263157894737
Epoch: 11 Train Accuracy: 0.7666666666666667 Test Accuracy: 0.7105263157894737
Epoch: 12 Train Accuracy: 0.7466666666666667 Test Accuracy: 0.7105263157894737
Epoch: 13 Train Accuracy: 0.7533333333333333 Test Accuracy: 0.7105263157894737
Epoch: 14 Train Accuracy: 0.7666666666666667 Test Accurac

In [10]:
# Confusion Matrix
from sklearn.metrics import confusion_matrix

full_batch = pyg.data.Batch.from_data_list(test_data_list + train_data_list)
model.eval()
preds = model(full_batch).argmax(dim=1).numpy()
targets = full_batch.y.numpy()

conf_matrix = confusion_matrix(targets, preds)
conf_matrix

array([[ 37,  26],
       [ 10, 115]], dtype=int64)

## Diffusion Generator 

betas = []

for batch in loader:
    adj_batch = func(batch)
    t ~ U[1, 50]
    noised_adj = func(adj_batch)

    pred_adj = model(noised_adj, t)

    CrtEnt(adj_batch, pred_adj)



In [147]:
T = 50

betas = torch.linspace(start=0.001, end=0.1, steps=T)
beta_bars = []
cum_prod = 1

for beta in betas:
    cum_prod *= (1 - 2*beta)
    beta_bars.append(0.5 - 0.5 * cum_prod)


In [164]:
def graph_to_tensors(graphs: pyg.data.Batch, 
                     max_num_nodes=None) -> List[torch.tensor]:
    # [b, n, n]
    adj_batch = pyg.utils.to_dense_adj(graphs.edge_index, batch=graphs.batch, 
                                       max_num_nodes=max_num_nodes,
                                       edge_attr = graphs.edge_attr)
    # D4 uses the mask somehow.
    x_batch, node_feat_mask = pyg.utils.to_dense_batch(graphs.x, graphs.batch, 
                                       max_num_nodes=max_num_nodes)    
    return [adj_batch, x_batch, node_feat_mask]

def forward_diffusion_sample(adj_batch: torch.tensor, t: int) -> torch.tensor:
    transition_probs = torch.full_like(adj_batch, beta_bars[t])

    # Symmetrically applies noise - treats edges as undirected.
    noise_upper = torch.bernoulli(transition_probs).triu(diagonal=1)
    noise_lower = noise_upper.transpose(-1, -2)
    noised_adj_batch = torch.abs(adj_batch + noise_upper + noise_lower)

    return noised_adj_batch

class denoising_model(nn.Module):
    def __init__(self):
        super(denoising_model, self).__init__()


    def forward(node_features: torch.tensor, 
                noised_adj_batch: torch.tensor, t: int) -> torch.tensor:
        """
        node_features: [b, n, f].
        noised_adj_batch: [b, n, n].
        returns: [b, n, n] predicted adj.
        """
        pass



        


# Debugging

In [79]:
T = 10
low_noise = 0.0
high_noise = 0.5
noise_list = list(np.random.uniform(low=low_noise, high=high_noise, size=T))

# Bernoulli distribution for the probability of an edge existing.
bernoulli_adj = torch.full_like(x[1], noise_list[0])

# Symmetrically applies noise - treats edges as undirected.
noise_upper = torch.bernoulli(bernoulli_adj).triu(diagonal=1)
noise_lower = noise_upper.transpose(-1, -2)
train_adj = torch.abs(-x[1] + noise_upper + noise_lower)

noisediff = noise_upper + noise_lower # record true noise. 

print((train_adj - x[1]).abs().sum())
print(noisediff.sum())

tensor(6762.)
tensor(6762.)


In [69]:
x_half = x[1] > 1 / 2
x_1 = x[1] >= 1
torch.allclose(x_half, x_1)

True

In [170]:
# Manual training step.

batch = pyg.data.Batch.from_data_list(train_data_list[:2])

adj_batch, x_batch, node_feat_mask = graph_to_tensors(
    batch, max_num_nodes)

print(adj_batch.shape, x_batch.shape, node_feat_mask.shape)

adj_batch_1 = adj_batch[:, :, :, 0] 
adj_batch_1

adj_batch_2 = pyg.utils.to_dense_adj(
    batch.edge_index, batch=batch.batch, max_num_nodes=max_num_nodes
)
torch.allclose(adj_batch_1, adj_batch_2)
batch.edge_attr.shape

pyg.utils.dense_to_sparse(adj_batch)

torch.Size([2, 30, 30, 4]) torch.Size([2, 30, 7]) torch.Size([2, 30])


ValueError: Dense adjacency matrix 'adj' must be two- or three-dimensional (got 4 dimensions)

In [138]:
max_obs_nodes = 0
for graph in train_data_list + test_data_list:
    if graph.x.shape[0] > max_obs_nodes:
        max_obs_nodes = graph.x.shape[0]

print(max_obs_nodes)

28
