In [1]:
# Import required packages.
import os
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import numpy as np


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Display the kernel's current working directory.
os.getcwd()

'c:\\Users\\david\\pyproj\\mt\\models'

In [3]:
# Directory holding the graph-patch files. The MT_DATA_DIR environment variable
# overrides the machine-specific default, so the notebook can run on another
# machine without editing this cell; when it is unset the original path is used.
data_dir = os.environ.get(
    'MT_DATA_DIR',
    'c:/Users/david/MT_code/data/extracted_patches/mutant_graphs_classification/',
)
# A single example patch file, used by the inspection cell.
path = os.path.join(data_dir, 'AAAH_GraphPatch.pkl')

In [5]:
# Display the kernel's current working directory (same check as the earlier cell).
os.getcwd()

'c:\\Users\\david\\pyproj\\mt\\models'

In [4]:
# NOTE(review): star import -- of the names it provides, the visible cells use
# `load_object`. The saved output of this cell is a ModuleNotFoundError for
# 'f_helper_functions', so the module has to be importable from the kernel
# before this cell (and everything that relies on `load_object`) can run.
from f_helper_functions import *

# Load the example patch file and print the attributes of interest.
patch = load_object(path)
print(patch.distance_matrix.shape)
print(patch.edge_index.shape)
print(patch.edge_weight.shape)
print(patch.fitness)
print(patch.A.shape)
print(patch.mutant)
print()
print(patch)

ModuleNotFoundError: No module named 'f_helper_functions'

In [None]:
#from torch.utils.data import Dataset
from torch_geometric.data import Dataset
from torch_geometric.data import Data
from torch_geometric.utils import add_self_loops


class PatchDataset(Dataset):
    """Graph-patch dataset backed by one ``<mutant>_GraphPatch.pkl`` file per sample.

    The directory is scanned once at construction time. Only files ending in
    ``_GraphPatch.pkl`` are indexed, and the mutant identifiers are sorted so
    that the index -> file mapping is stable across runs and platforms.

    Args:
        data_dir: Directory containing the ``*_GraphPatch.pkl`` files.

    Attributes:
        data_dir: The directory passed to the constructor.
        mutants: Sorted list of mutant identifiers (file name without the
            ``_GraphPatch.pkl`` suffix).
    """

    # File-name suffix shared by every patch file.
    _SUFFIX = '_GraphPatch.pkl'

    def __init__(self, data_dir):
        # Initialise the PyG base class so its bookkeeping attributes exist.
        super().__init__()
        self.data_dir = data_dir
        self.mutants = sorted(
            fname[:-len(self._SUFFIX)]
            for fname in os.listdir(data_dir)
            if fname.endswith(self._SUFFIX)
        )

    def len(self):
        """Return the number of indexed patch files (PyG ``Dataset`` hook)."""
        return len(self.mutants)

    def get(self, idx):
        """Load the patch at position ``idx`` and convert it to a ``Data`` object.

        Args:
            idx: Integer position into ``self.mutants``.

        Returns:
            A ``Data`` object with ``x`` (from ``patch.features``),
            ``edge_index``, ``edge_attr`` (from ``patch.edge_weight``),
            ``y`` (``patch.fitness`` cast to int64) and ``pos``
            (from ``patch.coords``). Self-loops are added to every node with
            an edge weight of 0.
        """
        path = os.path.join(self.data_dir, self.mutants[idx] + self._SUFFIX)
        # NOTE(review): load_object presumably unpickles the file -- only load
        # files from a trusted source.
        patch = load_object(path)

        x = torch.from_numpy(patch.features)
        y = torch.from_numpy(patch.fitness.astype(np.int64))
        pos = torch.from_numpy(patch.coords)

        edge_weight = torch.from_numpy(patch.edge_weight)
        edge_index = torch.from_numpy(patch.edge_index)
        edge_index, edge_weight = add_self_loops(edge_index, edge_weight, fill_value=0)

        # Keyword arguments make the field mapping explicit (edge weights are
        # stored as ``edge_attr``).
        return Data(x=x, edge_index=edge_index, edge_attr=edge_weight, y=y, pos=pos)

    def __len__(self):
        # Consistent with self.mutants; no directory re-scan on every call.
        return self.len()

    def __getitem__(self, idx):
        return self.get(idx)

