In [None]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from torch_geometric.data import Data
from tqdm import tqdm
import networkx as nx
from torch_geometric.utils import to_networkx
from sklearn.metrics import r2_score, mean_squared_error
from torch_geometric.loader import DataLoader
from sklearn.model_selection import train_test_split
from torch_geometric.nn import GCNConv, MessagePassing  # Import GCNConv
from torch_geometric.utils import add_self_loops

# 2. Model Definition
class GatedFusion(nn.Module):
    """Gate flattened CNN features with weights derived from a scalar input.

    A small MLP maps the per-sample scalar ``mfrac`` to one weight per CNN
    feature. The weights are viewed as ``(channels_per_group, spatial_groups)``,
    averaged over the first of those axes and broadcast back, so every feature
    that shares a position inside a group receives the same weight. The gated
    features are concatenated with ``mfrac`` scaled by a learnable factor.

    Args:
        cnn_dim: Width of the flattened CNN feature vector. The gate MLP now
            produces exactly this many weights (it was previously fixed at
            800, which only matched one specific CNN output size).
        spatial_groups: Size of the trailing axis used for weight sharing.
            Defaults to 25, the value that was previously hard-coded.

    Raises:
        ValueError: If ``cnn_dim`` is not a positive multiple of
            ``spatial_groups``; the grouping ``view`` in ``forward`` would
            otherwise fail or silently regroup across batch rows.
    """

    def __init__(self, cnn_dim, spatial_groups=25):
        super().__init__()
        if spatial_groups <= 0 or cnn_dim <= 0 or cnn_dim % spatial_groups != 0:
            raise ValueError(
                f"cnn_dim ({cnn_dim}) must be a positive multiple of "
                f"spatial_groups ({spatial_groups})"
            )
        self.gate = nn.Sequential(
            nn.Linear(1, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            # One gate weight per CNN feature.
            nn.Linear(128, cnn_dim),
            # Tanh keeps the raw gate in [-1, 1] (it may flip a feature's sign).
            nn.Tanh()
        )
        # Learnable multiplier applied to mfrac before concatenation.
        self.mfrac_boost = nn.Parameter(torch.tensor(3.0))
        self.spatial_groups = spatial_groups
        self.channels_per_group = cnn_dim // self.spatial_groups

    def forward(self, cnn_feat, mfrac):
        """Return ``(fused_features, gate_weights)``.

        Args:
            cnn_feat: Tensor of shape ``(batch, cnn_dim)``.
            mfrac: Tensor of shape ``(batch, 1)``.

        Returns:
            A tuple of the fused tensor ``(batch, cnn_dim + 1)`` and the gate
            weights ``(batch, cnn_dim)`` that were applied (values in [-2, 2]).
        """
        # Generate raw gate weights from mfrac.
        gate_weights = self.gate(mfrac)
        # Share one weight across the channel axis for every group position.
        grouped_weights = gate_weights.view(
            -1, self.channels_per_group, self.spatial_groups
        )
        spatial_weights = grouped_weights.mean(dim=1, keepdim=True)
        expanded_weights = spatial_weights.expand(-1, self.channels_per_group, -1)
        gate_weights = expanded_weights.reshape_as(gate_weights)
        # Amplify gate weights (range becomes [-2, 2]).
        gate_weights = gate_weights * 2.0
        # Boost the mfrac value with the learnable factor.
        boosted_mfrac = mfrac * self.mfrac_boost
        # Concatenate gated features with the boosted mfrac column.
        return torch.cat([cnn_feat * gate_weights, boosted_mfrac], dim=1), gate_weights

class Optimized_CNN_GCN(nn.Module):
    """Per-node CNN encoder, gated fusion with a scalar, then a two-layer GCN.

    Each node's feature row is expected to hold a flattened image of size
    ``input_dim`` followed by one trailing scalar (called ``mfrac`` here).
    The image goes through a small CNN, the flattened CNN features are gated
    by the fusion module using the scalar, and the result is propagated
    through two ``GCNConv`` layers before a linear head produces one value
    per node.

    Args:
        input_dim: ``(height, width)`` of the per-node image. ``forward`` now
            reshapes with this value instead of a hard-coded 21x21, so it
            stays consistent with the size used to dimension the network.
            Non-default sizes additionally require the fusion module to
            accept the resulting CNN feature width.
        filters, kernel_size, dense_units: Accepted for backward
            compatibility but not used; the CNN is fixed at 16 and 32
            channels with 3x3 kernels.
        dropout_rate: Dropout probability applied between the GCN layers.
        gcn_hidden_dim: Width of both GCN layers.
        learning_rate: Stored on the instance only; the model itself does
            not use it.
    """

    def __init__(self, input_dim=(21, 21), filters=32, kernel_size=3,
                 dense_units=64, dropout_rate=0, gcn_hidden_dim=64,
                 learning_rate=0.001):
        super(Optimized_CNN_GCN, self).__init__()
        # Remember the image size so forward() can reshape consistently.
        self.input_dim = tuple(input_dim)

        # CNN layers for feature extraction
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )

        # Calculate CNN output dimension dynamically by probing with one
        # dummy image of the configured size.
        with torch.no_grad():
            dummy_input = torch.randn(1, 1, *self.input_dim)
            dummy_output = self.cnn(dummy_input)
            self.cnn_output_dim = dummy_output.view(-1).shape[0]

        # Gated fusion module
        self.fusion = GatedFusion(self.cnn_output_dim)
        # GCN layers (+1 for the scalar column appended by the fusion step)
        self.conv1 = GCNConv(self.cnn_output_dim + 1, gcn_hidden_dim)
        self.conv2 = GCNConv(gcn_hidden_dim, gcn_hidden_dim)
        self.dropout = nn.Dropout(dropout_rate)
        # Output layer
        self.linear = nn.Linear(gcn_hidden_dim, 1)
        self.learning_rate = learning_rate

        # Attributes kept for visualization code.
        # gate_weights is overwritten with the real fusion gates on every
        # forward pass; the *_attention / attn_weights tensors are random
        # placeholders and are not produced by any attention mechanism.
        self.gate_weights = None
        self.attn_weights = None
        self.conv1_attention = torch.randn(4, 1)  # Random data for visualization
        self.conv2_attention = torch.randn(4, 1)  # Random data for visualization

    def forward(self, data):
        """Return one prediction per node, shape ``(num_nodes, 1)``.

        Args:
            data: Object exposing ``x`` (node features whose last column is
                the scalar and whose remaining columns are the flattened
                image), ``num_nodes`` and ``edge_index``.
        """
        x_all = data.x
        num_nodes = data.num_nodes
        height, width = self.input_dim

        # Split input into CNN features and mfrac. reshape (not view) is used
        # because the column slice is not contiguous in memory.
        x_cnn = x_all[:, :-1].reshape(num_nodes, 1, height, width)
        x_mfrac = x_all[:, -1].unsqueeze(1)

        # Process with CNN
        x_cnn = self.cnn(x_cnn)
        x_cnn = x_cnn.view(num_nodes, -1)

        # Apply gated fusion
        x, gate_weights = self.fusion(x_cnn, x_mfrac)
        self.gate_weights = gate_weights
        self.attn_weights = torch.randn(num_nodes, 1)  # Random data for visualization

        # Apply GCN layers
        edge_index = data.edge_index
        x = F.relu(self.conv1(x, edge_index))
        x = self.dropout(x)
        x = F.relu(self.conv2(x, edge_index))

        # Output layer (no final activation: raw regression output)
        x = self.linear(x)
        return x
    
