In [48]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.pipeline import make_pipeline
from sklearn.compose import ColumnTransformer

# device config
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# hyper parameters
input_size = 20 # 20 features for each batch
hidden_size = 100 # can try different sizes
num_epochs = 2 # number of times that the full dataset is trained
batch_size = 100 # number of training data samples for each iteration
learning_rate = 0.001 # magnitude of the change in the weights during update

train = pd.read_csv('heart_data.csv')
df = pd.DataFrame(data=train)
df_train = df.drop(['HeartDisease'], axis=1)
# NOTES: X = df.drop(['HeartDisease'], axis=1) # drop label column from feature set
# NOTES: y = df['HeartDisease'] # store column label

null_rows = df_train.isnull().any(axis=1)
df_train.loc[null_rows].head() # no null rows so can skip step to address missing data

# one hot encoding (for each sample and its categorical attributes, sets its given category to 1 and others to 0)
df_cat = df_train.drop(['Age', 'RestingBP', 'Cholesterol', 'FastingBS', 'MaxHR','Oldpeak'], axis=1) # drop numerical
cat_encoder = OneHotEncoder()
encoded_cat = cat_encoder.fit_transform(df_cat)

# standardize numerical data (convert to scaled range within 0-1)
df_num = df_train.select_dtypes(include=[np.number])
scaler = StandardScaler()
scaled_num = scaler.fit_transform(df_num)

# create separate processing pipelines with functions for numerical and categorical data
num_pipeline = make_pipeline(StandardScaler())
cat_pipeline = make_pipeline(OneHotEncoder(handle_unknown="ignore")) # ignore unknown category values (if applicable)
num_cols = ['Age', 'RestingBP', 'Cholesterol', 'FastingBS', 'MaxHR','Oldpeak']
cat_cols = ['Sex', 'ChestPainType', 'RestingECG', 'ExerciseAngina', 'ST_Slope']

preprocessing = ColumnTransformer([("num", num_pipeline, num_cols), ("cat", cat_pipeline, cat_cols)])
processed_data = preprocessing.fit_transform(df_train)
processed_df = pd.DataFrame(processed_data, columns=preprocessing.get_feature_names_out(), index=df_train.index)

processed_df_full = processed_df.copy()
processed_df_full.insert(0, "HeartDisease", df["HeartDisease"].to_numpy(), False) # full data to use in dataset class (as csv)
processed_csv = processed_df_full.to_csv('processed_heart_data.csv', index=False)

processed_df.head()

Unnamed: 0,num__Age,num__RestingBP,num__Cholesterol,num__FastingBS,num__MaxHR,num__Oldpeak,cat__Sex_F,cat__Sex_M,cat__ChestPainType_ASY,cat__ChestPainType_ATA,cat__ChestPainType_NAP,cat__ChestPainType_TA,cat__RestingECG_LVH,cat__RestingECG_Normal,cat__RestingECG_ST,cat__ExerciseAngina_N,cat__ExerciseAngina_Y,cat__ST_Slope_Down,cat__ST_Slope_Flat,cat__ST_Slope_Up
0,-1.43314,0.410909,0.82507,-0.551341,1.382928,-0.832432,0.0,1.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,1.0
1,-0.478484,1.491752,-0.171961,-0.551341,0.754157,0.105664,1.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,1.0,0.0
2,-1.751359,-0.129513,0.770188,-0.551341,-1.525138,-0.832432,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,1.0
3,-0.584556,0.302825,0.13904,-0.551341,-1.132156,0.574711,1.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,1.0,0.0
4,0.051881,0.951331,-0.034755,-0.551341,-0.581981,-0.832432,0.0,1.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,1.0


In [49]:
class HeartDataset(Dataset):

    def __init__(self):
        # Initialize data, download, etc.
        # read with numpy or pandas
        xy = np.loadtxt('processed_heart_data.csv', delimiter=',', dtype=np.float32, skiprows=1)
        self.n_samples = xy.shape[0]

        # here the first column is the class label, the rest are the features
        self.x_data = torch.from_numpy(xy[:, 1:]) # size [n_samples, n_features]
        self.y_data = torch.from_numpy(xy[:, [0]]) # size [n_samples, 1]

    # support indexing such that dataset[i] can be used to get i-th sample
    def __getitem__(self, index):
        return self.x_data[index], self.y_data[index]

    # we can call len(dataset) to return the size
    def __len__(self):
        return self.n_samples