In [None]:
# Number of graphs per mini-batch; passed to both DataLoaders.
batch_size = 8

In [None]:
# Build the dataset over data_dir; the bare len() on the last line displays its size.
dataset = PatchDataset(data_dir = data_dir)
len(dataset)

1500

In [None]:
# Dataset-level summary: blank line, dataset repr, separator, graph count.
for summary_line in (
    '',
    f'Dataset: {dataset}:',
    '====================',
    f'Number of graphs: {len(dataset)}',
):
    print(summary_line)

# Inspect the first graph of the dataset.
data = dataset[0]

print('', data, '=============================================================', sep='\n')

# Per-graph statistics of that first graph, one line each.
for stat_line in (
    f'Number of nodes: {data.num_nodes}',
    f'Number of node features: {data.num_node_features}',
    f'Number of edges: {data.num_edges}',
    f'Average node degree: {data.num_edges / data.num_nodes:.2f}',
    f'Contains self-loops: {data.has_self_loops()}',
):
    print(stat_line)


Dataset: PatchDataset(1500):
Number of graphs: 1500

Data(x=[1049, 16], edge_index=[2, 6435], edge_attr=[6435, 1], y=1, pos=[1049, 3])
Number of nodes: 1049
Number of node features: 16
Number of edges: 6435
Average node degree: 6.13
Contains self-loops: True


In [None]:
# Display the edge attributes of the graph currently bound to `data`.
data.edge_attr

tensor([[0.3675],
        [0.6447],
        [0.6881],
        ...,
        [0.0000],
        [0.0000],
        [0.0000]], dtype=torch.float64)

In [None]:
# 80/20 train/test split.
# Integer arithmetic keeps n_train + n_test == len(dataset) for every dataset
# size; truncating 0.8*n and 0.2*n separately can drop a sample, which makes
# random_split raise because the lengths no longer sum to the dataset size.
n_train = (len(dataset) * 4) // 5
n_test = len(dataset) - n_train

# A seeded generator makes the split reproducible across kernel restarts.
split_generator = torch.Generator().manual_seed(12345)
trainset, testset = torch.utils.data.random_split(
    dataset, [n_train, n_test], generator=split_generator
)

print(f'Number of training graphs: {len(trainset)}')
print(f'Number of test graphs: {len(testset)}')

Number of training graphs: 1200
Number of test graphs: 300


In [None]:
# Class balance of the training split: count samples with label 0 vs. any other label.
train_fraction = {0: 0, 1: 0}
for training_sample in trainset:
    label = 0 if training_sample.y == 0 else 1
    train_fraction[label] += 1

# Divide by the actual number of training samples rather than the split size
# computed in another cell, so the fraction stays correct however the split was made.
n_train_samples = len(trainset)
train_ones_fraction = train_fraction[1] / n_train_samples if n_train_samples else float('nan')

print('Number of zeros in trainset: {z}'.format(z = train_fraction[0]))
print('Number of ones in trainset: {o}'.format(o = train_fraction[1]))
print('Fraction of ones in trainset: {f:2f}'.format(f=train_ones_fraction))

Number of zeros in trainset: 349
Number of ones in trainset: 851
Fraction of ones in trainset: 0.709167


In [None]:
# Class balance of the test split: count samples with label 0 vs. any other label.
test_fraction = {0: 0, 1: 0}
for test_sample in testset:
    label = 0 if test_sample.y == 0 else 1
    test_fraction[label] += 1

