In [6]:
import numpy as np # linear algebra 
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from sklearn.metrics import accuracy_score, classification_report
import torch
import matplotlib.pyplot as plt
import os

In [7]:
#In this cell, the datasets are downloaded via the KaggleAPI directly from the source. It might be necessary to authentificate first via Webbrowser to make this work.
#FUrthermore, a folder ../data is created, which is on the .gitignore list. In this folder, large files >100mb and the original datasets MITBIH and PTBDB are stored.

from kaggle.api.kaggle_api_extended import KaggleApi

#configuring and authentification with kaggle api. This could be configured so that a authentification mask is shown?
api = KaggleApi()
api.authenticate()

#Configuring the metadata for the ecg heartbeat data (original username etc)
dataset_owner = "shayanfazeli"
dataset_name = "heartbeat"

#Configuring a download path that is NOT in the current github repo (so the big files are not pushed and cause an error!) --> Links to filepaths have to be dynamically adjusted
download_path = "../data/KAGGLE_datasets" #In this case we use the data folder that is in the .gitignore list and therefore not pushed! To keep everything in one local repo.

# Download structure: First check if dataset is already downloaded, else download it and store it in download path (should be outside git repo!)
dataset_folder = os.path.join(download_path, dataset_name)
if not os.path.exists(dataset_folder):
    # Case 1: Dataset path is not created --> Create it and download datasets into it
    api.dataset_download_files(dataset_owner + "/" + dataset_name, path=download_path + "/" + dataset_name, unzip=True)
    print("Datasets are downloaded and unzipped.")
else:
    # Case 2: Folder is created, but datasets might be missing
    missing_files = [] 
    for file_name in ["mitbih_test.csv", "mitbih_train.csv", "ptbdb_abnormal.csv", "ptbdb_normal.csv"]:  # These are the hardcoded names of the datasets that should be downloaded.
        file_path = os.path.join(dataset_folder, file_name)
        if not os.path.exists(file_path):
            missing_files.append(file_name)

    if missing_files:
        # If the list contains missing files, download ALL files and overwrite the old folder.
        api.dataset_download_files(dataset_owner + "/" + dataset_name, path=download_path + "/" + dataset_name, unzip=True, force=True)
        print("Missing data was donwloaded and unzipped. All Datasets are now available.")
    else:
        print("All Datasets are already available.")

#Creating new variable that links to the datasets and can be used in the rest of the code.
path_to_datasets = download_path + "/" + dataset_name 

All Datasets are already available.


In [8]:
np.set_printoptions(precision=4)

In [9]:
# This cell now makes use of the downloadfolder for the datasets. If already available locally, the filepaths can be changed.
df_train= pd.read_csv(path_to_datasets + "/" + 'mitbih_train.csv', header=None)
df_test=pd.read_csv(path_to_datasets + "/" +  'mitbih_test.csv',header=None)
print("Dataframes MITBIH correctly read into workspace")

df_train = df_train.sample(frac=1, random_state=42).reset_index(drop=True)

#split target and value
train_target=df_train[187]
test_target=df_test[187]
train=df_train.drop(187,axis=1)
test=df_test.drop(187,axis=1)

Dataframes MITBIH correctly read into workspace


In [10]:
#Switches to decide the dataset sampling method and which models should be run
class Config_Sampling:
    oversample = False #equals to B_SMOTE
    undersample = False
    sample_name = "UNDEFINED_SAMPLE"
    
Train_Simple_ANN = True #Trains the simple ANN
Train_Simple_CNN = True #Trains the simple CNN
Train_Advanced_CNN = True #Trains the advanced CNN
 

In [11]:
oversampler = SMOTE()
undersampler = RandomUnderSampler()

In [12]:
#Based on the configuration in the Config_Sampling Class, the datasets are sampled and the sample name is modified accordingly
if Config_Sampling.oversample:
    train, train_target = oversampler.fit_resample(df_train.iloc[:,:-1], df_train.iloc[:,-1])
    Config_Sampling.sample_name = "MITBIH_B_SMOTE"
    print("Sample Name:", Config_Sampling.sample_name)
elif Config_Sampling.undersample:
    train, train_target = undersampler.fit_resample(df_train.iloc[:,:-1], df_train.iloc[:,-1])
    Config_Sampling.sample_name = "MITBIH_C_RUS"
    print("Sample Name:", Config_Sampling.sample_name)
else: 
    print("Using the original mitbih dataset")
    Config_Sampling.sample_name = "MITBIH_A_Original"
    print("Sample Name:", Config_Sampling.sample_name)

Using the original mitbih dataset
Sample Name: MITBIH_A_Original


