# **Graph Learning Project - MNIST exp 1**

By Shahar Cohen 205669260 & Alex petrunin 205782568

# Installation

In [11]:
!pip install -q torch-geometric

# SETUP

In [12]:
import torch
import torch.nn as nn
from torch_geometric.nn import GPSConv, GatedGraphConv, TransformerConv, GINEConv
from torch_geometric.data import Data
import torch.nn.functional as F
from torch_geometric.nn import global_mean_pool, global_add_pool

from torch_geometric.transforms import AddLaplacianEigenvectorPE
import torch_geometric

import torch.optim as optim

from torch_geometric.datasets import ZINC
from torch_geometric.loader import DataLoader

import torch.optim as optim
from torch_geometric.data import DataLoader
from sklearn.metrics import mean_squared_error

from torch_geometric.datasets import MNISTSuperpixels

# MODEL

In [13]:
class MLPBlock(nn.Module):
    def __init__(self, in_channels, hidden_channels):
        super(MLPBlock, self).__init__()
        self.fc1 = nn.Linear(in_channels, hidden_channels)
        self.fc2 = nn.Linear(hidden_channels, hidden_channels)  # This should output 'hidden_channels'

    def forward(self, x):
        x = x.float()  # Ensure the input is float before passing it to the linear layer
        x = F.relu(self.fc1(x))  # Apply ReLU activation after the first linear layer
        x = self.fc2(x)  # The second layer keeps the number of features as hidden_channels
        return x


class GraphGPSModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, pe_dim):
        super(GraphGPSModel, self).__init__()

        # MLP layers for each GPSConv layer
        self.mlp1 = MLPBlock(input_dim + pe_dim, hidden_dim)

        # GPSConv layers
        self.gps1 = GPSConv(hidden_dim, conv=GatedGraphConv(hidden_dim, 2), heads=4, attn_kwargs={'dropout':0.5})
        self.gps2 = GPSConv(hidden_dim, conv=GatedGraphConv(hidden_dim, 2), heads=4, attn_kwargs={'dropout':0.5})
        self.gps3 = GPSConv(hidden_dim, conv=GatedGraphConv(hidden_dim, 2), heads=4, attn_kwargs={'dropout':0.5})

        # Final classifier layer
        self.fc = nn.Linear(hidden_dim, output_dim)

        # PE ####
        # MLP for PE
        self.mlp_pe = MLPBlock(pe_dim, pe_dim)
        #self.fc_pe = nn.Linear(pe_dim, pe_dim)

        #######

        self.softmax = nn.Softmax(dim=1)

    def forward(self, data):
        x, edge_index, batch, pe, edge_attr = data.x, data.edge_index, data.batch, data.laplacian_eigenvector_pe, data.edge_attr

        ##### this needs to change
        # PE linear layer
        pe = self.mlp_pe(pe)
        #pe = self.fc_pe(pe)
        #####

        # Concatenate PE to x (along the feature dimension, axis=1)
        x = torch.cat([x, pe], dim=1)

        # Pass through MLP blocks before GPSConv layers
        x = self.mlp1(x)

        # Pass through gps layers
        x = self.gps1(x, edge_index, batch=batch)
        x = self.gps2(x, edge_index, batch=batch)
        x = self.gps3(x, edge_index, batch=batch)

        # Global pooling
        x = global_mean_pool(x, batch)

        # Final classification layer
        x = self.fc(x)
        return self.softmax(x)



#Load MNIST and add PE

In [14]:
# PE
transform_le = AddLaplacianEigenvectorPE(k=8)

# Load the MNISTSuperpixels dataset
root_dir = './data/MNISTSuperpixels'
train_dataset = MNISTSuperpixels(root=root_dir, train=True, transform=transform_le)
test_dataset = MNISTSuperpixels(root=root_dir, train=False, transform=transform_le)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)


In [15]:

# Example of a data sample from the training set
data = train_dataset[0]
print(f'Training Node feature shape: {data.x.shape}, Edge index shape: {data.edge_index.shape}')

# Example of a data sample from the testing set
test_data = test_dataset[0]
print(f'Testing Node feature shape: {test_data.x.shape}, Edge index shape: {test_data.edge_index.shape}')


Training Node feature shape: torch.Size([75, 1]), Edge index shape: torch.Size([2, 1399])
Testing Node feature shape: torch.Size([75, 1]), Edge index shape: torch.Size([2, 1405])


In [16]:
data

Data(x=[75, 1], edge_index=[2, 1399], y=[1], pos=[75, 2], laplacian_eigenvector_pe=[75, 8])

#Training