# Divide by the actual number of test samples rather than the split size
# computed in another cell, so the fraction stays correct however the split was made.
n_test_samples = len(testset)
test_ones_fraction = test_fraction[1] / n_test_samples if n_test_samples else float('nan')

print('Number of zeros in testset: {z}'.format(z = test_fraction[0]))
print('Number of ones in testset: {o}'.format(o = test_fraction[1]))
print('Fraction of ones in testset: {f:2f}'.format(f=test_ones_fraction))

Number of zeros in testset: 88
Number of ones in testset: 212
Fraction of ones in testset: 0.706667


In [None]:
from torch_geometric.loader import DataLoader

# Mini-batch loaders. Training batches are reshuffled every epoch; the test
# loader keeps a fixed order because evaluation does not benefit from shuffling
# and a deterministic order makes runs easier to compare.
trainloader = DataLoader(dataset=trainset, batch_size=batch_size, shuffle=True)
testloader = DataLoader(dataset=testset, batch_size=batch_size, shuffle=False)

In [None]:

#for data in enumerate(trainloader):
#    print(data[1].x.shape[1])


## Training a Graph Neural Network (GNN)

Training a GNN for graph classification usually follows a simple recipe:

1. Embed each node by performing multiple rounds of message passing
2. Aggregate node embeddings into a unified graph embedding (**readout layer**)
3. Train a final classifier on the graph embedding

There exist multiple **readout layers** in the literature, but the most common one is to simply take the average of the node embeddings:

$$
\mathbf{x}_{\mathcal{G}} = \frac{1}{|\mathcal{V}|} \sum_{v \in \mathcal{V}} \mathbf{x}^{(L)}_v
$$

