In [31]:
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, f1_score
from imblearn.over_sampling import SMOTE
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

# ML
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.naive_bayes import GaussianNB, MultinomialNB, BernoulliNB
import lightgbm as lgb
from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier

# DL
import wandb
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

In [2]:
# 加载数据
df_train = pd.read_csv("./dataset/train.csv")
df_valid = pd.read_csv("./dataset/val.csv")
df_test = pd.read_csv("./dataset/test.csv")

# 平衡正负例
smote = SMOTE(sampling_strategy='auto', random_state=42)
X_train = df_train.drop(["HadHeartAttack"], axis=1).values
y_train = df_train[["HadHeartAttack"]].values.ravel()
X_train, y_train = smote.fit_resample(X_train, y_train)

X_valid = df_valid.drop(["HadHeartAttack"], axis=1).values
y_valid = df_valid[["HadHeartAttack"]].values.ravel()
X_valid, y_valid = smote.fit_resample(X_valid, y_valid)

X_test = df_test.drop(["HadHeartAttack"], axis=1).values
y_test = df_test[["HadHeartAttack"]].values.ravel()
X_test, y_test = smote.fit_resample(X_test, y_test)

## 线性模型

### 逻辑回归

In [16]:
# 训练
logreg = LogisticRegression(random_state=42)
logreg.fit(X_train, y_train)

# 测试
y_pred = logreg.predict(X_test)
acc = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
print("Logistic Regression 测试准确率：%.4f" % acc)
print("Logistic Regression 测试 F1 值：%.4f" % f1)

Logistic Regression 测试准确率：0.7987
Logistic Regression 测试 F1 值：0.7926


## 决策树模型

In [17]:
# 训练
decision_tree = DecisionTreeClassifier(criterion='gini', random_state=42)
decision_tree.fit(X_train, y_train)

# 测试
y_pred = decision_tree.predict(X_test)
acc = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
print("CART 测试准确率：%.4f" % acc)
print("CART 测试 F1 值：%.4f" % f1)

CART 测试准确率：0.9290
CART 测试 F1 值：0.9278


## 贝叶斯模型

### 高斯朴素贝叶斯

In [18]:
# 训练
nb = GaussianNB()
nb.fit(X_train, y_train)

# 测试
y_pred = nb.predict(X_test)
acc = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
print("高斯朴素贝叶斯 测试准确率：%.4f" % acc)
print("高斯朴素贝叶斯 测试 F1 值：%.4f" % f1)

高斯朴素贝叶斯 测试准确率：0.7681
高斯朴素贝叶斯 测试 F1 值：0.7631


### 多项式朴素贝叶斯

In [19]:
# 训练
nb = MultinomialNB()
nb.fit(X_train, y_train)

# 测试
y_pred = nb.predict(X_test)
acc = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
print("多项式朴素贝叶斯 测试准确率：%.4f" % acc)
print("多项式朴素贝叶斯 测试 F1 值：%.4f" % f1)

多项式朴素贝叶斯 测试准确率：0.7801
多项式朴素贝叶斯 测试 F1 值：0.7754


### 伯努利朴素贝叶斯

In [20]:
# 训练
nb = BernoulliNB()
nb.fit(X_train, y_train)

# 测试
y_pred = nb.predict(X_test)
acc = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
print("伯努利朴素贝叶斯 测试准确率：%.4f" % acc)
print("伯努利朴素贝叶斯 测试 F1 值：%.4f" % f1)

伯努利朴素贝叶斯 测试准确率：0.8051
伯努利朴素贝叶斯 测试 F1 值：0.8029


## 集成学习

### LightGBM

<https://lightgbm.readthedocs.io/en/latest/index.html>

In [21]:
# 训练
gbm = lgb.LGBMClassifier(random_state=42)
gbm.fit(X_train, y_train, eval_set=[(X_valid, y_valid)], eval_metric="l1", callbacks=[lgb.early_stopping(5)])