# Smoke test: push a single graph through a default-configured model and show
# the result together with the module layout.
# NOTE: `graphs` must already exist in the kernel (it is not created in this
# cell).
sample_data = graphs[0]  # first sample only
model = Optimized_CNN_GCN()
output = model(sample_data)

# Header, output shape and raw output, one per line.
print("\nModel test output:", output.shape, output, sep="\n")

# Header followed by the module tree.
print("\nModel structure:", model, sep="\n")

In [None]:
from torch_geometric.data import DataLoader
from sklearn.metrics import r2_score
from copy import deepcopy
import torch
import numpy as np
import matplotlib.pyplot as plt
import torch.nn as nn

# Set random seed for reproducibility
torch.manual_seed(45)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(45)
np.random.seed(45)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Prepare data loaders
# Shuffle through an index permutation instead of np.random.shuffle(graphs):
# shuffling the shared list in place made this cell non-idempotent, because
# re-running it reshuffled already-shuffled data and produced a different
# 70/30 split even with the seed fixed above. `graphs` itself is left
# untouched; use train_dataset / test_dataset to refer to the split.
shuffled_order = np.random.permutation(len(graphs))
shuffled_graphs = [graphs[int(i)] for i in shuffled_order]
split_index = int(0.7 * len(graphs))
train_dataset = shuffled_graphs[:split_index]
test_dataset = shuffled_graphs[split_index:]

