In [4]:
import scipy.io
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F
from sklearn.preprocessing import StandardScaler
from torch.utils.data import Dataset, DataLoader
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import zipfile

# Configuration constants

# Number of samples every recording is padded or trimmed to before normalization.
sequence_length = 18_000
# Number of recordings read from the archive; the train/test split is done afterwards.
n_test = 5_000
# Zip archive that contains the per-recording .mat files.
zip_file_path = './data/training2017.zip'

# Utils

def load_ecg(zip_file, file_name):
    """Read one recording stored as a MATLAB file inside an open zip archive.

    Args:
        zip_file: Open archive object exposing ``open(name)``
            (the notebook passes a ``zipfile.ZipFile``).
        file_name: Path of the ``.mat`` member inside the archive.

    Returns:
        The ``'val'`` variable of the MATLAB file with length-1 axes removed.
    """
    with zip_file.open(file_name) as member:
        contents = scipy.io.loadmat(member)
    # loadmat has fully materialised the arrays, so the member can be closed
    # before the signal is extracted.
    return contents['val'].squeeze()

def standardize_series(series):
    """Standardize each row of a 2-D array to zero mean and unit variance.

    Statistics are computed independently per row (``axis=1``).

    Args:
        series: 2-D array-like with one series per row.

    Returns:
        Array of the same shape where every row has had its mean subtracted
        and been divided by its standard deviation. Rows whose standard
        deviation is zero (constant rows) come back as all zeros instead of
        NaN/inf.
    """
    series = np.asarray(series)
    mean = np.mean(series, axis=1, keepdims=True)
    std = np.std(series, axis=1, keepdims=True)
    # A constant row has std == 0; dividing by 1 instead keeps the centred
    # (all-zero) row finite and avoids a divide-by-zero warning.
    safe_std = np.where(std == 0, 1.0, std)
    return (series - mean) / safe_std

# Shared StandardScaler instance; it is re-fitted on every call to normalize_ecg.
scaler = StandardScaler()

def normalize_ecg(ecg_data):
    """Standardize one signal using statistics computed from that signal alone.

    Args:
        ecg_data: NumPy array holding a single signal.

    Returns:
        1-D array with the scaled values.
    """
    # The scaler works column-wise, so present the signal as a single column.
    as_column = ecg_data.reshape(-1, 1)
    standardized = scaler.fit_transform(as_column)
    return standardized.flatten()

# Shared MinMaxScaler instance; it is re-fitted on every call to normalize_ecg_minmax.
scaler2 = MinMaxScaler(feature_range=(0, 1))

def normalize_ecg_minmax(ecg_data):
    """Rescale one signal to the [0, 1] range using its own minimum and maximum.

    Args:
        ecg_data: NumPy array holding a single signal.

    Returns:
        1-D array with the rescaled values.
    """
    # The scaler works column-wise, so present the signal as a single column.
    as_column = ecg_data.reshape(-1, 1)
    rescaled = scaler2.fit_transform(as_column)
    return rescaled.flatten()

def pad_or_trim_ecg(ecg_data, target_length):
    """Force a signal to exactly ``target_length`` samples.

    Args:
        ecg_data: 1-D sequence of samples.
        target_length: Desired number of samples.

    Returns:
        The first ``target_length`` samples when the input is longer, the
        input followed by constant (zero) padding when it is shorter, and the
        input object itself when the length already matches.
    """
    missing = target_length - len(ecg_data)
    if missing == 0:
        return ecg_data
    if missing < 0:
        return ecg_data[:target_length]
    # Pad only at the end of the signal.
    return np.pad(ecg_data, (0, missing), 'constant')

class ECGDataset(Dataset):
    """Map-style dataset pairing each signal with its class label.

    Args:
        ecg_data: Indexable collection of signals.
        labels: Indexable collection of integer labels, aligned with ``ecg_data``.
    """

    def __init__(self, ecg_data, labels):
        self.labels = labels
        self.ecg_data = ecg_data

    def __len__(self):
        """Return the number of signals."""
        return len(self.ecg_data)

    def __getitem__(self, idx):
        """Return ``(signal, label)`` as a float32 tensor and a long tensor."""
        signal = torch.tensor(self.ecg_data[idx], dtype=torch.float32)
        target = torch.tensor(self.labels[idx], dtype=torch.long)
        return signal, target

