In [3]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import yaml
import math
from tqdm import tqdm, trange
import os

import matplotlib.pyplot as plt
import seaborn as sns   

import copy
from tqdm import tqdm

from sklearn.metrics import f1_score, balanced_accuracy_score, confusion_matrix, ConfusionMatrixDisplay, accuracy_score
from torch.utils.data import Dataset, DataLoader

from sklearn.model_selection import train_test_split

from torch.utils.tensorboard import SummaryWriter
from imblearn.over_sampling import SMOTE

import warnings
from utils import PTBDataset, test
from torch import Tensor

pd.set_option('display.max_columns', None)

ModuleNotFoundError: No module named 'utils'

In [17]:
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [1]:
path=r'C:\Users\pauls\Desktop\Studium\Machine Learning for Health Care\Projekt 2\project2_TS_input'

In [5]:
# Load data
train_df = pd.read_csv(path+'\ptbdb_train.csv', header=None)
test_df = pd.read_csv(path+'\ptbdb_test.csv', header=None)

In [6]:
X_train_full = train_df.iloc[:, :-1].to_numpy()
y_train_full = train_df.iloc[:, -1].to_numpy()

X_test = test_df.iloc[:, :-1].to_numpy()
y_test = test_df.iloc[:, -1].to_numpy()

X_train_full = np.c_[X_train_full, np.zeros((X_train_full.shape[0], 3))]
X_test = np.c_[X_test, np.zeros((X_test.shape[0], 3))]

In [7]:
X_train, X_val, y_train, y_val = train_test_split(
    X_train_full, y_train_full, test_size=0.2, stratify=y_train_full, random_state=42
)

In [111]:
# We got this function from IML

from torch.utils.data import DataLoader, TensorDataset

def create_loader_from_np(X, y = None, train = True, batch_size=32, shuffle=True, num_workers = 4):
    """
    Create a torch.utils.data.DataLoader object from numpy arrays containing the data.

    input: X: numpy array, the features
           y: numpy array, the labels
    
    output: loader: torch.data.util.DataLoader, the object containing the data
    """
    if train:
        # Attention: If you get type errors you can modify the type of the
        # labels here
        dataset = TensorDataset(torch.from_numpy(X).type(torch.float), 
                                torch.from_numpy(y).type(torch.long))
    else:
        dataset = TensorDataset(torch.from_numpy(X).type(torch.float))
    loader = DataLoader(dataset=dataset,
                        batch_size=batch_size,
                        shuffle=shuffle,
                        pin_memory=True, num_workers=num_workers)
    return loader

In [114]:
train_loader = create_loader_from_np(X_train, y_train, train = True, batch_size=32)
val_loader = create_loader_from_np(X_val, y_val, train = True, shuffle = False, batch_size=32)
test_loader = create_loader_from_np(X_test, y_test, train = False, shuffle = False, batch_size=32)

# Dependencies that I need because I can't import the utils file. Note: I made changes in train_one_epoch, validate_one_epoch and test_one_epoch

In [160]:
# This script contains all the utility functions for the Transformer model.

# =========================================================================== #
#                              Packages and Presets                           #
# =========================================================================== #

import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
from torch.nn.parameter import Parameter
from tqdm import trange, tqdm
from sklearn.metrics import f1_score, balanced_accuracy_score, accuracy_score
import copy
import os

# =========================================================================== #
#                 Data Loading and Preprocessing Utilities                    #
# =========================================================================== #
class PTB_Dataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32).unsqueeze(2)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        x = self.X[idx]
        y = self.y[idx]

        # Check if the input tensor is 3D, if so, add a batch dimension
        if x.dim() == 2:
            x = x.unsqueeze(0)
            
        return x, y