# NOTE(review): this cell imports DataLoader from torch_geometric.data, which
# PyG documents as deprecated in favour of torch_geometric.loader.DataLoader.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Build the model on the best available device.
# NOTE(review): as Optimized_CNN_GCN is defined in this notebook, its CNN is
# fixed and it does not read filters / kernel_size / dense_units, so those
# three values have no effect on the network that is built here.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_config = dict(
    filters=32,
    kernel_size=5,
    dense_units=128,
    dropout_rate=0.2,
    gcn_hidden_dim=32,
    learning_rate=0.0015,
)
model = Optimized_CNN_GCN(**model_config).to(device)

# Loss and optimizer (the optimizer's step size is set here, not by the model).
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0015)

# Schedule: maximum epochs, early-stopping patience, and the minimum decrease
# in loss that counts as an improvement.
epochs, patience, min_delta = 200, 30, 0.00001

# Per-epoch history.
train_losses, test_losses = [], []
train_r2_scores, test_r2_scores = [], []

# Early-stopping state: best loss seen so far, a snapshot of the weights that
# achieved it, and the number of consecutive epochs without improvement.
best_test_loss, best_model_state = float('inf'), None
early_stop_counter = 0

def _evaluate(model, loader, device, criterion=None):
    """Run one gradient-free pass over ``loader`` in eval mode.

    Returns ``(mean_loss, preds, targets)``. ``mean_loss`` is the per-batch
    loss weighted by the number of graphs in each batch and divided by the
    dataset size, or ``None`` when no ``criterion`` is given. ``preds`` and
    ``targets`` are NumPy arrays concatenated over all batches.
    """
    model.eval()
    weighted_loss = 0.0
    preds, targets = [], []
    with torch.no_grad():
        for batch in loader:
            batch = batch.to(device)
            out = model(batch)
            if criterion is not None:
                weighted_loss += criterion(out, batch.y).item() * batch.num_graphs
            preds.append(out.cpu().numpy())
            targets.append(batch.y.cpu().numpy())
    mean_loss = weighted_loss / len(loader.dataset) if criterion is not None else None
    return mean_loss, np.concatenate(preds, axis=0), np.concatenate(targets, axis=0)


def _save_best_state(state, path):
    """Write the best weights to ``path``; refuse to save an empty snapshot."""
    if state is None:
        # Happens when no epoch ever improved on the initial "infinite" loss,
        # e.g. the loss was NaN throughout or zero epochs were run. Saving
        # None would only defer the failure to the later load_state_dict call.
        raise RuntimeError(
            "No improving epoch was recorded (non-finite loss or zero epochs); "
            "there is no best model state to save."
        )
    torch.save(state, path)


# Training loop
# NOTE(review): early stopping and best-model selection below use the same
# split that is reported as the test score, so the reported test metrics are
# optimistically biased; a separate validation split would avoid this.
for epoch in range(epochs):
    # Training phase
    model.train()
    train_loss = 0
    for data in train_loader:
        data = data.to(device)
        optimizer.zero_grad()
        out = model(data)
        loss = criterion(out, data.y)
        loss.backward()
        optimizer.step()
        # Weight the batch-mean loss by the number of graphs in the batch.
        train_loss += loss.item() * data.num_graphs
    train_loss /= len(train_loader.dataset)
    train_losses.append(train_loss)

    # Testing phase: loss and R² on the held-out split
    test_loss, test_preds, test_targets = _evaluate(model, test_loader, device, criterion)
    test_losses.append(test_loss)
    test_r2 = r2_score(test_targets, test_preds)
    test_r2_scores.append(test_r2)

    # Training set evaluation in eval mode (dropout off), R² only
    train_preds, train_targets = _evaluate(model, train_loader, device)[1:]
    train_r2 = r2_score(train_targets, train_preds)
    train_r2_scores.append(train_r2)

    # Early stopping check: only a decrease larger than min_delta counts
    if test_loss < best_test_loss - min_delta:
        best_test_loss = test_loss
        best_model_state = deepcopy(model.state_dict())
        early_stop_counter = 0
    else:
        early_stop_counter += 1

    # Print progress
    if (epoch + 1) % 5 == 0:
        print(f'Epoch {epoch + 1}/{epochs}: '
              f'Train Loss: {train_loss:.5f}, Test Loss: {test_loss:.5f}, '
              f'Train R2: {train_r2:.5f}, Test R2: {test_r2:.5f}')

    # Save model when early stopping is triggered
    if early_stop_counter >= patience:
        print(f'Early stopping at epoch {epoch + 1}')
        _save_best_state(best_model_state, 'best_model.pth')
        print("Best model saved as best_model.pth")
        break

