# Load and Prepare Data
Load training, validation, and test data from the data folder. Convert graph structures into suitable matrix representations for training.

In [2]:
import pickle
import numpy as np
from sklearn.preprocessing import OneHotEncoder

# Load data
with open('data/train_graphs.dat', 'rb') as file:
    train_graphs = pickle.load(file)
with open('data/val_graphs.dat', 'rb') as file:
    val_graphs = pickle.load(file)
with open('data/test_graphs.dat', 'rb') as file:
    test_graphs = pickle.load(file)

# Combine all graphs for dimension analysis
all_graphs = []
all_graphs.extend(train_graphs)
all_graphs.extend(val_graphs)
all_graphs.extend(test_graphs)

def get_max_dimensions(graphs):
    max_parts = max(len(graph.get_parts()) for graph in graphs)
    max_part_id = max(int(part.get_part_id()) for graph in graphs for part in graph.get_parts())
    max_family_id = max(int(part.get_family_id()) for graph in graphs for part in graph.get_parts())
    return max_parts, max_part_id, max_family_id

max_parts, max_part_id, max_family_id = get_max_dimensions(all_graphs)

# Initialize encoders with explicit categories
part_categories = np.arange(max_part_id + 1)
family_categories = np.arange(max_family_id + 1)

part_encoder = OneHotEncoder(categories=[part_categories], sparse_output=False, handle_unknown='ignore')
family_encoder = OneHotEncoder(categories=[family_categories], sparse_output=False, handle_unknown='ignore')

# Fit encoders
part_encoder.fit(part_categories.reshape(-1, 1))
family_encoder.fit(family_categories.reshape(-1, 1))

def process_dataset(graphs):
    features_list = []
    adj_matrices = []
    
    for graph in graphs:
        parts = list(graph.get_parts())
        n_parts = len(parts)
        
        # Convert IDs to int explicitly
        part_ids = np.array([int(part.get_part_id()) for part in parts]).reshape(-1, 1)
        family_ids = np.array([int(part.get_family_id()) for part in parts]).reshape(-1, 1)
        
        part_encoded = part_encoder.transform(part_ids)
        family_encoded = family_encoder.transform(family_ids)
        
        features = np.hstack([part_encoded, family_encoded])
        
        if n_parts < max_parts:
            padding = np.zeros((max_parts - n_parts, features.shape[1]))
            features = np.vstack([features, padding])
            
        adj_matrix = graph.get_adjacency_matrix(tuple(parts))
        if n_parts < max_parts:
            adj_matrix = np.pad(adj_matrix, ((0, max_parts - n_parts), (0, max_parts - n_parts)))
            
        features_list.append(features)
        adj_matrices.append(adj_matrix)
    
    return np.array(features_list), np.array(adj_matrices)

# Process datasets
X_train, y_train = process_dataset(train_graphs)
X_val, y_val = process_dataset(val_graphs)
X_test, y_test = process_dataset(test_graphs)

print(f"Training shapes: X={X_train.shape}, y={y_train.shape}")
print(f"Validation shapes: X={X_val.shape}, y={y_val.shape}")
print(f"Test shapes: X={X_test.shape}, y={y_test.shape}")

Training shapes: X=(6695, 21, 2367), y=(6695, 21, 21)
Validation shapes: X=(2231, 21, 2367), y=(2231, 21, 21)
Test shapes: X=(2233, 21, 2367), y=(2233, 21, 21)


# Build Neural Network Model
Create a feed-forward neural network using PyTorch with appropriate input size (based on one-hot encoding), hidden layers, and output size (adjacency matrix prediction).

In [4]:
# Prepare data dimensions
batch_size = 32
num_features = X_train.shape[1]  # Features per part
num_parts = max_parts  # Maximum number of parts
adj_matrix_size = num_parts * num_parts  # Size of flattened adjacency matrix

# Model definition
class FFN(nn.Module):
    def __init__(self, num_features, num_parts, hidden_size=512):
        super(FFN, self).__init__()
        self.num_parts = num_parts
        self.flatten = nn.Flatten(1)  # Flatten everything except batch dimension
        
        input_size = num_features * num_parts
        self.layers = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, num_parts * num_parts),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = self.flatten(x)
        return self.layers(x)

# Convert data to tensors
X_train_tensor = torch.FloatTensor(X_train)
y_train_tensor = torch.FloatTensor(y_train.reshape(y_train.shape[0], -1))

# Create data loader
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Initialize model
model = FFN(num_features, num_parts)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

NameError: name 'torch' is not defined

