In [3]:
from scipy.io import loadmat

# Replace with your .mat file path
mat_file_path = r'D:\NYCU\Introduction to Artificial Intelligence\Final Project\Project\TrainingSet1\A0001.mat'
data = loadmat(mat_file_path)

# View the detailed structure of the ECG variable
ecg_data = data['ECG']
print("ECG structure type:", type(ecg_data))
print("ECG shape:", ecg_data.shape)
print("ECG data:", ecg_data)

ECG structure type: <class 'numpy.ndarray'>
ECG shape: (1, 1)
ECG data: [[(array(['Male'], dtype='<U4'), array([[74]], dtype=uint8), array([[ 0.0282288 ,  0.0392288 ,  0.0452288 , ...,  0.2582288 ,
           0.2592288 ,  0.2592288 ],
         [ 0.00672947,  0.01072947,  0.01472947, ...,  0.24772947,
           0.24872947,  0.24972947],
         [-0.02149933, -0.02849933, -0.03049933, ..., -0.01049933,
          -0.01049933, -0.00949933],
         ...,
         [-0.11200653, -0.11000653, -0.10800653, ...,  0.19399347,
           0.19399347,  0.19499347],
         [-0.5959572 , -0.5899572 , -0.5819572 , ...,  0.3070428 ,
           0.3070428 ,  0.3070428 ],
         [-0.01558507, -0.00658507,  0.00241493, ...,  0.21341493,
           0.21441493,  0.21441493]]))                                                                                       ]]


In [None]:
import numpy as np
import scipy.io as sio
import os
from tqdm import tqdm

def process_and_save_ecg_data(folder_path, save_path):
    max_length = 72000  # A uniform length of 72000

    for filename in tqdm(sorted(os.listdir(folder_path)), desc="Processing ECG data", mininterval=0.5):
        if filename.endswith('.mat'):
            file_path = os.path.join(folder_path, filename)
            mat_data = sio.loadmat(file_path)
            
            # Extract ECG signal data
            ecg_record = mat_data['ECG'][0][0][2]
            
            # Check if the number of leads is 12
            if ecg_record.shape[0] != 12:
                print(f"Warning: {filename} does not contain 12 leads")
                continue

            # Zero padding
            ecg_record_padded = np.pad(ecg_record, ((0, 0), (0, max_length - ecg_record.shape[1])), 'constant')

            # Save each processed file as `.npy` format
            save_filename = os.path.join(save_path, f"{os.path.splitext(filename)[0]}.npy")
            np.save(save_filename, ecg_record_padded)

# Replace with your folder path
folder_path = r'D:\NYCU\Introduction to Artificial Intelligence\Final Project\Project\TrainingSet1'
save_path = r'D:\NYCU\Introduction to Artificial Intelligence\Final Project\Project\Processed Data'

# Ensure the save folder exists
os.makedirs(save_path, exist_ok=True)

process_and_save_ecg_data(folder_path, save_path)

Processing ECG data: 100%|██████████| 2000/2000 [00:30<00:00, 66.02it/s]


In [8]:
import numpy as np
import os

npy_file_path = r'D:\NYCU\Introduction to Artificial Intelligence\Final Project\Project\Processed Data\A0001.npy'

# Load the .npy file
data = np.load(npy_file_path)

# Check the shape of the data
print("Data shape:", data.shape)

# View a portion of the data (first 10 data points)
print("Data sample:", data[:, :10])