PyTorch Geometric provides this functionality via [`torch_geometric.nn.global_mean_pool`](https://pytorch-geometric.readthedocs.io/en/latest/modules/nn.html#torch_geometric.nn.glob.global_mean_pool), which takes in the node embeddings of all nodes in the mini-batch and the assignment vector `batch` to compute a graph embedding of size `[batch_size, hidden_channels]` for each graph in the batch.

The final architecture for applying GNNs to the task of graph classification then looks as follows and allows for complete end-to-end training:

In [None]:
from torch.nn import Linear
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.nn import global_mean_pool


class GCN(torch.nn.Module):
    """Three-layer GCN followed by mean pooling and a linear graph classifier.

    Args:
        hidden_channels: Width of every GCNConv layer and of the pooled
            graph embedding.
        in_channels: Number of input features per node (default 16, the
            value previously hard-coded).
        num_classes: Number of output logits per graph (default 2, the
            value previously hard-coded).
    """

    def __init__(self, hidden_channels, in_channels=16, num_classes=2):
        super().__init__()
        # Fixed seed so the layer initialisation is reproducible.
        # Note that this reseeds the global torch RNG as a side effect.
        torch.manual_seed(12345)
        self.conv1 = GCNConv(in_channels, hidden_channels)        # in_channels --> hidden_channels
        self.conv2 = GCNConv(hidden_channels, hidden_channels)    # hidden_channels --> hidden_channels
        self.conv3 = GCNConv(hidden_channels, hidden_channels)    # hidden_channels --> hidden_channels
        self.lin = Linear(hidden_channels, num_classes)           # graph embedding --> class logits

    def forward(self, x, edge_index, batch):
        """Compute per-graph class logits.

        Args:
            x: Node feature matrix.
            edge_index: Edge list passed to every GCNConv layer.
            batch: Assignment vector mapping each node to its graph in the
                mini-batch; used by the mean-pool readout.

        Returns:
            Tensor of logits with one row per graph.
        """
        # 1. Obtain node embeddings (no activation after the last conv layer).
        x = self.conv1(x, edge_index)
        x = x.relu()
        x = self.conv2(x, edge_index)
        x = x.relu()
        x = self.conv3(x, edge_index)

        # 2. Readout layer: average the node embeddings of each graph, giving
        #    hidden_channels values per graph.
        x = global_mean_pool(x, batch)  # [batch_size, hidden_channels]

        # 3. Apply the final classifier (dropout is active only in training mode).
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.lin(x)

        return x

# Instantiate the network with 64 hidden channels and print its layer summary.
model = GCN(hidden_channels=64)
print(model)

GCN(
  (conv1): GCNConv(16, 64)
  (conv2): GCNConv(64, 64)
  (conv3): GCNConv(64, 64)
  (lin): Linear(in_features=64, out_features=2, bias=True)
)


Here, we again make use of the [`GCNConv`](https://pytorch-geometric.readthedocs.io/en/latest/modules/nn.html#torch_geometric.nn.conv.GCNConv) with $\mathrm{ReLU}(x) = \max(x, 0)$ activation for obtaining localized node embeddings, before we apply our final classifier on top of a graph readout layer.

Let's train our network for a few epochs to see how well it performs on the training as well as test set:

In [None]:
from IPython.display import Javascript
# NOTE(review): this JavaScript calls the google.colab output API; presumably a
# leftover from a Colab tutorial -- confirm whether it is still needed here.
display(Javascript('''google.colab.output.setIframeHeight(0, true, {maxHeight: 300})'''))

# Fresh model, plus the optimizer and loss that train() and test() read as globals.
model = GCN(hidden_channels=64)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = torch.nn.CrossEntropyLoss()

def train():
    """Run one optimisation pass over ``trainloader`` and report the mean loss.

    Uses the module-level ``model``, ``trainloader``, ``criterion`` and
    ``optimizer``.

    Returns:
        Mean of the per-batch loss values as a float, or NaN when the loader
        yields no batches. (Previously the function returned nothing, so the
        caller's ``loss = train()`` always bound ``None``.)
    """
    model.train()

    total_loss = 0.0
    n_batches = 0
    for data in trainloader:  # Iterate in batches over the training dataset.
        # Clear gradients first so stale gradients left over from elsewhere
        # can never leak into the first update.
        optimizer.zero_grad()
        out = model(data.x, data.edge_index, data.batch)  # Single forward pass.
        loss = criterion(out, data.y)  # Compute the loss.
        loss.backward()  # Derive gradients.
        optimizer.step()  # Update parameters based on gradients.

        total_loss += loss.item()
        n_batches += 1

    return total_loss / n_batches if n_batches else float('nan')


def test(loader):
     model.eval()

     correct = 0
     for data in loader:  # Iterate in batches over the training/test dataset.
         out = model(data.x, data.edge_index, data.batch)  
         pred = out.argmax(dim=1)  # Use the class with highest probability.
         correct += int((pred == data.y).sum())  # Check against ground-truth labels.
     return correct / len(loader.dataset)  # Derive ratio of correct predictions.


# Epochs are numbered 1 through 9. After each training pass the accuracy is
# measured on the training split first, then on the test split.
epoch_numbers = range(1, 10)
for epoch in epoch_numbers:
    loss = train()  # captured but not used below
    train_acc, test_acc = test(trainloader), test(testloader)
    print(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')

<IPython.core.display.Javascript object>

Epoch: 001, Train Acc: 0.7092, Test Acc: 0.7067
Epoch: 002, Train Acc: 0.7092, Test Acc: 0.7067
Epoch: 003, Train Acc: 0.7092, Test Acc: 0.7067
Epoch: 004, Train Acc: 0.7092, Test Acc: 0.7067
Epoch: 005, Train Acc: 0.7092, Test Acc: 0.7067
Epoch: 006, Train Acc: 0.7092, Test Acc: 0.7067
Epoch: 007, Train Acc: 0.7158, Test Acc: 0.7300
Epoch: 008, Train Acc: 0.7192, Test Acc: 0.7333
Epoch: 009, Train Acc: 0.7092, Test Acc: 0.7067