In [50]:
class NeuralNet(nn.Module): # custom neural net derived from pytorch
    def __init__(self, input_size, hidden_size):
        super(NeuralNet, self).__init__()
        self.l1 = nn.Linear(input_size, hidden_size) # layer 1 uses linear function on neurons using input
        self.relu = nn.ReLU() # activation function, decides how the neurons in next layer activate based on factoring in weights and activations of previous neurons (converts activation values to range 0 to +)
        self.l2 = nn.Linear(hidden_size, 1) # layer 2 uses linear function on neurons to return output (1 class since outcome of heart disease)
    
    def forward(self, x): # trains by going forward through neural net to calculate initial output values
        out = self.l1(x) # apply layer 1 on input neurons
        out = self.relu(out) 
        out = self.l2(out) # apply layer 2 on layer 1 output neurons to get class values
        return out 

In [51]:
dataset = HeartDataset()
first_data = dataset[0] # get first sample and get separate features and labels
features, labels = first_data
print(features, labels)

train_loader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=False)

examples = iter(train_loader) # to iterate through samples
samples, labels = next(examples)
print(samples.shape, labels.shape) 
print(len(train_loader))

tensor([-1.4331,  0.4109,  0.8251, -0.5513,  1.3829, -0.8324,  0.0000,  1.0000,
         0.0000,  1.0000,  0.0000,  0.0000,  0.0000,  1.0000,  0.0000,  1.0000,
         0.0000,  0.0000,  0.0000,  1.0000]) tensor([0.])
torch.Size([100, 20]) torch.Size([100, 1])
10


In [52]:
model = NeuralNet(input_size, hidden_size)
# factors in the actual class Y  and log(predicted class probability Y-hat) similar to cross entropy loss
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate) # optimizer to update weights using gradients calculated from loss

# train samples
n_total_steps = len(train_loader)
for epoch in range(num_epochs):
    for i, (features, labels) in enumerate(train_loader):
        features = features.reshape(-1, input_size).to(device) # reshape images array from 100,20 to 20 due to input size 918
        labels = labels.to(device)

        # forward pass through neural network to get initial output class values
        outputs = model(features)
        loss = criterion(outputs, labels)

        # backpropagation to calculate gradients and then update weights 
        # find gradient d Loss / dw from output layer to input layer, via chain rule applied to equation applying weights to activations and to activation function applied on that result
        optimizer.zero_grad() # avoids adding gradients from previous sample to current
        loss.backward()
        optimizer.step()

        if (i+1) % 1 == 0: # print for every 100 samples
            print(f'epoch {epoch + 1} / {num_epochs}, step {i+1} / {n_total_steps}, loss = {loss.item():.4f}')

epoch 1 / 2, step 1 / 10, loss = 0.6865
epoch 1 / 2, step 2 / 10, loss = 0.6620
epoch 1 / 2, step 3 / 10, loss = 0.6758
epoch 1 / 2, step 4 / 10, loss = 0.6670
epoch 1 / 2, step 5 / 10, loss = 0.6547
epoch 1 / 2, step 6 / 10, loss = 0.6426
epoch 1 / 2, step 7 / 10, loss = 0.6375
epoch 1 / 2, step 8 / 10, loss = 0.6236
epoch 1 / 2, step 9 / 10, loss = 0.6341
epoch 1 / 2, step 10 / 10, loss = 0.6077
epoch 2 / 2, step 1 / 10, loss = 0.6227
epoch 2 / 2, step 2 / 10, loss = 0.6020
epoch 2 / 2, step 3 / 10, loss = 0.6112
epoch 2 / 2, step 4 / 10, loss = 0.6039
epoch 2 / 2, step 5 / 10, loss = 0.5759
epoch 2 / 2, step 6 / 10, loss = 0.5875
epoch 2 / 2, step 7 / 10, loss = 0.5879
epoch 2 / 2, step 8 / 10, loss = 0.5665
epoch 2 / 2, step 9 / 10, loss = 0.5627
epoch 2 / 2, step 10 / 10, loss = 0.5540


In [53]:
# test
with torch.no_grad():
    n_correct = 0
    n_samples = 0
    for entries, labels in test_dataloader:
        entries = entries.reshape(-1, input_size).to(device)
        labels = labels.to(device)
        outputs = model(entries)

        # Apply sigmoid activation to convert logits to probabilities
        probabilities = torch.sigmoid(outputs)

        # Convert probabilities to binary predictions (0 or 1)
        predictions = (probabilities > 0.5).float()

        n_samples += labels.shape[0]
        n_correct += (predictions == labels).sum().item()

    acc = n_correct / n_samples
    print(f'accuracy = {acc}')

accuracy = 0.8376906318082789
