In [6]:
import copy
import random

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.metrics import accuracy_score, roc_auc_score, confusion_matrix, matthews_corrcoef
from torch.utils.data import DataLoader, TensorDataset

# Function to set seed for reproducibility
def set_seed(seed=42):
    """Seed the Python, NumPy and PyTorch RNGs and configure cuDNN flags.

    Args:
        seed: Integer seed handed to every random number generator.
    """
    # Same seed for the three CPU-side generators, in a fixed order.
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)

    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

    # Ask cuDNN for deterministic kernels and disable its auto-tuner.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Fix the RNG seed before anything random happens
set_seed(630)

# Read each feature table from CSV and turn it directly into a float tensor
train_320 = torch.tensor(pd.read_csv('Cas300_esm.csv').to_numpy()).float()
test_320 = torch.tensor(pd.read_csv('Cas118_esm.csv').to_numpy()).float()

train_aatp = torch.tensor(pd.read_csv('train_aatp.csv').to_numpy()).float()
test_aatp = torch.tensor(pd.read_csv('test_aatp.csv').to_numpy()).float()

# Labels are stored as NumPy arrays on disk
y_train = np.load('y_train.npy')
y_test = np.load('y_test.npy')

# Each sample is (first feature set with an extra axis at dim 1, AATP features, int64 label)
train_dataset = TensorDataset(
    train_320.unsqueeze(1),
    train_aatp,
    torch.tensor(y_train).long(),
)
test_dataset = TensorDataset(
    test_320.unsqueeze(1),
    test_aatp,
    torch.tensor(y_test).long(),
)

# Only the training batches are shuffled
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=32)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=32)

