In [None]:
import sys

if "google.colab" in sys.modules:
    # Mount Google Drive
    from google.colab import drive
    drive.mount('/content/drive')
    dataset_path = '/content/drive/My Drive/dataset'

    # Install required packages
    !pip install torch_geometric
    import torch
    from torch_geometric.data import Data, DataLoader

else:
    dataset_path = 'dataset'

## The data

In [None]:
# Load the data
# Training set
train_graphs = torch.load(f"{dataset_path}/combined/training.pt", weights_only=False)

# Validating set
val_graphs = torch.load(f"{dataset_path}/combined/validating.pt", weights_only=False)

# Testing set
test_graphs = torch.load(f"{dataset_path}/combined/testing.pt", weights_only=False)

In [None]:
# Create data loaders
from torch_geometric.loader import DataLoader

train_loader = DataLoader(train_graphs, batch_size=32, shuffle=True)
val_loader = DataLoader(val_graphs, batch_size=32, shuffle=False)
test_loader = DataLoader(test_graphs, batch_size=32, shuffle=False)

## Build model

### import dependancies

In [None]:
# import dependancies
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_mean_pool, NNConv

### model class

In [None]:
# model 1
class GraphLevelGNN(nn.Module):
    def __init__(self, node_dim, global_dim, hidden_dim):
        super().__init__()
        self.conv1 = GCNConv(node_dim, hidden_dim)
        self.conv2 = GCNConv(hidden_dim, hidden_dim)
        self.fc1 = nn.Linear(hidden_dim + global_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 1)  # For regression

    def forward(self, x, edge_index, batch, global_attr):
        x = F.relu(self.conv1(x, edge_index))
        x = F.relu(self.conv2(x, edge_index))
        x = global_mean_pool(x, batch)  # Aggregate node info to graph-level
        x = torch.cat([x, global_attr], dim=1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

In [None]:
# Model 2: Model considering edge features
class GNNModel(nn.Module):
    def __init__(self, node_dim, edge_dim, global_dim, hidden_dim):
        super().__init__()
        # Edge NN maps edge_attr to weight matrix
        self.edge_nn = nn.Sequential(
            nn.Linear(edge_dim, 32),
            nn.ReLU(),
            nn.Linear(32, node_dim * hidden_dim) 
        )

        self.conv1 = NNConv(node_dim, hidden_dim, self.edge_nn, aggr='mean')
        self.conv2 = NNConv(hidden_dim, hidden_dim, self.edge_nn, aggr='mean')

        self.fc1 = nn.Linear(hidden_dim + global_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 1)  # Regression output

    def forward(self, x, edge_index, edge_attr, batch, global_attr):
        x = F.relu(self.conv1(x, edge_index, edge_attr))
        x = F.relu(self.conv2(x, edge_index, edge_attr))
        x = global_mean_pool(x, batch)
        x = torch.cat([x, global_attr], dim=1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

In [None]:
class GNNGraphLevelModel(torch.nn.Module):
    def __init__(self, node_dim, global_dim, hidden_dim):
        super().__init__()
        self.conv1 = GCNConv(node_dim, hidden_dim)
        self.conv2 = GCNConv(hidden_dim, hidden_dim)
        self.fc1 = nn.Linear(hidden_dim + global_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 1)  # For regression

    def forward(self, x, edge_index, batch, global_attr):
        x = self.conv1(x, edge_index).relu()
        x = self.conv2(x, edge_index).relu()
        x = global_mean_pool(x, batch)  # shape [batch_size, hidden_dim]

        x = torch.cat([x, global_attr], dim=1)  # combine with global features
        x = self.fc1(x).relu()
        return self.fc2(x)

### model instance, optimizer, loss function

In [None]:
# Instantiate model, optimizer, loss

# run the model in the gpu if the device has one
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') 

# Instance of the model
model = GNNModel(node_dim=8, edge_dim=3, global_dim=11, hidden_dim=32).to(device)
# model = GNNWithEdgeFeatures(node_dim=3, edge_dim=1, global_dim=2, hidden_dim=64).to(device)


# optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# Loss function
loss_fn = nn.MSELoss()

### Train and validation loops

In [None]:
def train():
    model.train()
    total_loss = 0
    for data in train_loader:
        data = data.to(device)
        optimizer.zero_grad()
        # out = model(data.x, data.edge_index, data.batch, data.u)
        out = model(data.x, data.edge_index, data.edge_attr, data.batch, data.u) # when added edg features
        loss = loss_fn(out, data.y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * data.num_graphs
    return total_loss / len(train_loader.dataset)

In [None]:
def evaluate(loader):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for data in loader:
            data = data.to(device)
            out = model(data.x, data.edge_index, data.batch, data.u)
            # out = model(data.x, data.edge_index, data.edge_attr, data.batch, data.u) # when there are edge fetures
            loss = loss_fn(out, data.y)
            total_loss += loss.item() * data.num_graphs
    return total_loss / len(loader.dataset)

In [None]:
### Training, validating, and testing

In [None]:
# Run training and testing
for epoch in range(1, 31):
    train_loss = train()
    val_loss = evaluate(val_loader)
    print(f'Epoch {epoch:03d}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

test_loss = evaluate(test_loader)
print(f'Test Loss: {test_loss:.4f}')