In [43]:
import os
import numpy as np
import matplotlib.pyplot as plt
import scipy.io
import torch
import torch.nn.functional as F
import tqdm
import random

from torch import nn
from einops import rearrange, repeat, pack, unpack
from einops.layers.torch import Rearrange
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import LabelEncoder

from vit_pytorch.vit import Transformer

In [44]:
# Read the MATLAB file from the current working directory; the result is
# indexed by MATLAB variable name in the cells below.
ecg_mat = scipy.io.loadmat('ECGData.mat')

In [45]:
# 'ECGData' is indexed [0][0] to reach its single entry; field 0 of that entry
# is the signal matrix (rows are later treated as individual recordings).
ecg_data = ecg_mat['ECGData'][0][0][0]
# Sanity check of the matrix dimensions: (rows, samples per row).
print(ecg_data.shape)

(162, 65536)


In [46]:
# Field 1 of the same 'ECGData' entry holds the labels, one row per signal row
# of ecg_data (same leading dimension).
labels = ecg_mat['ECGData'][0][0][1]
# Sanity check: expected to be (number of rows in ecg_data, 1).
print(labels.shape)

(162, 1)


In [47]:
def split_dataset_indices(N, train_percent, val_percent, test_percent, seed=42):
    """Randomly partition ``range(N)`` into train / validation / test index lists.

    Args:
        N: Number of items; the indices ``0 .. N-1`` are shuffled and split.
        train_percent: Share of items assigned to the training split, in percent.
        val_percent: Share of items assigned to the validation split, in percent.
        test_percent: Share of items for the test split, in percent. It is only
            used to validate the total: the test split receives every index left
            over after the train and validation splits, including rounding
            remainders.
        seed: Seed for the shuffle. The same ``N`` and ``seed`` always produce
            the same split.

    Returns:
        ``(train_indices, val_indices, test_indices)``: three disjoint lists that
        together contain every index exactly once.

    Raises:
        ValueError: If a percentage is negative or the percentages do not sum
            to 100.
    """
    percents = (train_percent, val_percent, test_percent)
    if any(p < 0 for p in percents):
        raise ValueError("Percentages must be non-negative.")
    # Compare with a tolerance so fractional percentages are not rejected just
    # because their floating-point sum is off by a rounding error.
    if abs(sum(percents) - 100) > 1e-9:
        raise ValueError("The sum of the percentages must be 100.")

    # A private generator yields the same permutation as seeding the global
    # `random` module with `seed`, but leaves the global RNG state untouched.
    rng = random.Random(seed)
    indices = list(range(N))
    rng.shuffle(indices)

    # Multiply before dividing: (57 / 100) * 100 evaluates to 56.99999999999999
    # and would truncate to 56, whereas 57 * 100 / 100 is exactly 57.0.
    train_size = int(train_percent * N / 100)
    val_size = int(val_percent * N / 100)

    # Consecutive slices of the shuffled list; the test split takes the rest.
    train_indices = indices[:train_size]
    val_indices = indices[train_size:train_size + val_size]
    test_indices = indices[train_size + val_size:]

    return train_indices, val_indices, test_indices

In [48]:
# Split at the row level (70% / 15% / 15%), before any windowing, using the
# function's default seed.
train_indices, val_indices, test_indices = split_dataset_indices(labels.shape[0], 70, 15, 15)

In [49]:
def create_windowed_dataset(data, labels, window_size=1024, stride=None):
    """Scale signals to [-1, 1], cut them into fixed-length windows, encode labels.

    Args:
        data: 2-D array of shape ``(num_samples, sample_size)``, one signal per
            row.
        labels: Per-row labels, indexed so that ``labels[i][0][0]`` is the class
            name of row ``i``.
        window_size: Number of points in each window.
        stride: Step between the starts of consecutive windows. ``None`` (or 0)
            means ``window_size``, i.e. non-overlapping windows.

    Returns:
        ``(windows, codes)``:
        ``windows`` is a float64 tensor of shape
        ``(num_samples * num_windows, 1, window_size)``, ordered row by row and,
        within a row, by window start.
        ``codes`` is a float64 tensor of length ``num_samples * num_windows``
        holding the integer class code of each window's source row.

    Raises:
        ValueError: If ``window_size`` or ``stride`` is not positive.

    Notes:
        * Min/max scaling uses the global minimum and maximum of the ``data``
          passed in, so arrays processed in separate calls are each scaled by
          their own range.
        * Class codes are the positions of the label names in the sorted list of
          distinct names found in ``labels``; codes from separate calls only
          agree when each call sees the same set of classes.
        * Labels are returned as float64; NOTE(review): consumers that need
          integer class indices must cast them -- confirm against the loader.
    """
    if not stride:
        stride = window_size
    if window_size <= 0 or stride <= 0:
        raise ValueError("window_size and stride must be positive.")

    data = np.asarray(data)
    min_val = np.min(data)
    max_val = np.max(data)
    value_range = max_val - min_val
    if value_range == 0:
        # Constant input: avoid 0/0 -> NaN and map everything to mid-range.
        data = np.zeros(data.shape, dtype=np.float64)
    else:
        data = 2 * ((data - min_val) / value_range) - 1

    num_samples, sample_size = data.shape
    # The window count must follow the stride: the last window starts at the
    # largest multiple of `stride` that still leaves `window_size` points.
    num_windows = max(0, (sample_size - window_size) // stride + 1)

    # offsets[j, k] is the column of the k-th point of window j; fancy indexing
    # gives (num_samples, num_windows, window_size), flattened row-major so that
    # window j of row i lands at position i * num_windows + j.
    starts = np.arange(num_windows) * stride
    offsets = starts[:, None] + np.arange(window_size)[None, :]
    windowed_data = data[:, offsets].reshape(num_samples * num_windows, window_size)

    # Encode the string labels to integers: np.unique sorts the distinct names
    # and returns each row's position in that sorted list.
    record_labels = np.array([labels[i][0][0] for i in range(num_samples)])
    _, record_codes = np.unique(record_labels, return_inverse=True)
    # Every window inherits the code of the row it was cut from.
    labels_encoded = np.repeat(record_codes, num_windows)

    return (
        torch.tensor(np.expand_dims(windowed_data, 1), dtype=torch.float64),
        torch.tensor(labels_encoded, dtype=torch.float64),
    )

In [50]:
# Window each split on its own (train, then val, then test) into
# non-overlapping 256-point windows.
(ecg_train, labels_train), (ecg_val, labels_val), (ecg_test, labels_test) = (
    create_windowed_dataset(ecg_data[split_rows], labels[split_rows], window_size=256)
    for split_rows in (train_indices, val_indices, test_indices)
)

In [51]:
# Inspect the windowed training tensor: (windows, 1, window_size).
ecg_train.shape

torch.Size([28928, 1, 256])

In [52]:
# One label per training window; the length should match ecg_train's first dimension.
labels_train.shape

torch.Size([28928])

In [53]:
# Bundle each split as a dict with 'samples' and 'labels' entries.
train = dict(samples=ecg_train, labels=labels_train)
val = dict(samples=ecg_val, labels=labels_val)
test = dict(samples=ecg_test, labels=labels_test)

In [54]:
# Destination directory for the serialized splits (relative to the working directory).
output_path = 'TimeMAE/data/ECG/'
# Create the directory tree if needed; exist_ok makes re-running this cell safe.
os.makedirs(output_path, exist_ok=True)

In [55]:
# Serialize every split dict to its own file inside output_path.
for split_file, split_payload in (('train.pt', train), ('val.pt', val), ('test.pt', test)):
    torch.save(split_payload, os.path.join(output_path, split_file))