class ECGCNN(nn.Module):
    """1-D CNN classifier: three conv + max-pool stages followed by two linear layers.

    Args:
        sequence_length: Number of samples per input signal. The default
            (18000) gives a flattened feature size of ``64 * 281``.
        n_classes: Number of output classes (default 4).

    Raises:
        ValueError: If ``sequence_length`` is too short to survive the three
            pooling stages (fewer than 64 samples).
    """

    def __init__(self, sequence_length=18000, n_classes=4):
        super(ECGCNN, self).__init__()
        # Convolutional layers ("same" padding, so only pooling changes the length)
        self.conv1 = nn.Conv1d(1, 16, kernel_size=7, stride=1, padding=3)
        self.conv2 = nn.Conv1d(16, 32, kernel_size=5, stride=1, padding=2)
        self.conv3 = nn.Conv1d(32, 64, kernel_size=3, stride=1, padding=1)

        # Each MaxPool1d(4) stage floors the length to length // 4.
        pooled_length = sequence_length
        for _ in range(3):
            pooled_length //= 4
        if pooled_length < 1:
            raise ValueError(
                f"sequence_length={sequence_length} is too short for three pooling stages"
            )

        # Fully connected layers
        self.fc1 = nn.Linear(64 * pooled_length, 64)
        self.fc2 = nn.Linear(64, n_classes)  # one output per class

        # Pooling layer
        self.pool = nn.MaxPool1d(4)

    def forward(self, x):
        """Return class logits of shape (batch, n_classes) for x of shape (batch, sequence_length)."""
        # Add the channel axis expected by Conv1d: (batch, 1, sequence_length).
        x = x.unsqueeze(1)
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))

        # Flatten channels and time into one feature vector per sample.
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


In [5]:
# Prepare data

# Integer class id for each textual label found in REFERENCE.csv.
label_mapping = {'N': 0, 'A': 1, 'O': 2, '~': 3}

df_labels = pd.read_csv('./data/REFERENCE.csv', header=None, names=['file', 'label'])
df_labels['label'] = df_labels['label'].map(label_mapping)

ecg_files = df_labels['file'].values

# Read the first n_test recordings listed in the reference file from the archive.
with zipfile.ZipFile(zip_file_path, 'r') as zip_file:
    ecg_data_list = [
        load_ecg(zip_file, f'training2017/{ecg_files[k]}.mat')
        for k in range(n_test)
    ]

assert len(ecg_data_list) == n_test

# Bring every recording to a common length, then rescale it to [0, 1].
ecg_data_list = [
    normalize_ecg_minmax(pad_or_trim_ecg(ecg, sequence_length))
    for ecg in ecg_data_list
]

# Labels aligned with the recordings loaded above.
labels = df_labels['label'].values[:n_test]

In [6]:
# Training CNN

dataset = ECGDataset(ecg_data_list, labels)

# Hold out 20% of the recordings for evaluation (fixed seed for a repeatable split).
ecg_train, ecg_test, labels_train, labels_test = train_test_split(
    ecg_data_list, labels, test_size=0.2, random_state=42
)

batch_size = 32