In [17]:
# Training loop
def train():
    model.train()
    total_loss = 0
    for data in train_loader:
        optimizer.zero_grad()

        # Move data to the same device as the model
        data = data.to(device)

        # Forward pass
        output = model(data)

        # Get the target values
        y = data.y.view(-1, 1).to(device)
        y = y.squeeze()
        num_classes = output.size(1)
        y_one_hot = torch.zeros(y.size(0), num_classes, device='cuda')
        y_one_hot.scatter_(1, y.unsqueeze(1), 1)

        # Compute the loss
        loss = criterion(output, y_one_hot)
        loss.backward()

        # Optimization step
        optimizer.step()

        total_loss += loss.item()


    return total_loss / len(train_loader)


# Define a function to evaluate the model on a given dataset
def evaluate(loader):
    model.eval()
    total_loss = 0
    with torch.no_grad():  # Disable gradient computation for evaluation
        for data in loader:
            data = data.to(device)
            output = model(data)
            y = data.y.view(-1, 1).to(device)
            y = y.squeeze()
            num_classes = output.size(1)
            y_one_hot = torch.zeros(y.size(0), num_classes, device='cuda')
            y_one_hot.scatter_(1, y.unsqueeze(1), 1)
            loss = criterion(output, y_one_hot)
            total_loss += loss.item()
    return total_loss / len(loader)


In [18]:
# num_layers = 3
input_dim = train_dataset.num_features
hidden_dim = 52
output_dim = 10
pe_dim = 8

weight_decay = 1e-5
lr = 0.001
epochs_num = 10

In [19]:
# Define the model

model = GraphGPSModel(input_dim=input_dim, hidden_dim=hidden_dim,  output_dim=output_dim, pe_dim=pe_dim)
optimizer = optim.Adam(model.parameters(), lr=lr)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Loss function
criterion = nn.CrossEntropyLoss()

# Initialize variables to track the best model
best_val_loss = float('inf')
best_model = None

# Training the model for epochs_num:
for epoch in range(epochs_num):
    # Train the model for one epoch
    train_loss = train()

    # Evaluate the model on the validation set
    val_loss = evaluate(test_loader)

    print(f'Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}')

    # Check if this is the best validation loss we've seen
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        # Save a copy of the best model
        best_model = model.state_dict()  # No need for deepcopy
        print(f'New best model saved at epoch {epoch+1} with Validation Loss: {val_loss:.4f}')

# After training, you can save the best model to disk
torch.save(best_model, 'best_model_MNIST.pth')
print("Best model saved to 'best_model_MNIST.pth'.")

Epoch 1, Train Loss: 1.9715, Validation Loss: 2.1579
New best model saved at epoch 1 with Validation Loss: 2.1579
Epoch 2, Train Loss: 1.8282, Validation Loss: 1.8142
New best model saved at epoch 2 with Validation Loss: 1.8142
Epoch 3, Train Loss: 1.7943, Validation Loss: 1.7784
New best model saved at epoch 3 with Validation Loss: 1.7784
Epoch 4, Train Loss: 1.7893, Validation Loss: 1.7429
New best model saved at epoch 4 with Validation Loss: 1.7429
Epoch 5, Train Loss: 1.7807, Validation Loss: 1.8884
Epoch 6, Train Loss: 1.7823, Validation Loss: 1.7612
Epoch 7, Train Loss: 1.7874, Validation Loss: 1.8358
Epoch 8, Train Loss: 1.7870, Validation Loss: 1.7877
Epoch 9, Train Loss: 1.8075, Validation Loss: 1.7343
New best model saved at epoch 9 with Validation Loss: 1.7343
Epoch 10, Train Loss: 1.7879, Validation Loss: 1.9760
Best model saved to 'best_model_MNIST.pth'.


In [20]:
def compute_accuracy(loader, model):
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for data in loader:
            data = data.to(device)

            # Forward pass
            output = model(data)

            # Get the predicted class (index with max probability)
            _, predicted = torch.max(output, 1)

            # Get the true labels
            y = data.y.view(-1).to(device)

            # Update correct predictions and total samples
            correct += (predicted == y).sum().item()
            total += y.size(0)

    # Compute accuracy
    accuracy = 100 * correct / total
    return accuracy

# Load the best model's state dictionary
model.load_state_dict(best_model)
model.to(device)  # Ensure the model is on the correct device (GPU or CPU)


# Compute and print the test accuracy after training
test_accuracy = compute_accuracy(test_loader, model)
print(f'Final Test Accuracy: {test_accuracy:.2f}%')

Final Test Accuracy: 48.23%
