## ECG Classification Using Convolutional Neural Networks

Author: Calvin Chan 

### Introduction
In our last notebook, we will use the concept of convolution to model our ECG timeseries data. Traditionally, image analysis uses a 2D convolution to extract features from images, however since we are working with signals this would not work. Instead of 2D convolutions, we can convolve over our signals using a 1D framework. 

### Table of Contents

Here, we import all the necessary packages used in this notebook.

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt 
import torch 
import torch.nn as nn
import load_functions as f
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay, roc_curve, roc_auc_score

import sys
sys.path.append('..')
from Notebooks import ecg_cleaning as c

### Data Import

Below we load our full data set.

In [2]:
# File Path
path = '../data/physionet.org/files/ptb-xl/1.0.3/'
metadata = pd.read_csv('../data/cleaned_metadata.csv')

# Import data
full_data = f.load_signal(path, metadata)

In [3]:
# Let's look at the data shape 
full_data[0].shape, full_data[1].shape

((15677, 1000, 12), (15677,))

We also want to look at Lead II only. 

In [4]:
# Slicing only for Lead II 
X = full_data[0][:,:,1]
y = full_data[1]

X.shape, y.shape

((15677, 1000), (15677,))

### Data Preprocessing 

In [5]:
# Transform target column into binary classes
y = y.apply(lambda x: f.binary(x))

In [6]:
# Sanity check
y.value_counts()

diagnostic_superclass
ABNO    8645
NORM    7032
Name: count, dtype: int64

In [7]:
# Instantiate LabelEncoder 
label = LabelEncoder()

# Fit binary classes
label.fit(y)

# Transform classes
y = label.transform(y)

In [8]:
# Checking how our label is encoded
label.transform(['ABNO', 'NORM'])

array([0, 1])

As we can see, signals that are labeled `ABNO` are mapped to `0` and `NORM` are mapped `1`. 

#### Signal denoising using ecg_cleaning.py

In this following section, we denoise our ECG signals using Fourier Tranforms. We have to specify the frequencies in which we want to take out for baseline wandering and powerline interference. 

In [9]:
X.shape

(15677, 1000)

In [10]:
# Low sampling frequency
sig_len = 1000
sampling_frequency = 100
time = np.arange(0, sig_len) / sampling_frequency

# Baseline and PLI removal
signal_bl = np.apply_along_axis(c.baseline_removal, axis=1, arr=X, freq_start=0.1, freq_stop=1.5)
signal_pli = np.apply_along_axis(c.high_freq_removal, axis=1, arr=signal_bl, freq_start=40)

In [11]:
signal_pli.shape

(15677, 1000)

#### Train Test Split

After preprocessing our signals, we can split our data set to training and testing. We will use `20%` of our data as testing and the rest as training. Since we have a data imbalance, we will include `stratify` as well.  

In [12]:
# Train/Test split
X_train, X_test, y_train, y_test = train_test_split(signal_pli, y, test_size=0.2, stratify=y)

# Checking the shape
X_train.shape, X_test.shape, y_train.shape, y_test.shape

((12541, 1000), (3136, 1000), (12541,), (3136,))

#### Scaling 

Next, we want to scale our data. Since we are using using a neural network, as we calculate the gradient during backpropagation, distances can affect how well this is performed. We will use `StandardScaler` for our case.

In [13]:
# Instantiate standard scaler
ss = StandardScaler()

# Fit and transform training set
X_train = ss.fit_transform(X_train)

# Transform testing set
X_test = ss.transform(X_test)

#### Transforming arrays into Torch Tensors

In [45]:
# Transforming independent variables
X_train = torch.tensor(X_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)

# Transforming dependent variables
y_train = torch.tensor(y_train, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

# Sanity check
print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)

torch.Size([12541, 1000]) torch.Size([3136, 1000]) torch.Size([12541]) torch.Size([3136])


  X_train = torch.tensor(X_train, dtype=torch.float32)
  X_test = torch.tensor(X_test, dtype=torch.float32)
  y_train = torch.tensor(y_train, dtype=torch.float32)
  y_test = torch.tensor(y_test, dtype=torch.float32)


### Model Setup and Implementation

In [46]:
class SimpleCNN(nn.Module):
    """Starting off with a simple cnn."""
    def __init__(self):
        super(SimpleCNN, self).__init__()
        
        # Defining convolution layers
        self.conv_layer = nn.Sequential(
            
            # Convolution block 1
            nn.Conv1d(in_channels=1, out_channels=16, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=None),
            
            # Convolution block 2
            nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=None),
        )
        
        # Drop out layer 
        self.dropout = nn.Dropout1d(0.15)
        
        # Fully-connected layer
        self.fc_layer = nn.Sequential(
            nn.Linear(in_features=32*250, out_features=400),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=400, out_features=16),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=16, out_features=1),
        )
        
        # Output layer 
        self.sigmoid = nn.Sigmoid()

        # Loss function
        self.binary_entropy_loss = nn.BCELoss()
        self.optimizer = torch.optim.Adam(self.parameters(), lr=0.001)
        
    def forward(self, x): 
        """Perform forward pass."""

        # Pass through convolution layers
        x = self.conv_layer(x)
        
        # Flatten the output after convolution
        x = x.view(x.size(0), -1)
        
        # Apply drop out
        x = self.dropout(x)

        # Pass through fully-connected layer
        x = self.fc_layer(x)

        # If training, repeat, else, compute output layer
        if not self.training:
            x = self.sigmoid(x)

        return x

    def predict(self, x):
        """Make prediction."""

        predictions = self.forward(x)

        # Hard class prediction: output from sigmoid with the higher percentage
        hard_class_prediction = torch.argmax(predictions, dim=1)

        return hard_class_prediction


In [48]:
# Sample training 

sample = X_train[0]
sample_label = y_train[0]

print(sample.shape, sample_label.shape)

torch.Size([1000]) torch.Size([])


In [49]:
sample.unsqueeze(0).unsqueeze(0).shape

torch.Size([1, 1, 1000])

In [47]:
# Initializing the model

cnn_model = SimpleCNN()
cnn_model

SimpleCNN(
  (conv_layer): Sequential(
    (0): Conv1d(1, 16, kernel_size=(3,), stride=(1,), padding=(1,))
    (1): ReLU(inplace=True)
    (2): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv1d(16, 32, kernel_size=(3,), stride=(1,), padding=(1,))
    (4): ReLU(inplace=True)
    (5): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (dropout): Dropout1d(p=0.15, inplace=False)
  (fc_layer): Sequential(
    (0): Linear(in_features=8000, out_features=400, bias=True)
    (1): ReLU(inplace=True)
    (2): Linear(in_features=400, out_features=16, bias=True)
    (3): ReLU(inplace=True)
    (4): Linear(in_features=16, out_features=1, bias=True)
  )
  (sigmoid): Sigmoid()
  (binary_entropy_loss): BCELoss()
)

In [None]:
binary_entropy_loss = nn.

In [58]:
outputs = cnn_model()

In [61]:
loss = cnn_model.binary_entropy_loss(outputs.sigmoid(), sample_label.unsqueeze(0).unsqueeze(0))

In [62]:
loss.item()

0.7903474569320679