In [None]:
import numpy as np
import torch
import random
import os

# Single seed shared by every random number generator seeded below.
seed_value = 3407

# Export the seed for Python hash seeding (environment values must be strings).
os.environ['PYTHONHASHSEED'] = f"{seed_value}"

# Python's and NumPy's global generators.
random.seed(seed_value)
np.random.seed(seed_value)

# PyTorch: the default generator, then the CUDA generator.
torch.manual_seed(seed_value)
torch.cuda.manual_seed(seed_value)
# Seeding every visible GPU is left disabled:
# torch.cuda.manual_seed_all(seed_value)

# Ask cuDNN to use deterministic kernels.
torch.backends.cudnn.deterministic = True

In [None]:
from torch_geometric.data import InMemoryDataset, Data

class GraphDataset(InMemoryDataset):
    """In-memory dataset built from an already-constructed list of graphs.

    Nothing is downloaded or processed from disk: ``download`` and
    ``process`` are deliberate no-ops, and the supplied list is collated
    into ``self.data`` / ``self.slices`` right after the parent
    constructor returns.

    Args:
        data_list: The graph objects to wrap.
        root: Root directory handed to ``InMemoryDataset``.
        transform: Optional transform forwarded to the parent class.
        pre_transform: Optional pre-transform forwarded to the parent class.
    """

    def __init__(self, data_list, root, transform=None, pre_transform=None):
        # Keep the raw list on the instance before the parent constructor runs;
        # __repr__ reads its length later.
        self.data_list = data_list
        super().__init__(root, transform, pre_transform)
        collated, slice_table = self.collate(data_list)
        self.data, self.slices = collated, slice_table

    @property
    def raw_file_names(self):
        """No raw files are expected."""
        return []

    @property
    def processed_file_names(self):
        """Name of the processed file reported to the parent class."""
        return ['data.pt']

    def download(self):
        """Intentionally empty: the data is supplied in memory."""
        pass

    def process(self):
        """Intentionally empty: collation happens in ``__init__``."""
        pass

    def __repr__(self):
        # Rendered as "<ClassName>(<number of graphs in data_list>)".
        return f"{self.__class__.__name__}({len(self.data_list)})"

# NOTE(review): torch.load unpickles arbitrary Python objects, so this file
# must come from a trusted source.
data_list = torch.load('../Data/data_list.pt')
dataset_root = './'
dataset = GraphDataset(data_list, root=dataset_root)
print(dataset)

# Fetch the first graph once instead of re-indexing the dataset for every
# field that is printed.
first_sample = dataset[0]
print(first_sample)
print(first_sample.x)
print(first_sample.edge_index)
print(first_sample.y)
print(first_sample.name)

# Gather the labels in a single pass over the dataset, then count each class
# from that list (previously the whole dataset was traversed twice).
sample_labels = [sample['y'] for sample in dataset]
positive_samples = sum(1 for label in sample_labels if label == 1)
negative_samples = sum(1 for label in sample_labels if label == 0)

print(f"positive_samples_num: {positive_samples}")
print(f"negative_samples_num: {negative_samples}")

In [None]:
import torch
from torch_geometric.loader import DataLoader

total_size = len(dataset)

# 60 % train and 20 % test; validation takes whatever remains so that the
# three sizes always add up to total_size despite the int() truncation.
train_size = int(total_size * 0.6)
test_size = int(total_size * 0.2)
val_size = total_size - train_size - test_size

# Use a dedicated, explicitly seeded generator for the split. Without it the
# partition is drawn from the global RNG, so re-running this cell (or running
# cells out of order) yields a different split and can move test graphs into
# the training set of an already-trained model.
split_generator = torch.Generator().manual_seed(seed_value)
train_dataset, test_dataset, val_dataset = torch.utils.data.random_split(
    dataset,
    [train_size, test_size, val_size],
    generator=split_generator,
)

print(f'Number of training samples: {len(train_dataset)}')
print(f'Number of test samples: {len(test_dataset)}')
print(f'Number of validation samples: {len(val_dataset)}')

# Only the training loader shuffles; evaluation loaders keep a fixed order.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

In [6]:
import torch
import torch.nn as nn
from torch_geometric.nn import global_mean_pool, BatchNorm, global_max_pool