# =========================================================================== #
#                             General Utilities                               #                
# =========================================================================== #
# see https://vandurajan91.medium.com/random-seeds-and-reproducible-results-in-pytorch-211620301eba
# for more information aboutreproducibility in pytorch
def set_all_seeds(seed: int):
    """Set all possible seeds to ensure reproducibility and to avoid randomness
    involved in GPU computations.

    Args:
        seed (int): Seed
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    
#!!! This Early Stopping class is inspired by the following stackoverflow post:
#!!! https://stackoverflow.com/questions/71998978/early-stopping-in-pytorch
#!!! and by this kaggle notebook:
#!!! https://www.kaggle.com/code/megazotya/ecg-transformer/notebook
class EarlyStopping:
    def __init__(
        self, 
        start: int = 50,
        patience: int = 10,
        epsilon: float = 1e-6,
        verbose: bool = False,
        mode: str = "min"
    ):
        self.start = start
        self.counter = 0
        self.patience = patience
        self.epsilon = epsilon
        
        self.verbose = verbose
        self.mode = mode
        
        # initialize objects of which best value will be tracked
        self.best_model = nn.Identity
        self.best_epoch = 0
        self.best_score = np.inf if mode == "min" else -np.inf
        
        
    def early_stop(self, model: nn.Module, metric: float, current_epoch: int) -> bool:
        """Whether training should be stopped or not. If there was no improvement
        in a long time, training should be stopped. Continuously saves the best
        model.

        Args:
            val_loss (float): Current validation loss
            model (nn.Module): Current model
            current_epoch (int): Current epoch number

        Returns:
            bool: Whether training should be stopped or not
        """
        # check whether improvement was large enough (if there was any at all)
        if (
            (metric < self.best_score + self.epsilon and self.mode == "min") or
            (metric > self.best_score - self.epsilon and self.mode == "max")
        ):
            # reset number of epochs without improvement
            self.counter = 0
            # update best model and best score
            self.best_model = copy.deepcopy(model.state_dict())
            self.best_score = metric
            self.best_epoch = current_epoch
            
        elif current_epoch > self.start:
            self.counter += 1  # stop training if no improvement in a long time
            if self.counter >= self.patience:
                if self.verbose:
                    print(f"Early stopping at epoch {current_epoch}. Best score was {self.best_score:.4f} in epoch {self.best_epoch}.")
                return True
        return False
    
    def save_best_model(self, model_path:str) -> None:
        os.makedirs(os.path.dirname(model_path), exist_ok=True)
        torch.save(self.best_model, model_path)
        
    def get_best_model(self) -> nn.Module:
        model = torch.load_state_dict(self.best_model)
        return model


# =========================================================================== #
#                        Transformer Specific Utilities                       #
# =========================================================================== #
# Custom weight initialization:
#!!! Copied from https://www.kaggle.com/code/megazotya/ecg-transformer/notebook
def init_parameters(model):
    for p in model.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)


# =========================================================================== #
#                    Model Training and Evaluation                            #
# =========================================================================== #
def train_one_epoch(
    model:nn.Module,
    optimizer:optim.Optimizer,
    criterion:nn.Module,
    train_loader:DataLoader,
    device:torch.device
) -> tuple[float, float, float, float]:
    """Train the model for one epoch

    Args:
        model (nn.Module): Model to train
        optimizer (optim.Optimizer): Optimizer to use
        criterion (nn.Module): Loss function
        train_loader (DataLoader): Train data loader
        device (torch.device): Device on which calculations are executed

    Returns:
        tuple[float, float, float, float]: Tuple of train loss, accuracy, 
            balanced accuracy and f1 score
    """
    
    model.train()
    
    total_loss = 0.0
    y_preds = []
    y_true = []
    for seq, label in train_loader:
        seq, label = seq.to(device), label.to(device)
        optimizer.zero_grad()
        
        #! Note Paul: I added this in order for the model not to give me any dimension issues!
        seq=seq.squeeze(1)
        
        output = model(seq)
        loss = criterion(output, label)
        loss.backward()
        
        optimizer.step()
        
        total_loss += loss.item()
        y_true.extend(label.cpu().numpy())
        y_preds.extend(F.softmax(output, dim=1).argmax(dim=1).cpu().numpy())

    
    # calculate metrics
    train_loss = total_loss / len(train_loader.dataset)
    train_acc = accuracy_score(y_true, y_preds)
    train_balanced_acc = balanced_accuracy_score(y_true, y_preds)
    train_f1_score = f1_score(y_true, y_preds)
    return train_loss, train_acc, train_balanced_acc, train_f1_score

def validate_one_epoch(
    model:nn.Module,
    criterion:nn.Module,
    val_loader:DataLoader,
    device:torch.device
) -> tuple[float, float, float, float]:
    """Validate the model for one epoch

    Args:
        model (nn.Module): Model to validate
        criterion (nn.Module): Loss function to use
        val_loader (DataLoader): Validation data loader
        device (torch.device): Device on which calculations are executed

    Returns:
        tuple[float, float, float, float]: tuple of validation loss, accuracy,
            balanced accuracy and f1 score
    """
    
    model.eval()
    
    total_loss = 0
    y_preds = []
    y_true = []
    with torch.no_grad():
        for seq, label in val_loader:
            seq, label = seq.to(device), label.to(device)
            
            #! Note Paul: I added this in order for the model not to give me any dimension issues!
            seq=seq.squeeze(1)
            
            output = model(seq)
            
            loss = criterion(output, label)
            total_loss += loss.item()
            
            y_true.extend(label.cpu().numpy())
            y_preds.extend(F.softmax(output, dim=1).argmax(dim=1).cpu().numpy())

    
    # calculate metrics
    val_loss = total_loss / len(val_loader.dataset)
    val_acc = accuracy_score(y_true, y_preds)
    val_balanced_acc = balanced_accuracy_score(y_true, y_preds)
    val_f1_score = f1_score(y_true, y_preds)
    return val_loss, val_acc, val_balanced_acc, val_f1_score

def train_and_validate(
    model:nn.Module,
    optimizer:optim.Optimizer,
    scheduler:optim.lr_scheduler,
    criterion:nn.Module,
    train_loader:DataLoader,
    val_loader:DataLoader,
    best_model_path:str="weights/transformer_pe.pth",
    device:torch.device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu"),
    num_epochs:int=100,
    ES:EarlyStopping=None,
    summary_writer:SummaryWriter=None
):
    # make fancy progress bar
    with trange(num_epochs) as t:
        for epoch in t:
            dct = {}
            train_loss, train_acc, train_balanced_acc, train_f1_score = train_one_epoch(model, optimizer, criterion, train_loader, device)
            val_loss, val_acc, val_balanced_acc, val_f1_score = validate_one_epoch(model, criterion, val_loader, device)
            
            # update progress bar
            t.set_description(f"Training Transformer")
            t.set_postfix(
                train_loss=train_loss,
                val_loss=val_loss,
                train_balanced_acc=train_balanced_acc,
                val_balanced_acc=val_balanced_acc
            )
            
            
            # reduce learning rate
            if scheduler is not None:
                scheduler.step(val_loss)
                
            # log metrics to tensorboard
            if summary_writer is not None:
                summary_writer.add_scalar("Loss train", train_loss, epoch)
                summary_writer.add_scalar("Loss val", val_loss, epoch)
                summary_writer.add_scalar("Accuracy train", train_acc, epoch)
                summary_writer.add_scalar("Accuracy val", val_acc, epoch)
                summary_writer.add_scalar("Balanced accuracy train", train_balanced_acc, epoch)
                summary_writer.add_scalar("Balanced accuracy val", val_balanced_acc, epoch)
                summary_writer.add_scalar("F1 score train", train_f1_score, epoch)
                summary_writer.add_scalar("F1 score val", val_f1_score, epoch)

            if ES.early_stop(model, val_balanced_acc, epoch):
                break
    ES.save_best_model(best_model_path)
    return model

def test(
    model:nn.Module,
    criterion:nn.Module,
    test_loader:DataLoader,
    device:torch.device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
) -> tuple[float, float, float, float]:
    """Test the model

    Args:
        model (nn.Module): Model to test
        criterion (nn.Module): Loss function to use
        test_loader (DataLoader): Test data loader
        device (torch.device, optional): Device on which calculations are executed. Defaults to torch.device("cuda:0" if torch.cuda.is_available() else "cpu").

    Returns:
        tuple[float, float, float, float]: Tuple of predicted probabilities, 
            predicted labels, true labels and test loss
    """
    model.eval()
    
    total_loss = 0
    model_probs = []
    y_preds = []
    y_true = []
    with torch.no_grad():
        for seq, label in test_loader:
            seq, label = seq.to(device), label.to(device)
            
            #! Note Paul: I added this in order for the model not to give me any dimension issues!
            seq=seq.squeeze(1)
            output = model(seq)
            
            loss = criterion(output, label)
            total_loss += loss.item()
            
            model_probs.extend(F.softmax(output, dim=1)[:, 1].cpu().numpy())
            y_true.extend(label.cpu().numpy())
            y_preds.extend(F.softmax(output, dim=1).argmax(dim=1).cpu().numpy())

    
    # calculate metrics
    test_loss = total_loss / len(test_loader.dataset)
    return model_probs, y_preds, y_true, test_loss

# The CNN:

In [162]:
# We based our CNN on this: https://medium.com/@chen-yu/building-a-customized-residual-cnn-with-pytorch-471810e894ed

class CNN(nn.Module):
    def __init__(self, classes_num: int, in_channels: int):
        super().__init__()

        # Initial convolution layer
        self.conv = nn.Conv1d(
            in_channels=in_channels, 
            out_channels=16, 
            #kernel_size=(3, 3), 
            kernel_size=3,
            padding='same', 
            bias=False
        )
        self.bn = nn.BatchNorm1d(16)
        self.relu = nn.ReLU()

        #Out features from the 4 blocks of residual layers: 128
        
        
        # Flattening and final linear layer
        self.flatten = nn.Flatten(1)
        self.dropout = nn.Dropout(p=0.2)
        self.linear = nn.Linear(
            in_features=16,
            out_features=classes_num
        )

    def forward(self, x: Tensor) -> Tensor:
        x = self.conv(x)
        x = self.bn(x)
        x = self.relu(x)

        #print("Shape after convolution and activation:", x.shape)

        x = self.flatten(x)
        #print("Shape after flattening:", x.shape)
        x = self.dropout(x)
        x = self.linear(x)
        #print("Shape after linear layer:", x.shape)
        return x
    
model=CNN(classes_num=2, in_channels=190)

In [92]:
model = model.to(DEVICE)
init_parameters(model)

optimizer = optim.AdamW(model.parameters(), lr=0.005, weight_decay =0.0001)
criterion = torch.nn.CrossEntropyLoss()
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.5, patience=5, threshold=1e-06, verbose = 1)
#scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma = 0.95)
early_stopping = EarlyStopping(patience = 20, verbose = 1, mode = 'max')
summary_writer = SummaryWriter(log_dir="logs/transformer_pe")

In [165]:
N_EPOCHS=100

model = train_and_validate(
    model = model,
    optimizer = optimizer,
    scheduler = scheduler,
    criterion = criterion,
    train_loader = train_loader,
    val_loader = val_loader,
    best_model_path = path+'cnn_best_model.pth',
    device = DEVICE,
    num_epochs = N_EPOCHS,
    ES = early_stopping,
    summary_writer = summary_writer
)

Training Transformer:   2%|▏         | 2/100 [00:03<02:47,  1.71s/it, train_balanced_acc=0.503, train_loss=0.0277, val_balanced_acc=0.49, val_loss=0.0279] 


KeyboardInterrupt: 