# 测试
X_test_df = pd.DataFrame(X_test, columns=df_train.drop(["HadHeartAttack"], axis=1).columns)
y_pred = gbm.predict(X_test_df, num_iteration=gbm.best_iteration_)
acc = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
print("LightGBM 测试准确率：%.4f" % acc)
print("LightGBM 测试 F1 值：%.4f" % f1)

[LightGBM] [Info] Number of positive: 139551, number of negative: 139551
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.020115 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 9690
[LightGBM] [Info] Number of data points in the train set: 279102, number of used features: 38
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.500000 -> initscore=0.000000
Training until validation scores don't improve for 5 rounds
Did not meet early stopping. Best iteration is:
[100]	valid_0's l1: 0.0463323	valid_0's binary_logloss: 0.0793068
LightGBM 测试准确率：0.9715
LightGBM 测试 F1 值：0.9711


### Adaboost

In [33]:
base_classifier = DecisionTreeClassifier(max_depth=5)
ada_boost = AdaBoostClassifier(base_classifier, n_estimators=50, random_state=42)

# 训练
ada_boost.fit(X_train, y_train)

# 测试
y_pred = ada_boost.predict(X_test)
acc = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
print("AdaBoost 测试准确率：%.4f" % acc)
print("AdaBoost 测试 F1 值：%.4f" % f1)

AdaBoost 测试准确率：0.9679
AdaBoost 测试 F1 值：0.9676


### 随机森林

In [22]:
# 训练
rf = RandomForestClassifier(random_state=42)
rf.fit(X_train, y_train)

# 测试
y_pred = rf.predict(X_test)
acc = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
print("RandomForest 测试准确率：%.4f" % acc)
print("RandomForest 测试 F1 值：%.4f" % f1)

RandomForest 测试准确率：0.9579
RandomForest 测试 F1 值：0.9570


## 懒惰学习

In [23]:
# 训练
knn = KNeighborsClassifier(n_neighbors=5)
knn.fit(X_train, y_train)

# 测试
y_pred = knn.predict(X_test)
acc = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
print("K近邻 测试准确率：%.4f" % acc)
print("K近邻 测试 F1 值：%.4f" % f1)

K近邻 测试准确率：0.7313
K近邻 测试 F1 值：0.6909


## 深度模型

### FNN

In [28]:
"""模型搭建"""
class FNN(nn.Module):
    def __init__(self, feat_num):
        super(FNN, self).__init__()
        self.fc1 = nn.Linear(feat_num, 32)
        self.fc2 = nn.Linear(32, 16)
        self.fc3 = nn.Linear(16, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return self.sigmoid(x)

"""数据重构"""
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.bool)
X_valid_tensor = torch.tensor(X_valid, dtype=torch.float32)
y_valid_tensor = torch.tensor(y_valid, dtype=torch.bool)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.bool)
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
valid_dataset = TensorDataset(X_valid_tensor, y_valid_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=64, shuffle=False)

