Import Dependencies

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import pandas as pd
import matplotlib.pyplot as plt

Prep data & convert to PyTorch

In [63]:
df_train = pd.read_csv(r"../data/processed/train.csv")
df_test = pd.read_csv(r"../data/processed/test.csv")

# Separate predictors and target variable; drop some columns that will not be use.
X = df_train.drop(columns=["PATIENT", "BIRTHDATE", "BIRTHDATE_ORD", "DEATHDATE", "cvd_flag"])
Y = df_train["cvd_flag"].astype(float)

X_test = df_test.drop(columns=["PATIENT", "BIRTHDATE", "BIRTHDATE_ORD", "DEATHDATE", "cvd_flag"])
Y_test = df_test["cvd_flag"].astype(float)

# Convert any object columns to numeric
object_cols = X.select_dtypes(include=['object']).columns
for col in object_cols:
    X[col] = pd.to_numeric(X[col], errors='coerce')

object_cols = X_test.select_dtypes(include=['object']).columns
for col in object_cols:
    X_test[col] = pd.to_numeric(X_test[col], errors='coerce')

X.fillna(0, inplace=True)
X_test.fillna(0, inplace=True)

# Convert the feature DataFrames to PyTorch tensors.
X_train_tensor = torch.tensor(X.values, dtype=torch.float32)
Y_train_tensor = torch.tensor(Y.values, dtype=torch.float32).unsqueeze(1)
X_test_tensor = torch.tensor(X_test.values, dtype=torch.float32)
Y_test_tensor = torch.tensor(Y_test.values, dtype=torch.float32).unsqueeze(1)

# Create TensorDataset objects for training and testing data
train_dataset = TensorDataset(X_train_tensor, Y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, Y_test_tensor)

# confirm the frequencies from each dataset
print(Y_test.value_counts())
print(Y.value_counts())

cvd_flag
0.0    1895
1.0     456
Name: count, dtype: int64
cvd_flag
0.0    7483
1.0    1921
Name: count, dtype: int64


Feed-forward neural network

In [72]:
#############################################
# Using simple feed-forward neural network
#############################################
batch_size = 128
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

class HeartFailureModel(nn.Module):
    def __init__(self, input_dim):
        super(HeartFailureModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(128, 64)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(64, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.relu1(self.fc1(x))
        x = self.relu2(self.fc2(x))
        x = self.sigmoid(self.fc3(x))
        return x

input_dim = X_train_tensor.shape[1]
model = HeartFailureModel(input_dim)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

num_epochs = 50
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for batch_features, batch_labels in train_loader:
        # Forward pass
        outputs = model(batch_features)

        loss = criterion(outputs, batch_labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * batch_features.size(0)

    epoch_loss = running_loss / len(train_loader.dataset)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}")

# Evaluate the test set
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for batch_features, batch_labels in test_loader:
        outputs = model(batch_features)
        # Convert probabilities to binary predictions. Heart rates are pretty serious so setting it to 0.35 instead of the 0.5
        predicted = (outputs > 0.35).float()
        total += batch_labels.size(0)
        correct += (predicted == batch_labels).sum().item()
accuracy = correct / total
print(f"Test Accuracy: {accuracy * 100:.2f}%")


Epoch 1/50, Loss: 21.0142
Epoch 2/50, Loss: 20.4211
Epoch 3/50, Loss: 20.4211
Epoch 4/50, Loss: 20.4212
Epoch 5/50, Loss: 20.4212
Epoch 6/50, Loss: 20.4213
Epoch 7/50, Loss: 20.4213
Epoch 8/50, Loss: 20.4214
Epoch 9/50, Loss: 20.4214
Epoch 10/50, Loss: 20.4215
Epoch 11/50, Loss: 20.4215
Epoch 12/50, Loss: 20.4215
Epoch 13/50, Loss: 20.4216
Epoch 14/50, Loss: 20.4217
Epoch 15/50, Loss: 20.4217
Epoch 16/50, Loss: 20.4217
Epoch 17/50, Loss: 20.4218
Epoch 18/50, Loss: 20.4218
Epoch 19/50, Loss: 20.4219
Epoch 20/50, Loss: 20.4219
Epoch 21/50, Loss: 20.4220
Epoch 22/50, Loss: 20.4220
Epoch 23/50, Loss: 20.4220
Epoch 24/50, Loss: 20.4221
Epoch 25/50, Loss: 20.4221
Epoch 26/50, Loss: 20.4222
Epoch 27/50, Loss: 20.4222
Epoch 28/50, Loss: 20.4223
Epoch 29/50, Loss: 20.4223
Epoch 30/50, Loss: 20.4223
Epoch 31/50, Loss: 20.4224
Epoch 32/50, Loss: 20.4224
Epoch 33/50, Loss: 20.4225
Epoch 34/50, Loss: 20.4225
Epoch 35/50, Loss: 20.4225
Epoch 36/50, Loss: 20.4225
Epoch 37/50, Loss: 20.4226
Epoch 38/5

In [73]:
batch_size = 128
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# A deeper feed-forward network with more hidden layers.
class HeartFailureModelDeep(nn.Module):
    def __init__(self, input_dim):
        super(HeartFailureModelDeep, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(128, 64)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(64, 32)
        self.relu3 = nn.ReLU()
        self.fc4 = nn.Linear(32, 16)
        self.relu4 = nn.ReLU()
        self.fc5 = nn.Linear(16, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.relu1(self.fc1(x))
        x = self.relu2(self.fc2(x))
        x = self.relu3(self.fc3(x))
        x = self.relu4(self.fc4(x))
        x = self.sigmoid(self.fc5(x))
        return x

input_dim = X_train_tensor.shape[1]  # Should be 459
model = HeartFailureModelDeep(input_dim)

criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 20
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for batch_features, batch_labels in train_loader:
        optimizer.zero_grad()
        outputs = model(batch_features)
        loss = criterion(outputs, batch_labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * batch_features.size(0)

    epoch_loss = running_loss / len(train_loader.dataset)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}")

model.eval()
correct = 0
total = 0
with torch.no_grad():
    for batch_features, batch_labels in test_loader:
        outputs = model(batch_features)
        predicted = (outputs > 0.5).float()  # Using 0.5 threshold
        total += batch_labels.size(0)
        correct += (predicted == batch_labels).sum().item()

accuracy = correct / total
print(f"Test Accuracy: {accuracy * 100:.2f}%")

Epoch 1/20, Loss: 4.7505
Epoch 2/20, Loss: 1.3001
Epoch 3/20, Loss: 0.5811
Epoch 4/20, Loss: 0.6549
Epoch 5/20, Loss: 0.6124
Epoch 6/20, Loss: 1.1841
Epoch 7/20, Loss: 0.6672
Epoch 8/20, Loss: 0.5928
Epoch 9/20, Loss: 0.5510
Epoch 10/20, Loss: 0.5386
Epoch 11/20, Loss: 0.6196
Epoch 12/20, Loss: 0.5243
Epoch 13/20, Loss: 0.4896
Epoch 14/20, Loss: 0.4819
Epoch 15/20, Loss: 0.5369
Epoch 16/20, Loss: 0.4798
Epoch 17/20, Loss: 0.4935
Epoch 18/20, Loss: 0.4527
Epoch 19/20, Loss: 0.4339
Epoch 20/20, Loss: 0.4401
Test Accuracy: 84.18%