Data shape: (12, 72000)
Data sample: [[ 2.82288000e-02  3.92288000e-02  4.52288000e-02  4.92288000e-02
   5.42288000e-02  5.62288000e-02  5.82288000e-02  6.02288000e-02
   6.02288000e-02  6.12288000e-02]
 [ 6.72946667e-03  1.07294667e-02  1.47294667e-02  1.67294667e-02
   1.97294667e-02  2.37294667e-02  2.87294667e-02  3.47294667e-02
   4.07294667e-02  4.77294667e-02]
 [-2.14993333e-02 -2.84993333e-02 -3.04993333e-02 -3.24993333e-02
  -3.44993333e-02 -3.24993333e-02 -2.94993333e-02 -2.54993333e-02
  -1.94993333e-02 -1.34993333e-02]
 [-1.74906667e-02 -2.44906667e-02 -2.94906667e-02 -3.24906667e-02
  -3.64906667e-02 -3.94906667e-02 -4.24906667e-02 -4.74906667e-02
  -5.04906667e-02 -5.44906667e-02]
 [ 2.43552000e-02  3.33552000e-02  3.83552000e-02  4.13552000e-02
   4.43552000e-02  4.43552000e-02  4.43552000e-02  4.23552000e-02
   3.93552000e-02  3.63552000e-02]
 [-6.80013333e-03 -8.80013333e-03 -6.80013333e-03 -6.80013333e-03
  -6.80013333e-03 -3.80013333e-03  1.99866667e-04  4.19986667e

In [9]:
import pandas as pd

# Read the label CSV
reference_path = r'D:\NYCU\Introduction to Artificial Intelligence\Final Project\Project\REFERENCE_1.csv'
labels_df = pd.read_csv(reference_path)

# Check the loaded data
print(labels_df.head())

  Recording  First_label  Second_label  Third_label
0     A0001            5           NaN          NaN
1     A0002            1           NaN          NaN
2     A0003            2           NaN          NaN
3     A0004            2           NaN          NaN
4     A0005            7           NaN          NaN


In [None]:
import pandas as pd
import os

# Read the label CSV
labels_df = pd.read_csv(reference_path)

# Set the folder path
folder_path = r'D:\NYCU\Introduction to Artificial Intelligence\Final Project\Project\Processed Data'

# Optimize file existence check by preloading existing .npy files
existing_files = {file_name for file_name in os.listdir(folder_path) if file_name.endswith('.npy')}
labels_df = labels_df[labels_df['Recording'].apply(lambda x: f"{x}.npy" in existing_files)]

# Check the filtered data
print(labels_df.head())
print(f"Total valid files: {len(labels_df)}")

  Recording  First_label  Second_label  Third_label
0     A0001            5           NaN          NaN
1     A0002            1           NaN          NaN
2     A0003            2           NaN          NaN
3     A0004            2           NaN          NaN
4     A0005            7           NaN          NaN
Total valid files: 2000


In [11]:
import torch

# 假設有 9 個類別
num_classes = 9
labels = labels_df['First_label'] - 1  # 將標籤從 0 開始
y = torch.nn.functional.one_hot(torch.tensor(labels.values), num_classes=num_classes)
print("One-hot encoded labels shape:", y.shape)

One-hot encoded labels shape: torch.Size([2000, 9])


In [12]:
import torch

# 載入數據並轉換為 PyTorch 張量
X = []
for recording_name in tqdm(labels_df['Recording'], desc="Loading files", mininterval=0.5):
    file_path = os.path.join(folder_path, f"{recording_name}.npy")
    if os.path.exists(file_path):
        X.append(torch.tensor(np.load(file_path).astype(np.float32)))

# 將列表中的所有張量堆疊成單一張量
X = torch.stack(X)
print("Data shape:", X.shape)

Loading files: 100%|██████████| 2000/2000 [00:21<00:00, 91.08it/s]


Data shape: torch.Size([2000, 12, 72000])


In [None]:
import torch
from sklearn.model_selection import train_test_split

# 假設 X 和 y 是 PyTorch 張量
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# 將分割出的資料轉換為 PyTorch 張量格式
X_train = X_train.clone().detach()
X_test = X_test.clone().detach()
y_train = y_train.clone().detach()
y_test = y_test.clone().detach()

print("Training data shape:", X_train.shape, y_train.shape)
print("Testing data shape:", X_test.shape, y_test.shape)

Training data shape: torch.Size([1400, 12, 72000]) torch.Size([1400, 9])
Testing data shape: torch.Size([600, 12, 72000]) torch.Size([600, 9])


In [None]:
from torch.utils.data import TensorDataset, DataLoader

# Package the data into PyTorch Datasets
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

# Use DataLoader for batch processing
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [22]:
X_train = X_train.clone().detach().float()
y_train = y_train.clone().detach().float()
X_test = X_test.clone().detach().float()
y_test = y_test.clone().detach().float()

In [None]:
import torch

# Check if a GPU is available
if torch.cuda.is_available():
    device = torch.device("cuda")
    print("CUDA device available:", torch.cuda.get_device_name(0))
    # Enable cuDNN benchmark for faster training
    torch.backends.cudnn.benchmark = True
else:
    device = torch.device("cpu")
    print("Using CPU")

CUDA device available: NVIDIA GeForce RTX 3060 Laptop GPU


In [28]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class AttentionWithContext(nn.Module):
    def __init__(self, input_dim, bias=True):
        super(AttentionWithContext, self).__init__()
        self.W = nn.Parameter(torch.Tensor(input_dim, input_dim))
        self.u = nn.Parameter(torch.Tensor(input_dim))
        self.bias = bias
        if bias:
            self.b = nn.Parameter(torch.Tensor(input_dim))
        self.init_parameters()

    def init_parameters(self):
        nn.init.xavier_uniform_(self.W)  # Initialize the weight matrix with Xavier uniform distribution
        nn.init.uniform_(self.u, -0.1, 0.1)  # Initialize the attention vector with uniform distribution
        if self.bias:
            nn.init.zeros_(self.b)  # Initialize the bias with zeros

    def forward(self, x, mask=None):
        # Apply linear transformation
        uit = torch.matmul(x, self.W)
        if self.bias:
            uit += self.b
        uit = torch.tanh(uit)
        
        # Calculate attention weights
        ait = torch.matmul(uit, self.u)
        a = torch.exp(ait)

        if mask is not None:
            a = a * mask.float()  # Apply mask

        # Normalize attention weights
        a = a / (torch.sum(a, dim=1, keepdim=True) + 1e-10)
        a = a.unsqueeze(-1)
        
        # Compute weighted sum of inputs
        weighted_input = x * a
        return torch.sum(weighted_input, dim=1)


In [36]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ECGModel(nn.Module):
    def __init__(self, input_channels=12, num_classes=9):
        super(ECGModel, self).__init__()
        
        # Define 5 convolutional layers, each with 12 filters
        self.conv_layers = nn.Sequential(
            nn.Conv1d(input_channels, 12, kernel_size=3, padding=1),
            nn.LeakyReLU(0.3),
            nn.Conv1d(12, 12, kernel_size=3, padding=1),
            nn.LeakyReLU(0.3),
            nn.Conv1d(12, 24, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.3),
            nn.Dropout(0.2),
            
            nn.Conv1d(24, 12, kernel_size=3, padding=1),
            nn.LeakyReLU(0.3),
            nn.Conv1d(12, 12, kernel_size=3, padding=1),
            nn.LeakyReLU(0.3),
            nn.Conv1d(12, 24, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.3),
            nn.Dropout(0.2),
            
            nn.Conv1d(24, 12, kernel_size=3, padding=1),
            nn.LeakyReLU(0.3),
            nn.Conv1d(12, 12, kernel_size=3, padding=1),
            nn.LeakyReLU(0.3),
            nn.Conv1d(12, 24, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.3),
            nn.Dropout(0.2),
            
            nn.Conv1d(24, 12, kernel_size=3, padding=1),
            nn.LeakyReLU(0.3),
            nn.Conv1d(12, 12, kernel_size=3, padding=1),
            nn.LeakyReLU(0.3),
            nn.Conv1d(12, 24, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.3),
            nn.Dropout(0.2),
            
            nn.Conv1d(24, 12, kernel_size=3, padding=1),
            nn.LeakyReLU(0.3),
            nn.Conv1d(12, 12, kernel_size=3, padding=1),
            nn.LeakyReLU(0.3),
            nn.Conv1d(12, 48, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.3),
            nn.Dropout(0.2)
        )
        
        # Bidirectional GRU layer
        self.gru = nn.GRU(input_size=48, hidden_size=12, bidirectional=True, batch_first=True)
        
        # Attention layer
        self.attention = AttentionWithContext(input_dim=24)
        
        # Batch normalization layer
        self.batch_norm = nn.BatchNorm1d(24)
        
        # Output layer
        self.output_layer = nn.Sequential(
            nn.Linear(24, num_classes),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = x.permute(0, 2, 1)  # [batch, channels, seq_len] for Conv1d
        x = self.conv_layers(x)
        # print("After Conv Layers:", x.shape)  # Check shape

        x = x.permute(0, 2, 1)  # [batch, seq_len, channels] for GRU
        x, _ = self.gru(x)
        # print("After GRU Layer:", x.shape)  # Check shape

        x = self.attention(x)
        x = self.batch_norm(x)
        x = F.leaky_relu(x, 0.3)
        x = F.dropout(x, 0.5, training=self.training)

        x = self.output_layer(x)
        return x

In [34]:
# Define the model
model = ECGModel(input_channels=12, num_classes=9)

# Check if GPU is available and move the model to GPU
model.to(device)


ECGModel(
  (conv_layers): Sequential(
    (0): Conv1d(12, 12, kernel_size=(3,), stride=(1,), padding=(1,))
    (1): LeakyReLU(negative_slope=0.3)
    (2): Conv1d(12, 12, kernel_size=(3,), stride=(1,), padding=(1,))
    (3): LeakyReLU(negative_slope=0.3)
    (4): Conv1d(12, 24, kernel_size=(3,), stride=(2,), padding=(1,))
    (5): LeakyReLU(negative_slope=0.3)
    (6): Dropout(p=0.2, inplace=False)
    (7): Conv1d(24, 12, kernel_size=(3,), stride=(1,), padding=(1,))
    (8): LeakyReLU(negative_slope=0.3)
    (9): Conv1d(12, 12, kernel_size=(3,), stride=(1,), padding=(1,))
    (10): LeakyReLU(negative_slope=0.3)
    (11): Conv1d(12, 24, kernel_size=(3,), stride=(2,), padding=(1,))
    (12): LeakyReLU(negative_slope=0.3)
    (13): Dropout(p=0.2, inplace=False)
    (14): Conv1d(24, 12, kernel_size=(3,), stride=(1,), padding=(1,))
    (15): LeakyReLU(negative_slope=0.3)
    (16): Conv1d(12, 12, kernel_size=(3,), stride=(1,), padding=(1,))
    (17): LeakyReLU(negative_slope=0.3)
    (18): C

In [None]:
import torch.optim as optim

# Define the loss function and optimizer
criterion = nn.BCELoss()  # Binary Cross Entropy Loss, suitable for multi-label classification
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
from tqdm import tqdm

# Set training parameters
epochs = 100  # Adjust the number of epochs as needed
model.train()  # Set the model to training mode

for epoch in tqdm(range(epochs), desc="Training", leave=False):
    running_loss = 0.0  # Initialize cumulative loss
    for inputs, labels in train_loader:
        # Adjust the shape of the input data and convert to float type
        inputs = inputs.permute(0, 2, 1).float()  # Adjust shape to [batch_size, channels, seq_len]
        labels = labels.float()  # Convert labels to float type
        # Move data to GPU (if supported)
        inputs, labels = inputs.to(device), labels.to(device)

        # Forward pass, compute loss, backward pass, and update weights
        optimizer.zero_grad()  # Clear previous gradients
        outputs = model(inputs)  # Forward pass
        loss = criterion(outputs, labels)  # Compute loss
        loss.backward()  # Backward pass
        optimizer.step()  # Update model weights

        # Accumulate loss
        running_loss += loss.item()

    # Output average loss for each epoch
    print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss/len(train_loader):.4f}")


Epoch 1/100, Loss: 0.6999
Epoch 2/100, Loss: 0.7020
Epoch 3/100, Loss: 0.7003
Epoch 4/100, Loss: 0.7002
Epoch 5/100, Loss: 0.7012
Epoch 6/100, Loss: 0.7013
Epoch 7/100, Loss: 0.7004
Epoch 8/100, Loss: 0.7014
Epoch 9/100, Loss: 0.7012
Epoch 10/100, Loss: 0.7002
Epoch 11/100, Loss: 0.7020
Epoch 12/100, Loss: 0.7001
Epoch 13/100, Loss: 0.7010
Epoch 14/100, Loss: 0.7005
Epoch 15/100, Loss: 0.7013
Epoch 16/100, Loss: 0.7015
Epoch 17/100, Loss: 0.7017
Epoch 18/100, Loss: 0.7004
Epoch 19/100, Loss: 0.6995
Epoch 20/100, Loss: 0.7006
Epoch 21/100, Loss: 0.7010
Epoch 22/100, Loss: 0.7002
Epoch 23/100, Loss: 0.7017
Epoch 24/100, Loss: 0.7013
Epoch 25/100, Loss: 0.7007
Epoch 26/100, Loss: 0.7001
Epoch 27/100, Loss: 0.7004
Epoch 28/100, Loss: 0.6993
Epoch 29/100, Loss: 0.7005
Epoch 30/100, Loss: 0.7003
Epoch 31/100, Loss: 0.6997
Epoch 32/100, Loss: 0.7008
Epoch 33/100, Loss: 0.7006
Epoch 34/100, Loss: 0.7004
Epoch 35/100, Loss: 0.7021
Epoch 36/100, Loss: 0.6998
Epoch 37/100, Loss: 0.7013
Epoch 38/1

In [42]:
# Set the model to evaluation mode
model.eval()

# Initialize variables to record loss and accuracy
test_loss = 0.0
correct_predictions = 0
total_samples = 0

# Disable gradient calculation (evaluation mode does not require gradient computation)
with torch.no_grad():
    for inputs, labels in test_loader:
        # Adjust the shape of the input data and convert to float type
        inputs = inputs.permute(0, 2, 1).float()  # Adjust shape to [batch_size, channels, seq_len]
        labels = labels.float()  # Convert labels to float type
        # Move data to GPU (if supported)
        inputs, labels = inputs.to(device), labels.to(device)
        
        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)  # Compute loss
        
        # Accumulate loss
        test_loss += loss.item()
        
        # For classification tasks, calculate accuracy
        _, predicted = torch.max(outputs, 1)  # Get the class with the highest probability
        correct_predictions += (predicted == labels.argmax(dim=1)).sum().item()
        total_samples += labels.size(0)

# Calculate average loss and accuracy
average_test_loss = test_loss / len(test_loader)
accuracy = correct_predictions / total_samples * 100

# Output results
print(f"Test Loss: {average_test_loss:.4f}")
print(f"Test Accuracy: {accuracy:.2f}%")

Test Loss: 0.7125
Test Accuracy: 11.50%