# Train the Model
Implement training loop with batch processing, loss calculation, and optimization. Include validation step to monitor model performance.

In [None]:
# Train the Model

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Define the neural network model
class GraphPredictionModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(GraphPredictionModel, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        out = self.relu(out)
        out = self.fc3(out)
        out = self.sigmoid(out)
        return out

# Hyperparameters
input_size = X_train.shape[2]  # Number of features after one-hot encoding
hidden_size = 128
output_size = y_train.shape[1] * y_train.shape[2]  # Flattened adjacency matrix
num_epochs = 50
batch_size = 32
learning_rate = 0.001

# Convert data to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.reshape(y_train.shape[0], -1), dtype=torch.float32)
X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val.reshape(y_val.shape[0], -1), dtype=torch.float32)

# Create data loaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Initialize model, loss function, and optimizer
model = GraphPredictionModel(input_size, hidden_size, output_size)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    model.train()
    for X_batch, y_batch in train_loader:
        # Forward pass
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        
        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    # Validation
    model.eval()
    with torch.no_grad():
        val_loss = 0
        for X_batch, y_batch in val_loader:
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            val_loss += loss.item()
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Val Loss: {val_loss/len(val_loader):.4f}')

# Model Evaluation
Evaluate model performance using the provided edge_accuracy metric and additional relevant metrics.

In [None]:
# Model Evaluation

import torch

# Convert test data to PyTorch tensors
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test.reshape(y_test.shape[0], -1), dtype=torch.float32)

# Create test data loader
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Evaluation function
def evaluate_model(model, data_loader):
    model.eval()
    total_loss = 0
    correct_edges = 0
    total_edges = 0
    criterion = nn.BCELoss()
    
    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            total_loss += loss.item()
            
            # Calculate edge accuracy
            predicted_adj_matrix = outputs.round().reshape(y_batch.shape[0], y_train.shape[1], y_train.shape[2])
            target_adj_matrix = y_batch.reshape(y_batch.shape[0], y_train.shape[1], y_train.shape[2])
            correct_edges += (predicted_adj_matrix == target_adj_matrix).sum().item()
            total_edges += y_batch.numel()
    
    avg_loss = total_loss / len(data_loader)
    edge_accuracy = correct_edges / total_edges * 100
    return avg_loss, edge_accuracy

# Evaluate on test data
test_loss, test_edge_accuracy = evaluate_model(model, test_loader)
print(f'Test Loss: {test_loss:.4f}, Test Edge Accuracy: {test_edge_accuracy:.2f}%')

# Prediction and Graph Construction
Implement methods to convert model predictions back into Graph objects using the provided Graph class.

In [None]:
# Function to convert model predictions back into Graph objects
def predictions_to_graphs(predictions, parts_list):
    graphs = []
    for i, adj_matrix in enumerate(predictions):
        parts = parts_list[i]
        graph = Graph()
        for part in parts:
            graph.__add_node(Node(part.get_part_id(), part))
        for j, part1 in enumerate(parts):
            for k, part2 in enumerate(parts):
                if adj_matrix[j, k] == 1:
                    graph.add_undirected_edge(part1, part2)
        graphs.append(graph)
    return graphs

# Example usage
predicted_adj_matrices = model(X_test_tensor).round().detach().numpy().reshape(X_test.shape[0], y_test.shape[1], y_test.shape[2])
predicted_graphs = predictions_to_graphs(predicted_adj_matrices, [list(graph.get_parts()) for graph in test_graphs])

# Display the first predicted graph
predicted_graphs[0].draw()

# Save Model
Save the trained model following the MyPredictionModel interface for later use in evaluation.

In [None]:
import pickle

# Save the trained model
model_file_path = 'trained_graph_prediction_model.pkl'
with open(model_file_path, 'wb') as file:
    pickle.dump(model, file)

# Save the model class definition
class MyTrainedModel(MyPredictionModel):
    def __init__(self, model):
        self.model = model

    def predict_graph(self, parts: Set[Part]) -> Graph:
        parts_encoded, _ = graph_to_matrix(Graph(parts))
        parts_tensor = torch.tensor(parts_encoded, dtype=torch.float32).unsqueeze(0)
        adj_matrix_pred = self.model(parts_tensor).round().detach().numpy().reshape(parts_encoded.shape[0], parts_encoded.shape[0])
        return predictions_to_graphs([adj_matrix_pred], [list(parts)])[0]

# Save the model following the MyPredictionModel interface
trained_model = MyTrainedModel(model)
with open('my_trained_model.pkl', 'wb') as file:
    pickle.dump(trained_model, file)