train_dataset = ECGDataset(ecg_train, labels_train)
test_dataset = ECGDataset(ecg_test, labels_test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

model = ECGCNN()

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
n_epochs = 5

# Explicitly select training mode before optimizing.
model.train()
for epoch in range(n_epochs):
    running_loss = 0.0
    # The batch variable is deliberately NOT called `labels`: reusing that name
    # would overwrite the module-level label array with the last batch tensor
    # and break a re-run of this cell.
    for inputs, batch_labels in train_loader:
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, batch_labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Mean of the per-batch losses over the epoch.
    print(f"Epoch {epoch+1}/{n_epochs}, Loss: {running_loss / len(train_loader)}")


Epoch 1/5, Loss: 1.0076143369674682
Epoch 2/5, Loss: 0.9856883358955383
Epoch 3/5, Loss: 0.9658912172317505
Epoch 4/5, Loss: 0.9400389642715454
Epoch 5/5, Loss: 0.9036868848800659


In [7]:
# Evaluation CNN

model.eval()
correct = 0
total = 0

# Gradient tracking is disabled during evaluation.
with torch.no_grad():
    # The batch variable is deliberately NOT called `labels`, so the
    # module-level label array is not overwritten by the last batch tensor.
    for inputs, batch_labels in test_loader:
        outputs = model(inputs)
        # Predicted class = index of the largest logit in each row.
        predicted = outputs.argmax(dim=1)
        total += batch_labels.size(0)
        correct += (predicted == batch_labels).sum().item()

accuracy = correct / total
print(f"Test Accuracy: {accuracy * 100:.2f}%")

Test Accuracy: 61.40%


In [8]:
# Keep one sample out of every `factor` for the RNN input.
factor = 4
# Labels for the RNN experiment: the first n_test entries of the reference table.
labels_rnn = df_labels['label'].values[:n_test]

def downsample_ecg_rnn(ecg_data, step=None):
    """Downsample a signal by keeping every ``step``-th sample.

    Args:
        ecg_data: Sliceable sequence of samples.
        step: Keep one sample out of every ``step``. When omitted, the
            module-level ``factor`` is used, which is the original behaviour.

    Returns:
        ``ecg_data[::step]``. No low-pass filtering is applied before the
        samples are dropped.
    """
    stride = factor if step is None else step
    return ecg_data[::stride]

# Length of a recording once it has been downsampled.
sequence_length_rnn = sequence_length // factor

# Downsample, force the common length, then rescale each signal to [0, 1].
ecg_data_list_rnn = [
    normalize_ecg_minmax(
        pad_or_trim_ecg(downsample_ecg_rnn(ecg), sequence_length_rnn)
    )
    for ecg in ecg_data_list
]

# Same 80/20 split settings (and seed) as used for the CNN.
ecg_train_rnn, ecg_test_rnn, labels_train_rnn, labels_test_rnn = train_test_split(
    ecg_data_list_rnn,
    labels_rnn,
    test_size=0.2,
    random_state=42,
)

batch_size_rnn = 64

# Datasets and data loaders for the RNN
train_dataset_rnn = ECGDataset(ecg_train_rnn, labels_train_rnn)
test_dataset_rnn = ECGDataset(ecg_test_rnn, labels_test_rnn)

train_loader_rnn = DataLoader(train_dataset_rnn, batch_size=batch_size_rnn, shuffle=True)
test_loader_rnn = DataLoader(test_dataset_rnn, batch_size=batch_size_rnn, shuffle=False)


class ECGRNN(nn.Module):
    """GRU classifier: one GRU layer read at its last time step, then two linear layers."""

    def __init__(self):
        super(ECGRNN, self).__init__()
        # One scalar feature per time step, 32 hidden units, batch-first layout.
        self.gru = nn.GRU(input_size=1, hidden_size=32, batch_first=True)

        # Classification head
        self.fc1 = nn.Linear(32, 32)
        self.fc2 = nn.Linear(32, 4)

    def forward(self, x):
        """Return class logits of shape (batch, 4) for x of shape (batch, seq_len)."""
        # Append the feature axis expected by the GRU: (batch, seq_len, 1).
        sequence_out, _ = self.gru(x.unsqueeze(-1))

        # Summarise the sequence by the GRU output at the final time step.
        last_step = sequence_out[:, -1, :]

        hidden = F.relu(self.fc1(last_step))
        return self.fc2(hidden)

# Training RNN

modelRNN = ECGRNN()

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(modelRNN.parameters(), lr=0.001)
n_epochs = 5

# Explicitly select training mode before optimizing.
modelRNN.train()
for epoch in range(n_epochs):
    running_loss = 0.0
    # The batch variable is deliberately NOT called `labels_rnn`: reusing that
    # name would overwrite the module-level label array with the last batch tensor.
    for inputs, batch_labels in train_loader_rnn:
        optimizer.zero_grad()

        # Forward pass
        outputs = modelRNN(inputs)
        loss = criterion(outputs, batch_labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Mean of the per-batch losses over the epoch.
    print(f"Epoch {epoch+1}/{n_epochs}, Loss: {running_loss / len(train_loader_rnn)}")


Epoch 1/5, Loss: 1.1590149904054308
Epoch 2/5, Loss: 0.9960977633794149
Epoch 3/5, Loss: 0.999531868904356
Epoch 4/5, Loss: 0.9923074046770731
Epoch 5/5, Loss: 0.9916038248274062


In [9]:
# Evaluation RNN

modelRNN.eval()
correct = 0
total = 0

# Gradient tracking is disabled during the test phase.
with torch.no_grad():
    # The batch variable is deliberately NOT called `labels_rnn`, so the
    # module-level label array is not overwritten by the last batch tensor.
    for inputs, batch_labels in test_loader_rnn:
        outputs = modelRNN(inputs)
        # Predicted class = index of the largest logit in each row.
        predicted = outputs.argmax(dim=1)
        total += batch_labels.size(0)
        correct += (predicted == batch_labels).sum().item()

accuracy = correct / total
print(f"Test Accuracy: {accuracy * 100:.2f}%")

Test Accuracy: 61.40%