class CombinedModel(nn.Module):
    """Two-branch classifier: a multi-kernel 1-D CNN branch plus an MLP branch.

    The first input goes through several parallel ``Conv1d`` layers (one per
    kernel size), each followed by ReLU and max-pooling; the flattened results
    are concatenated and reduced to 256 features. The second (AATP) input goes
    through two fully connected layers down to 64 features. Both branches are
    concatenated and classified into 2 logits.

    Args:
        conv_window_sizes: Kernel sizes of the parallel convolutions.
        aatp_input_size: Number of features in the AATP input (default 420).
        seq_input_size: Length of the last axis of the convolutional input
            (default 320, the only length the original code supported).
    """

    def __init__(self, conv_window_sizes=(9, 11, 13), aatp_input_size=420,
                 seq_input_size=320):
        super().__init__()
        # Materialise once so any iterable (list, tuple, generator) is accepted.
        conv_window_sizes = tuple(conv_window_sizes)
        conv_channels = 4

        # Convolutional part for the sequence-like features
        self.convs = nn.ModuleList([
            nn.Conv1d(in_channels=1, out_channels=conv_channels, kernel_size=w, padding=w // 2)
            for w in conv_window_sizes
        ])
        self.pool = nn.MaxPool1d(kernel_size=2)

        # Per-kernel flattened size: Conv1d output length is
        # L + 2 * padding - kernel + 1, then MaxPool1d(2) halves it (floor).
        conv_output_size = sum(
            conv_channels * ((seq_input_size + 2 * (w // 2) - w + 1) // 2)
            for w in conv_window_sizes
        )

        # Fully connected layer to reduce the dimension of CNN output
        self.fc_reduce = nn.Linear(conv_output_size, 256)

        # Fully connected layers for the AATP features
        self.fc_aatp1 = nn.Linear(aatp_input_size, 128)
        self.fc_aatp2 = nn.Linear(128, 64)

        # Final fully connected layers after concatenation
        self.fc1 = nn.Linear(256 + 64, 128)
        self.fc2 = nn.Linear(128, 2)

        self.relu = nn.ReLU()

    def forward(self, x_320, x_aatp):
        """Return class logits of shape ``(batch_size, 2)``.

        Args:
            x_320: Tensor of shape ``(batch_size, 1, seq_input_size)``.
            x_aatp: Tensor of shape ``(batch_size, aatp_input_size)``.
        """
        # One flattened feature map per kernel size, concatenated along features
        conv_outputs = [
            self.pool(self.relu(conv(x_320))).flatten(start_dim=1)
            for conv in self.convs
        ]
        x_conv = torch.cat(conv_outputs, dim=1)

        # Reduce the dimension of CNN output -> (batch_size, 256)
        x_conv = self.relu(self.fc_reduce(x_conv))

        # Fully connected processing of the AATP input -> (batch_size, 64)
        x_aatp = self.relu(self.fc_aatp1(x_aatp))
        x_aatp = self.relu(self.fc_aatp2(x_aatp))

        # Concatenate both branches -> (batch_size, 256 + 64)
        x = torch.cat((x_conv, x_aatp), dim=1)

        # Final classification layers; raw logits are returned (no softmax)
        x = self.relu(self.fc1(x))
        return self.fc2(x)

# Early Stopping class
class EarlyStopping:
    """Track validation loss and signal when it has stopped improving.

    Call the instance once per epoch with the current validation loss and the
    model. A loss counts as an improvement when it is lower than the best loss
    so far by more than ``delta``. After ``patience`` consecutive calls without
    improvement, ``early_stop`` becomes ``True``.

    Args:
        patience: Number of consecutive non-improving calls tolerated.
        delta: Minimum decrease in loss that counts as an improvement.
    """

    def __init__(self, patience=5, delta=0.001):
        self.patience = patience
        self.delta = delta
        self.best_loss = None     # lowest validation loss seen so far
        self.early_stop = False   # set once patience is exhausted
        self.counter = 0          # consecutive calls without improvement
        self.best_model = None    # independent copy of the best state_dict

    def __call__(self, val_loss, model):
        """Record ``val_loss`` and snapshot ``model`` if it is the best so far."""
        improved = self.best_loss is None or val_loss < self.best_loss - self.delta
        if improved:
            self.best_loss = val_loss
            self.counter = 0
            # state_dict() hands back references to the live parameter
            # tensors; deep-copy them so later optimizer steps cannot
            # overwrite the snapshot.
            self.best_model = copy.deepcopy(model.state_dict())
            return

        self.counter += 1
        if self.counter >= self.patience:
            self.early_stop = True

# Training function with early stopping and saving the best model
def train_model_with_early_stopping(model, train_loader, test_loader, criterion, optimizer, epochs=20, patience=5, model_save_path="best_model.pth"):
    """Train ``model``, keep the weights with the lowest validation loss, save them.

    Each epoch runs one pass over ``train_loader`` and then computes the mean
    per-batch loss over ``test_loader``. Training stops early once that loss
    has not improved for ``patience`` consecutive epochs. Whether or not early
    stopping fires, the best-scoring weights are restored into ``model`` and
    written to ``model_save_path``.

    NOTE(review): ``test_loader`` is used for model selection here; if the same
    data is later used for final evaluation, the reported metrics will be
    optimistically biased. A separate validation split would avoid that.

    Args:
        model: Module called as ``model(x_320, x_aatp)``.
        train_loader: Iterable of ``(x_320, x_aatp, labels)`` training batches.
        test_loader: Iterable of ``(x_320, x_aatp, labels)`` validation batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer over ``model``'s parameters.
        epochs: Maximum number of epochs.
        patience: Non-improving epochs tolerated before stopping.
        model_save_path: Destination file for the best ``state_dict``.
    """
    early_stopping = EarlyStopping(patience=patience, delta=0.001)
    # Independent snapshot of the best weights. state_dict() returns references
    # to live tensors, so it must be deep-copied at the moment of improvement.
    best_state = None

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for x_320, x_aatp, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(x_320, x_aatp)
            loss = criterion(outputs, labels.view(-1))
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        # Validate the model on the held-out loader
        val_loss = 0.0
        model.eval()
        with torch.no_grad():
            for x_320, x_aatp, labels in test_loader:
                outputs = model(x_320, x_aatp)
                loss = criterion(outputs, labels.view(-1))
                val_loss += loss.item()
        val_loss /= len(test_loader)

        print(f'Epoch {epoch+1}/{epochs}, Training Loss: {running_loss/len(train_loader):.4f}, Validation Loss: {val_loss:.4f}')

        # Update the early-stopping state
        early_stopping(val_loss, model)

        # The counter is reset to 0 exactly when this epoch improved the loss.
        if early_stopping.counter == 0:
            best_state = copy.deepcopy(model.state_dict())

        if early_stopping.early_stop:
            print("Early stopping triggered")
            break

    # Restore the best weights even when all epochs ran without early stopping;
    # otherwise the last-epoch weights would be saved as "best".
    if best_state is not None:
        model.load_state_dict(best_state)

    # Save the best model to the specified path
    torch.save(model.state_dict(), model_save_path)
    print(f"模型已保存到 {model_save_path}")

# Build the network with its default hyper-parameters
model = CombinedModel()

# Cross-entropy loss, optimised with Adam
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

# Run training with early stopping; the resulting weights are written to disk
train_model_with_early_stopping(
    model=model,
    train_loader=train_loader,
    test_loader=test_loader,
    criterion=criterion,
    optimizer=optimizer,
    epochs=30,
    patience=5,
    model_save_path="best_combined_model.pth",
)

# Evaluation function with corrected AUC calculation
def evaluate_model(model, test_loader):
    """Compute binary-classification metrics for ``model`` over ``test_loader``.

    Args:
        model: Module called as ``model(x_320, x_aatp)`` returning two logits
            per sample.
        test_loader: Iterable of ``(x_320, x_aatp, labels)`` batches with
            labels in ``{0, 1}``.

    Returns:
        Tuple ``(acc, auc, sp, sn, mcc)``: accuracy, ROC AUC (from the
        positive-class probability), specificity, sensitivity and Matthews
        correlation coefficient. Specificity / sensitivity are ``nan`` when
        the data contains no negative / positive samples.

    Raises:
        ValueError: Propagated from ``roc_auc_score`` when only one class is
            present in the labels.
    """
    model.eval()
    all_labels = []
    all_preds = []
    all_probs = []  # positive-class probabilities, used for AUC

    with torch.no_grad():
        for x_320, x_aatp, labels in test_loader:
            outputs = model(x_320, x_aatp)
            probs = F.softmax(outputs, dim=1)[:, 1]  # probability of class 1
            predicted = outputs.argmax(dim=1)

            all_labels.extend(labels.view(-1).cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

    # Calculate metrics
    acc = accuracy_score(all_labels, all_preds)
    auc = roc_auc_score(all_labels, all_probs)  # AUC needs scores, not hard labels

    # Pin labels so the matrix is always 2x2, even if one class never appears
    # in either the labels or the predictions.
    cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
    tn, fp, fn, tp = cm.ravel()
    sp = tn / (tn + fp) if (tn + fp) > 0 else float("nan")
    sn = tp / (tp + fn) if (tp + fn) > 0 else float("nan")
    mcc = matthews_corrcoef(all_labels, all_preds)

    return acc, auc, sp, sn, mcc

# Load the best model weights.
# weights_only=True restricts unpickling to tensors and plain containers, which
# is all a state_dict needs; it avoids executing arbitrary pickled code and
# silences the FutureWarning that torch.load emits without it.
model.load_state_dict(torch.load("best_combined_model.pth", weights_only=True))
model.eval()

# Evaluate the model on the test set
acc, auc, sp, sn, mcc = evaluate_model(model, test_loader)

# Print the evaluation metrics
print(f'Accuracy: {acc:.4f}')
print(f'AUC: {auc:.4f}')
print(f'Specificity: {sp:.4f}')
print(f'Sensitivity: {sn:.4f}')
print(f'MCC: {mcc:.4f}')

Epoch 1/30, Training Loss: 0.6841, Validation Loss: 0.6646
Epoch 2/30, Training Loss: 0.6296, Validation Loss: 0.5707
Epoch 3/30, Training Loss: 0.5016, Validation Loss: 0.3976
Epoch 4/30, Training Loss: 0.2961, Validation Loss: 0.3564
Epoch 5/30, Training Loss: 0.2665, Validation Loss: 0.2298
Epoch 6/30, Training Loss: 0.1928, Validation Loss: 0.2154
Epoch 7/30, Training Loss: 0.1622, Validation Loss: 0.1847
Epoch 8/30, Training Loss: 0.1234, Validation Loss: 0.1904
Epoch 9/30, Training Loss: 0.1072, Validation Loss: 0.2267
Epoch 10/30, Training Loss: 0.0889, Validation Loss: 0.2103
Epoch 11/30, Training Loss: 0.0772, Validation Loss: 0.1674
Epoch 12/30, Training Loss: 0.0898, Validation Loss: 0.1648
Epoch 13/30, Training Loss: 0.1043, Validation Loss: 0.3593
Epoch 14/30, Training Loss: 0.1142, Validation Loss: 0.3455
Epoch 15/30, Training Loss: 0.0997, Validation Loss: 0.3371
Epoch 16/30, Training Loss: 0.1111, Validation Loss: 0.2246
Epoch 17/30, Training Loss: 0.0741, Validation Lo

  model.load_state_dict(torch.load("best_combined_model.pth"))