class GlobalMeanPool(nn.Module):
    """Module wrapper around ``global_mean_pool`` so it can be held as a sub-module."""

    def __init__(self):
        super().__init__()

    def forward(self, x, batch):
        """Return ``global_mean_pool(x, batch)``."""
        return global_mean_pool(x, batch)


class GraphSELayer(nn.Module):
    """Squeeze-and-excitation style channel gating for batched node features.

    Node features are mean-pooled per graph, passed through a two-layer
    bottleneck ending in a sigmoid, and the resulting per-graph channel
    weights are multiplied back onto every node of that graph.

    Args:
        node_channels: Number of feature channels per node.
        reduction: Divisor used to size the bottleneck layer.
    """

    def __init__(self, node_channels, reduction=16):
        super().__init__()
        self.node_channels = node_channels
        self.reduction = reduction
        self.global_pool = GlobalMeanPool()
        # Floor division gives 0 when node_channels < reduction, which would
        # create a zero-width bottleneck and a constant 0.5 gate. Keep at
        # least one hidden unit so the gate stays learnable.
        bottleneck = max(1, node_channels // reduction)
        self.fc = nn.Sequential(
            nn.Linear(node_channels, bottleneck, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(bottleneck, node_channels, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x, batch):
        """Scale each node's channels by the gate computed for its graph.

        Args:
            x: Node feature matrix; assumed to be (num_nodes, node_channels).
            batch: Graph index of every node, or ``None`` to treat all nodes
                as belonging to a single graph.

        Returns:
            Tensor shaped like ``x`` with the channel gates applied.
        """
        if batch is None:
            # Single-graph input: assign every node to graph 0.
            batch = x.new_zeros(x.size(0), dtype=torch.long)
        mean_pool = self.global_pool(x, batch)
        scale = self.fc(mean_pool)
        # Look up each node's gate by its graph id. Unlike repeating rows by
        # per-graph node counts, this stays correct when `batch` is not sorted.
        return x * scale[batch]
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GraphConv, Linear, global_mean_pool, GCNConv, TopKPooling, global_max_pool

class SecProGNN(torch.nn.Module):
    """Graph classifier: three gated GraphConv stages, mean pooling, MLP head.

    Each stage is ``GraphConv -> ReLU -> GraphSELayer``. Stage widths are
    ``hidden_channels``, ``2 * hidden_channels`` and ``4 * hidden_channels``.
    Node features are then mean-pooled per graph and fed to a fully
    connected head (1024 -> 1024 -> 256 -> ``num_classes``) with dropout 0.2
    after each hidden layer.

    Args:
        input_features: Number of input features per node.
        hidden_channels: Width of the first convolution stage.
        num_classes: Number of outputs produced per graph.
    """

    def __init__(self, input_features, hidden_channels, num_classes):
        super().__init__()
        widths = (hidden_channels, hidden_channels * 2, hidden_channels * 4)

        # Sub-modules are created in the same order as before so that seeded
        # parameter initialisation is unchanged.
        self.conv1 = GraphConv(input_features, widths[0])
        self.se1 = GraphSELayer(widths[0])

        self.conv2 = GraphConv(widths[0], widths[1])
        self.se2 = GraphSELayer(widths[1])

        self.conv3 = GraphConv(widths[1], widths[2])
        self.se3 = GraphSELayer(widths[2])

        # Hidden head layers share the pattern Linear -> ReLU -> Dropout(0.2).
        head = []
        for fan_in, fan_out in ((widths[2], 1024), (1024, 1024), (1024, 256)):
            head += [nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(0.2)]
        head.append(nn.Linear(256, num_classes))
        self.classifier = nn.Sequential(*head)

    def forward(self, x, edge_index, batch):
        """Return one row of ``num_classes`` scores per graph in the batch."""
        stages = (
            (self.conv1, self.se1),
            (self.conv2, self.se2),
            (self.conv3, self.se3),
        )
        for conv, gate in stages:
            # Convolution, non-linearity, then channel gating.
            x = gate(F.relu(conv(x, edge_index)), batch)

        # Collapse node features to a single vector per graph.
        pooled = global_mean_pool(x, batch)
        return self.classifier(pooled)

In [7]:
from sklearn.metrics import accuracy_score, matthews_corrcoef, roc_auc_score, precision_score, recall_score, f1_score
import torch
import numpy as np
def train():
    """Run one optimisation pass over ``train_loader``.

    Uses the module-level ``model``, ``optimizer``, ``criterion`` and
    ``device``. Returns nothing.
    """
    model.train()

    for batch in train_loader:
        batch = batch.to(device)
        optimizer.zero_grad()

        # Forward pass, loss against the graph labels, then one update step.
        logits = model(batch.x, batch.edge_index, batch.batch)
        criterion(logits, batch.y).backward()
        optimizer.step()
        
def evaluate(loader):
    """Compute classification metrics for the module-level ``model`` on ``loader``.

    Column 1 of the model output is treated as the positive class, so this
    is a binary evaluation; precision, recall and F1 use scikit-learn's
    default settings.

    Args:
        loader: Iterable of batches exposing ``x``, ``edge_index``, ``batch``
            and ``y``.

    Returns:
        dict with the keys 'Accuracy', 'MCC', 'AUC', 'Precision', 'Recall'
        and 'F1'.
    """
    model.eval()
    y_true = []
    y_pred = []
    y_score = []

    with torch.no_grad():
        for data in loader:
            data = data.to(device)
            out = model(data.x, data.edge_index, data.batch)

            y_true.extend(data.y.cpu().numpy())
            y_pred.extend(out.argmax(dim=1).cpu().numpy())

            # Rank samples for AUC by the softmax probability of class 1.
            # The raw class-1 logit is not a valid score here: the decision
            # depends on the difference between the two logits, so ordering
            # by out[:, 1] alone can disagree with the classifier.
            positive_prob = torch.softmax(out, dim=1)[:, 1]
            y_score.extend(positive_prob.cpu().numpy())

    accuracy = accuracy_score(y_true, y_pred)
    mcc = matthews_corrcoef(y_true, y_pred)
    auc = roc_auc_score(y_true, y_score)
    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)

    metrics = {
        'Accuracy': accuracy,
        'MCC': mcc,
        'AUC': auc,
        'Precision': precision,
        'Recall': recall,
        'F1': f1,
    }

    return metrics


In [None]:
import torch
# Prefer the GPU when one is available, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# The first hidden width is set equal to the per-node input feature count.
model = SecProGNN(
    input_features=dataset.num_node_features,
    hidden_channels=dataset.num_node_features,
    num_classes=dataset.num_classes,
)
model = model.to(device)

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Early-stopping bookkeeping: stop after `patience` consecutive epochs
# without a new best validation accuracy.
patience = 20
patience_counter = 0
best_val_acc = 0.0

def print_divider(title=None):
    """Print a blank line followed by a rule of '=' characters.

    With a non-empty ``title`` the title is framed by ten '=' on each side;
    otherwise a plain rule of thirty '=' is printed.
    """
    if not title:
        print("\n" + "=" * 30)
        return
    bar = "=" * 10
    print(f"\n{bar} {title} {bar}")

def print_metrics(metrics, dataset_name):
    """Print every metric in ``metrics`` under a "<dataset_name> Performance" header.

    Each line shows the metric name padded to 15 characters and the value
    rounded to four decimals; a plain divider closes the table.
    """
    header = f"{dataset_name} Performance"
    print_divider(header)
    for label, score in metrics.items():
        print("{:15}: {:.4f}".format(label, score))
    print_divider()

# Train for at most 500 epochs. After each epoch, report metrics on the
# training and validation splits and checkpoint the model whenever the
# validation accuracy reaches a new best.
for epoch in range(1, 501):
    train()
    print_divider(f"Epoch: {epoch}")

    train_metrics = evaluate(train_loader)
    print_metrics(train_metrics, "Training")

    val_metrics = evaluate(val_loader)
    print_metrics(val_metrics, "Validation")
    val_acc = val_metrics['Accuracy']

    if not (val_acc > best_val_acc):
        # No improvement this epoch: count it towards early stopping.
        patience_counter += 1
        print(f"Patience Counter: {patience_counter}/{patience}")
    else:
        # New best: reset the counter and save the weights.
        best_val_acc = val_acc
        patience_counter = 0
        torch.save(model.state_dict(), 'best_model.pth')
        print(f"🌟 Saved best model with Validation Accuracy: {val_acc:.4f}")

    # Stop once `patience` epochs in a row have failed to improve.
    if patience_counter >= patience:
        print_divider("Early Stopping Triggered")
        break