## **Simple Artificial Neural Network**
ANN without convolutional layers. Only Dense layers are used. No Pooling, Flattening or Dropping out. Base model for later comparison.

In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

Implement Torch Dataset object

In [14]:
class ECG_Dataset(Dataset):
    def __init__(self, csv_file, transform=None, target_transform=None):
        self.dataframe = csv_file.values
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.dataframe)
        #return self.dataframe.shape[0] # Alternative notation

    def __getitem__(self, idx):
        inputs = torch.tensor(self.dataframe[idx,:-1]).to(torch.float32)
        label = torch.tensor(self.dataframe[idx,-1]).long()

        return inputs, label

Custom function for preprocessing (to elaborate later, currently just returns the input itself)

In [43]:
class Lambda(nn.Module):
    def __init__(self, func):
        super().__init__()
        self.func = func

    def forward(self, x):
        return self.func(x)


def preprocess(x):
    return x * torch.Tensor([1.0])

In [44]:
# Define the ANN model
class SimpleANN(nn.Module):
    def __init__(self, input_size, output_size):
        super().__init__()
        self.fc0 = nn.Sequential(Lambda(preprocess))
        self.fc1 = nn.Linear(input_size, 212)
        self.fc2 = nn.Linear(212, 150)  
        self.fc3 = nn.Linear(150, 60) 
        self.fc4 = nn.Linear(60, 24)  
        self.fc5 = nn.Linear(24, 12)  # Hidden to output layer
        self.fc6 = nn.Linear(12, output_size)  # Hidden to output layer
        self.relu = nn.LeakyReLU(negative_slope=0.001)    # Activation function
        self.dropout = nn.Dropout(p=0.1)
        self.sigmoid = nn.Sigmoid() 

    def forward(self, x):
        x = self.fc0(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc3(x)
        x = self.relu(x)
        x = self.fc4(x)
        x = self.relu(x)
        x = self.fc5(x)
        x = self.relu(x)
        x = self.fc6(x)
        return x

In [28]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

cpu


In [29]:
def batch_loss_train(outputs, labels, loss_fn, optimizer):
    loss = loss_fn(outputs, labels)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
    
    return loss.item()

In [30]:
def batch_loss_test(outputs, labels, loss_fn):
    loss = loss_fn(outputs, labels)    
    return loss.item()

In [31]:
def test_loop(dataloader, model, loss_fn):
    # Set the model to evaluation mode - important for batch normalization and dropout layers
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    # Evaluating the model with torch.no_grad()    
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y) / len(X)
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    #test_loss /= num_batches
    correct /= size
    print(f"Test set => Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [32]:
def train_loop(dataloader, model, loss_fn, optimizer):
    model.train()
    train_loss = 0.0
    
    for inputs, labels in dataloader:
        
        # forward + backward + optimize
        outputs = model(inputs)
        train_loss += batch_loss_train(outputs,labels,loss_fn, optimizer) / len(inputs)

    print(f'Train loss: {train_loss}')
    

In [33]:
def get_data(train_ds, valid_ds, bs):
    return (
        DataLoader(train_ds, batch_size=bs, shuffle=True),
        DataLoader(valid_ds, batch_size=bs),
    )

In [34]:
def fit(epochs, model, loss_func, opt, train_dl, valid_dl):
    for t in range(epochs):  
        print(f"Epoch {t+1}   -------------------------------")
        train_loop(train_dl, model, criterion, optimizer)
        test_loop(test_dl, model, criterion)


In [45]:
# Define the model
input_size = 187  # Number of input features
output_size = 5  # Output size (e.g., regression or binary classification)
model = SimpleANN(input_size, output_size)

# Define loss and optimizer
criterion =  nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [46]:
train_ds = ECG_Dataset(df_train)
test_ds = ECG_Dataset(df_test)
train_dl, test_dl = get_data(train_ds, test_ds, 64)

In [48]:
fit(5, model, criterion, optimizer, train_dl, test_dl)

Epoch 1   -------------------------------
Train loss: 4.108393383503426
Test set => Accuracy: 95.4%, Avg loss: 1.103780 

Epoch 2   -------------------------------
Train loss: 3.7743748459615745
Test set => Accuracy: 95.5%, Avg loss: 1.017283 

Epoch 3   -------------------------------
Train loss: 3.628010864049429
Test set => Accuracy: 95.9%, Avg loss: 0.894511 

Epoch 4   -------------------------------
Train loss: 3.2846517644939013
Test set => Accuracy: 96.0%, Avg loss: 0.803386 

Epoch 5   -------------------------------
Train loss: 3.0611385184165556
Test set => Accuracy: 96.3%, Avg loss: 0.772160 

