In [1]:
import numpy as np
import optuna
from sklearn import metrics
import warnings
import pickle
warnings.simplefilter(action='ignore', category=FutureWarning)
import joblib
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

from sklearn.metrics import accuracy_score, recall_score, roc_auc_score, r2_score, roc_curve
import matplotlib.pyplot as plt
from torch.optim.lr_scheduler import StepLR

  from .autonotebook import tqdm as notebook_tqdm


In [12]:
# Load the data
# data = np.load('Classified_Data_Var.npz')
data = np.load('Classified_Data.npz')

# Convert the data to PyTorch tensors
X = torch.Tensor(data['features'])
y = torch.Tensor(data['labels']).long()

print(X.shape)  
print(y.shape)  

# Split the data into training+validation set and test set
X_train_val, X_test, y_train_val, y_test = train_test_split(X.numpy(), y.numpy(), test_size=0.2, random_state=42)

# Further split the training+validation set into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.25, random_state=42)  # 0.25 of the train_val set, i.e., 20% of the original data

# If you need to flatten the 9x9 matrices into 81-element vectors, do this:
X_train = X_train.reshape(X_train.shape[0], -1)
X_val = X_val.reshape(X_val.shape[0], -1)
X_test = X_test.reshape(X_test.shape[0], -1)

# Standardize the training and validation data
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_val = scaler.transform(X_val)
X_test = scaler.transform(X_test)

# Convert the numpy arrays back to tensors
X_train = torch.Tensor(X_train)
X_val = torch.Tensor(X_val)
X_test = torch.Tensor(X_test)
y_train = torch.Tensor(y_train).long()
y_val = torch.Tensor(y_val).long()
y_test = torch.Tensor(y_test).long()

torch.Size([41740, 9, 9])
torch.Size([41740])


In [13]:
# 创建数据加载器
batch_size = 64  

train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False)

train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [14]:
class DeepNN(nn.Module):
    def __init__(self, input_size):
        super(DeepNN, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)  
        self.relu1 = nn.ReLU()                  
        self.dropout1 = nn.Dropout(0.5)         
        self.fc2 = nn.Linear(128, 256)          
        self.relu2 = nn.ReLU()                  

        self.fc9 = nn.Linear(256, 256)          
        self.relu9 = nn.ReLU() 


        self.dropout2 = nn.Dropout(0.5)         
        self.fc3 = nn.Linear(256, 128)          
        self.relu3 = nn.ReLU()                  
        self.fc4 = nn.Linear(128, 64)           
        self.relu4 = nn.ReLU()                  
        self.fc5 = nn.Linear(64, 1)             

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu1(x)
        x = self.dropout1(x)
        x = self.fc2(x)
        x = self.relu2(x)
        x = self.dropout2(x)

        x = self.fc9(x)
        x = self.relu9(x)
        x = self.fc9(x)
        x = self.relu9(x)
        x = self.fc9(x)
        x = self.relu9(x)

        x = self.fc3(x)
        x = self.relu3(x)
        x = self.fc4(x)
        x = self.relu4(x)
        x = self.fc5(x)
        return x

In [15]:
# 初始化网络
input_size = X_train.size(1)  # 特征数量
model = DeepNN(input_size)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
# 定义损失函数和优化器
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = StepLR(optimizer, step_size=100, gamma=0.5)
# 设置训练的epochs数
epochs = 20

for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        
        # 清除梯度
        optimizer.zero_grad()
        
        # 前向传播
        outputs = model(inputs)
        
        # 计算损失
        loss = criterion(outputs.squeeze(), labels.float())
        
        # 反向传播
        loss.backward()
        
        # 参数更新
        optimizer.step()
        scheduler.step()
        
        running_loss += loss.item()
        
        # 计算准确率
        predictions = torch.round(torch.sigmoid(outputs.squeeze()))
        correct_predictions += (predictions == labels.float()).sum().item()
        total_predictions += labels.size(0)
    
    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct_predictions / total_predictions
    
    print(f'Epoch {epoch+1}/{epochs} - Loss: {epoch_loss:.4f} - Accuracy: {epoch_acc:.4f}')
    
    # 在验证集上评估模型
    model.eval()
    val_loss = 0.0
    correct_val_predictions = 0
    total_val_predictions = 0
    
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs.squeeze(), labels.float())
            val_loss += loss.item()
            
            # 计算准确率
            val_predictions = torch.round(torch.sigmoid(outputs.squeeze()))
            correct_val_predictions += (val_predictions == labels.float()).sum().item()
            total_val_predictions += labels.size(0)
    
    val_epoch_loss = val_loss / len(val_loader)
    val_epoch_acc = correct_val_predictions / total_val_predictions
    
    print(f'Validation Loss: {val_epoch_loss:.4f} - Validation Accuracy: {val_epoch_acc:.4f}')

