In [3]:
import scipy.io
import torch
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.utils.rnn as rnn_utils
import os
import pandas as pd
import numpy as np

In [1]:

def load_mat_data(filename): 
    mat = scipy.io.loadmat(filename) 
    ecg_signal = mat['val'] 
    
    # Ensure the signal is 1D and take only the first channel if necessary 
    ecg_signal = ecg_signal.squeeze()  # Remove any single-dimensional entries 

    # Pad or truncate the signal to a fixed length (9000) 
    target_length = 9000 
    if ecg_signal.shape[0] < target_length: 
        # Pad the signal if it's shorter than the target length 
        padding = target_length - ecg_signal.shape[0] 
        ecg_signal = np.pad(ecg_signal, (0, padding), mode='constant', constant_values=0) 
    elif ecg_signal.shape[0] > target_length: 
        # Truncate the signal if it's longer than the target length 
        ecg_signal = ecg_signal[:target_length] 

    # Convert to a PyTorch tensor and add channel dimension 
    return torch.tensor(ecg_signal, dtype=torch.float32).unsqueeze(0)  # Shape: (1, target_length) 


In [4]:
class ECGDataset(Dataset):
    def __init__(self, csv_file, transform=None):
        # Load the CSV file
        self.data_df = pd.read_csv(csv_file, header=None)
        self.labels_df = self.data_df[[0, 1]]  # Assuming first column is filename and second is label
        self.labels_df.columns = ['filename', 'label']  # Assign column names
        
        # Define a mapping from string labels to numeric labels
        self.label_mapping = {
            "A": 0,
            "N": 1,
            "O": 2,
            "~": 3
        }

        self.transform = transform

    def __len__(self):
        return len(self.labels_df)

    def __getitem__(self, idx):
        
        filename = os.path.join(mat_dir, self.labels_df.iloc[idx]['filename'])
        label = self.labels_df.iloc[idx]['label']
        
        # Convert string label to numeric using the mapping
        if isinstance(label, str):
            label = self.label_mapping.get(label, -1)  

        # Convert to tensor
        label_tensor = torch.tensor(label, dtype=torch.long)

        # Load ECG data
        data = load_mat_data(filename)  # Load the ECG data
        
        # Example of applying a transformation
        if self.transform:
            data = self.transform(data) 
        
        # Return the data and label
        return data, label_tensor

In [9]:
def collate_fn(batch):
    ecg_signals, labels = zip(*batch)

    lengths = torch.tensor([signal.size(1) for signal in ecg_signals])  

    padded_ecg_signals = rnn_utils.pad_sequence(ecg_signals, batch_first=True)

    labels = torch.stack(labels)  
    
    return padded_ecg_signals, lengths, labels

In [5]:
# CNN model
class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        self.conv1 = nn.Conv1d(1, 16, kernel_size=3, stride=1)
        self.conv2 = nn.Conv1d(16, 32, kernel_size=3, stride=1)
        self.fc1 = nn.Linear(32 * (9000 - 4), 64)  
        self.fc2 = nn.Linear(64, 5)  

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = x.view(x.size(0), -1) 
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [20]:
# Training loop
def train_model(model, dataloader, criterion, optimizer, epochs=7):
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        
        for data, lengths, labels in dataloader: 
            optimizer.zero_grad()
            
            outputs = model(data)

            # Compute the loss
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Track statistics
            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        accuracy = 100 * correct / total
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {running_loss:.4f}, Accuracy: {accuracy:.2f}%')

In [11]:
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [18]:
def test_model(model, dataloader, criterion):
    model.eval()  
    total_correct = 0
    total_loss = 0
    total_samples = 0

    with torch.no_grad():  
        for batch in dataloader:
            inputs = batch[0].to(device)  
            labels = batch[2].to(device)  

            # Forward pass
            outputs = model(inputs)

            # Calculate loss
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            # Get predicted classes
            _, predicted = torch.max(outputs, 1)
            total_correct += (predicted == labels).sum().item()
            total_samples += labels.size(0)

    # Calculate accuracy
    accuracy = total_correct / total_samples if total_samples > 0 else 0
    average_loss = total_loss / len(dataloader) if len(dataloader) > 0 else 0

    print(f'Test Loss: {average_loss:.4f}, Test Accuracy: {accuracy * 100:.2f}%')


In [21]:
mat_dir = 'C:/Users/elkha/Desktop/EMA/EMA_173_2024-2025/DeepLearning/TP/RNN/Data/ECG_Data'  # Directory containing .mat files

# Load the complete dataset
label_csv = 'C:/Users/elkha/Desktop/EMA/EMA_173_2024-2025/DeepLearning/TP/RNN/Data/Labels.csv'
dataset = ECGDataset(label_csv)

# Set random seed for reproducibility
torch.manual_seed(42)

# Define the split sizes
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

# Split the dataset into training and testing datasets
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

# Create DataLoaders for training and testing
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

cnn_model = CNNModel()

criterion = nn.CrossEntropyLoss()  # Assuming classification task
optimizer_cnn = optim.Adam(cnn_model.parameters(), lr=0.001)



In [22]:
# Train CNN model
print("Training CNN model...")
train_model(cnn_model, train_dataloader, criterion, optimizer_cnn)

Training CNN model...
Epoch [1/7], Loss: 11821.5051, Accuracy: 46.86%
Epoch [2/7], Loss: 153.1832, Accuracy: 70.21%
Epoch [3/7], Loss: 66.9257, Accuracy: 88.41%
Epoch [4/7], Loss: 18.9102, Accuracy: 97.46%
Epoch [5/7], Loss: 7.0509, Accuracy: 99.60%
Epoch [6/7], Loss: 3.9035, Accuracy: 99.84%
Epoch [7/7], Loss: 1.3114, Accuracy: 99.96%


In [23]:
print("Testing CNN model...")
test_model(cnn_model, test_dataloader, criterion)

Testing CNN model...
Test Loss: 4.5140, Test Accuracy: 50.23%


: 