In [None]:
%%time
import src
from pathlib import Path
from itertools import product
from functools import partial

from tqdm.notebook import trange, tqdm

from src import Petri_Cheb_GNN, Petri_GCN

batch_size = 64
train_dataset = src.get_reachability_dataset(Path('Data/RandData_DS1_train_data.processed'), batch_size=batch_size)
test_dataset = src.get_reachability_dataset(Path('Data/RandData_DS1_test_data.processed'), batch_size=batch_size)

In [None]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, MLP
from torch_geometric.utils import scatter

class GCN(torch.nn.Module):
    def __init__(self, num_node_features, num_hidden_features):
        super().__init__()
        self.conv1 = GCNConv(num_node_features, num_hidden_features)
        self.conv2 = GCNConv(num_hidden_features, num_hidden_features)
        self.readout = MLP([num_hidden_features, num_hidden_features*2, num_hidden_features*3, 1])

    def forward(self, data):
        x, edge_index = data.x, data.edge_index

        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, training=self.training)
        x = self.conv2(x, edge_index)
        x = self.readout(x)
        return scatter(x, data.batch, dim=0, reduce='mean').view(-1)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
def train(model, optimizer):
    model.train()
    for epoch in tqdm(range(100), leave=False):
        for graph in train_dataset:
            optimizer.zero_grad()
            out = torch.flatten(model(graph))
            loss = F.l1_loss(out, graph.y)
            loss.backward()
            optimizer.step()

In [None]:
def test(model):
    model.eval()
    pred = []
    actual = torch.tensor([graph.y for graph in test_dataset.dataset])
    for graph in test_dataset:
        pred.extend(model(graph).tolist())
    
    pred = torch.tensor(pred)
    pred = torch.flatten(pred)

    mre = torch.mean(F.l1_loss(pred, actual, reduction='none') / actual) * 100
    return F.l1_loss(pred, actual), torch.nn.MSELoss()(pred, actual), mre

In [None]:
def run_scenario(create_model, scenario, compile=False):
    values = []
    for i in trange(30, leave=False, desc=scenario):
        model = create_model()
        if compile:
            model = torch.compile(model, dynamic=True)
        optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)
        train(model, optimizer)
        values.append(test(model))
    values = torch.tensor(values)
    return values

In [None]:
def variance(sample):
    return torch.mean((sample - torch.mean(sample))**2)

def std_deviation(sample):
    return variance(sample)**2

In [None]:
GNN_layers = range(2, 11)
MLP_layers = range(2, 6)
pairs = list(product(GNN_layers, MLP_layers))
GNN_Operators = [("GCN", Petri_GCN), ("Chebyshev", Petri_Cheb_GNN)]
results = []
for (name, operator), layers in tqdm(list(product(GNN_Operators, pairs))):
    scenario = f'{name}, {layers[0]} GNN layers, {layers[1]} readout layers'
    f = lambda: operator(train_dataset.num_features, 16, num_layers=layers[0], readout_layers=layers[1])
    scenario_result = run_scenario(f, scenario)
    results.append((scenario, scenario_result))