Epoch 1/20 - Loss: 0.6336 - Accuracy: 0.6538
Validation Loss: 0.6251 - Validation Accuracy: 0.6616
Epoch 2/20 - Loss: 0.6219 - Accuracy: 0.6661
Validation Loss: 0.6223 - Validation Accuracy: 0.6600
Epoch 3/20 - Loss: 0.6214 - Accuracy: 0.6693
Validation Loss: 0.6223 - Validation Accuracy: 0.6597
Epoch 4/20 - Loss: 0.6197 - Accuracy: 0.6683
Validation Loss: 0.6223 - Validation Accuracy: 0.6597
Epoch 5/20 - Loss: 0.6198 - Accuracy: 0.6674
Validation Loss: 0.6223 - Validation Accuracy: 0.6597
Epoch 6/20 - Loss: 0.6194 - Accuracy: 0.6657
Validation Loss: 0.6223 - Validation Accuracy: 0.6597
Epoch 7/20 - Loss: 0.6195 - Accuracy: 0.6693
Validation Loss: 0.6223 - Validation Accuracy: 0.6597
Epoch 8/20 - Loss: 0.6210 - Accuracy: 0.6666
Validation Loss: 0.6223 - Validation Accuracy: 0.6597
Epoch 9/20 - Loss: 0.6210 - Accuracy: 0.6699
Validation Loss: 0.6223 - Validation Accuracy: 0.6597
Epoch 10/20 - Loss: 0.6214 - Accuracy: 0.6673
Validation Loss: 0.6223 - Validation Accuracy: 0.6597
Epoch 11/

In [16]:
# 在测试集上评估模型
model.eval()  
test_loss = 0.0
correct_test_predictions = 0
total_test_predictions = 0

with torch.no_grad():  
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), labels.float())
        test_loss += loss.item()
        
        # 计算准确率
        test_predictions = torch.round(torch.sigmoid(outputs.squeeze()))
        correct_test_predictions += (test_predictions == labels.float()).sum().item()
        total_test_predictions += labels.size(0)

test_epoch_loss = test_loss / len(test_loader)
test_epoch_acc = correct_test_predictions / total_test_predictions

print(f'Test Loss: {test_epoch_loss:.4f} - Test Accuracy: {test_epoch_acc:.4f}')

Test Loss: 0.6141 - Test Accuracy: 0.6673


In [9]:
from sklearn.metrics import recall_score, precision_score, f1_score, r2_score, roc_curve, auc
import matplotlib.pyplot as plt

# 初始化列表以存储真实标签和预测概率
true_labels = []
pred_probs = []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        
        # 计算概率
        probs = torch.sigmoid(outputs.squeeze())
        
        # 存储真实标签和预测概率
        true_labels.extend(labels.cpu().numpy())
        pred_probs.extend(probs.cpu().numpy())

# 计算预测标签（根据预测概率和默认阈值0.5）
pred_labels = [1 if prob >= 0.5 else 0 for prob in pred_probs]
print(pred_labels)


[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 

In [None]:
# 计算召回率
recall = recall_score(true_labels, pred_labels)

# 计算精确度
precision = precision_score(true_labels, pred_labels)

# 计算F1分数
f1 = f1_score(true_labels, pred_labels)

# 计算R^2得分
r2 = r2_score(true_labels, pred_probs)

# 计算ROC曲线
fpr, tpr, thresholds = roc_curve(true_labels, pred_probs)
roc_auc = auc(fpr, tpr)

# 打印指标
print(f'Recall: {recall:.4f}')
print(f'Precision: {precision:.4f}')
print(f'F1 Score: {f1:.4f}')
print(f'R^2 Score: {r2:.4f}')
print(f'ROC AUC: {roc_auc:.4f}')

# 绘制ROC曲线
plt.figure()
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc='lower right')
plt.show()