In [26]:
"""开始训练"""
wandb.init(project='heart-disease-prediction', name='run')
model = FNN(feat_num=X_train.shape[1])
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.BCELoss()
num_epochs = 3
best_valid_accuracy = 0

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    # 训练阶段
    for step, (inputs, labels) in enumerate(train_loader):
        optimizer.zero_grad()  # 清除梯度
        outputs = model(inputs).squeeze()  # 计算预测值
        loss = criterion(outputs, labels.float())  # 计算损失
        loss.backward()  # 反向传播
        optimizer.step()  # 更新参数

        running_loss += loss.item()

        # 计算准确率
        predicted = (outputs > 0.5).float()  # 二分类阈值设置为0.5
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

        # 记录每个 step 的训练损失和准确率
        train_accuracy = correct / total
        wandb.log({
            'train_loss': loss.item(),
            'train_accuracy': train_accuracy,
            'epoch': epoch + 1,
            'step': step + 1,
        })

    # 计算训练集的损失和准确率
    train_loss = running_loss / len(train_loader)
    train_accuracy = correct / total

    # 验证阶段
    model.eval()
    valid_correct = 0
    valid_total = 0
    valid_loss = 0

    with torch.no_grad():  # 评估时不需要计算梯度
        for inputs, labels in valid_loader:
            outputs = model(inputs).squeeze()
            loss = criterion(outputs, labels.float())
            valid_loss += loss.item()

            predicted = (outputs > 0.5).float()
            valid_correct += (predicted == labels).sum().item()
            valid_total += labels.size(0)

    # 计算验证集的损失和准确率
    valid_loss = valid_loss / len(valid_loader)
    valid_accuracy = valid_correct / valid_total

    # 记录每个 epoch 的训练和验证数据
    wandb.log({
        'train_loss': train_loss,
        'train_accuracy': train_accuracy,
        'valid_loss': valid_loss,
        'valid_accuracy': valid_accuracy,
        'epoch': epoch + 1,
    })

    print(f"Epoch {epoch+1}/{num_epochs}")
    print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}")
    print(f"Valid Loss: {valid_loss:.4f}, Valid Accuracy: {valid_accuracy:.4f}")

    # 保存最好的模型
    if valid_accuracy > best_valid_accuracy:
        best_valid_accuracy = valid_accuracy
        torch.save(model.state_dict(), "./models/best-fnn-model.pth")

wandb.finish()

Epoch 1/3
Train Loss: 0.4064, Train Accuracy: 0.8124
Valid Loss: 0.3875, Valid Accuracy: 0.8233
Epoch 2/3
Train Loss: 0.3769, Train Accuracy: 0.8312
Valid Loss: 0.3785, Valid Accuracy: 0.8309
Epoch 3/3
Train Loss: 0.3643, Train Accuracy: 0.8394
Valid Loss: 0.3707, Valid Accuracy: 0.8371


0,1
epoch,▁▁▁▁▁▁▁▁▁▁▁▁▁▅▅▅▅▅▅▅▅▅▅▅████████████████
step,▁▂▂▃▃▄▆▆▆▆▇██▁▂▂▂▃▃▃▃▃▃▄▄▅▆▆▆▆▇▁▂▂▃▄▅▆▆█
train_accuracy,▁▃▃▄▄▅▅▅▆▆▆▆▇▇▇▇▇▇▇▇▇▇▇▇▇███████████████
train_loss,█▄▄▃▄▅▄▅▄▃▅▃▅▂▄▆▃▃▂▃▃▂▃▃▃▄▁▄▃▂▃▁▂▅▂▁▄▃▂▂
valid_accuracy,▁▅█
valid_loss,█▄▁

0,1
epoch,3.0
step,4361.0
train_accuracy,0.83941
train_loss,0.36425
valid_accuracy,0.83714
valid_loss,0.37066


In [30]:
"""模型测试"""
model = FNN(feat_num=X_train.shape[1])
model.load_state_dict(torch.load("./models/best-fnn-model.pth"))
model.eval()

test_correct = 0
test_total = 0

TP, FP, TN, FN = 0, 0, 0, 0

with torch.no_grad():
    outputs = model(X_test_tensor).squeeze()
    predicted = (outputs > 0.5).float()

    TP += ((predicted == 1) & (y_test_tensor == 1)).sum().item()
    FP += ((predicted == 1) & (y_test_tensor == 0)).sum().item()
    TN += ((predicted == 0) & (y_test_tensor == 0)).sum().item()
    FN += ((predicted == 0) & (y_test_tensor == 1)).sum().item()

    test_correct += (predicted == y_test_tensor).sum().item()
    test_total += y_test_tensor.size(0)

# 计算 F1-score
precision = TP / (TP + FP) if (TP + FP) > 0 else 0
recall = TP / (TP + FN) if (TP + FN) > 0 else 0
f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

test_accuracy = test_correct / test_total

print("FNN 测试准确率：%.4f" % test_accuracy)
print("FNN 测试 F1-score：%.4f" % f1)

FNN 测试准确率：0.8242
FNN 测试 F1-score：0.8248