# Save model when training completes normally
if early_stop_counter < patience:
    _save_best_state(best_model_state, 'best_model.pth')
    print("Training completed, best model saved as best_model.pth")

# Restore the best snapshot and switch to inference behaviour.
model.load_state_dict(best_model_state)
model.eval()

# Accumulators for both splits.
test_loss = 0
test_preds, test_targets = [], []
train_preds, train_targets = [], []

with torch.no_grad():
    # Held-out split: graph-weighted loss plus raw predictions and targets.
    for data in test_loader:
        data = data.to(device)
        out = model(data)
        loss = criterion(out, data.y)
        test_loss += loss.item() * data.num_graphs
        test_preds.append(out.cpu().numpy())
        test_targets.append(data.y.cpu().numpy())

    # Training split: predictions and targets only.
    for data in train_loader:
        data = data.to(device)
        out = model(data)
        train_preds.append(out.cpu().numpy())
        train_targets.append(data.y.cpu().numpy())

# Reduce the held-out results.
test_loss /= len(test_loader.dataset)
test_preds = np.concatenate(test_preds, axis=0)
test_targets = np.concatenate(test_targets, axis=0)
test_r2 = r2_score(test_targets, test_preds)

# Stack the training-split results.
train_preds = np.concatenate(train_preds, axis=0)
train_targets = np.concatenate(train_targets, axis=0)

# Define function to calculate metrics
def calculate_metrics(targets, preds):
    """Return ``(rmse, mae, r2)`` for a set of regression predictions.

    One-dimensional inputs are treated as a single output column, so a
    ``(N,)`` target array and a ``(N, 1)`` prediction array are compared
    element by element. Without this alignment ``targets - preds`` would
    silently broadcast to an ``(N, N)`` matrix and yield wrong RMSE/MAE.

    Args:
        targets: Ground-truth values, array-like of shape ``(N,)`` or ``(N, k)``.
        preds: Predicted values with the same number of elements per axis.

    Raises:
        ValueError: If the shapes still differ after aligning 1-D inputs.
    """
    targets = np.asarray(targets)
    preds = np.asarray(preds)
    # Promote flat vectors to a single column so both arrays line up.
    if targets.ndim == 1:
        targets = targets.reshape(-1, 1)
    if preds.ndim == 1:
        preds = preds.reshape(-1, 1)
    if targets.shape != preds.shape:
        raise ValueError(
            f"targets and preds have incompatible shapes: "
            f"{targets.shape} vs {preds.shape}"
        )

    errors = targets - preds
    mse = np.mean(errors ** 2)
    rmse = np.sqrt(mse)
    mae = np.mean(np.abs(errors))
    r2 = r2_score(targets, preds)
    return rmse, mae, r2

# Compute (RMSE, MAE, R²) once per split, then expose the individual values
# under their usual names.
train_metrics = calculate_metrics(train_targets, train_preds)
test_metrics = calculate_metrics(test_targets, test_preds)
train_rmse, train_mae, train_r2 = train_metrics
test_rmse, test_mae, test_r2 = test_metrics

# Summary table, four decimals per metric.
print('\n=== Final Results ===')
print('Training Set - RMSE: {:.4f}, MAE: {:.4f}, R²: {:.4f}'.format(*train_metrics))
print('Test Set - RMSE: {:.4f}, MAE: {:.4f}, R²: {:.4f}'.format(*test_metrics))