# Imports

In [1]:
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt

In [2]:
import importlib
import data_loading
import data_processing

importlib.reload(data_loading)
importlib.reload(data_processing)

from data_loading import load_dataset, create_input_space, augment_data
from data_processing import preprocess_signals, normalize_data

In [3]:
# Load every record and print two quick size checks.
data = load_dataset()
print(len(data))  # how many records were loaded

ecg_first_record = data[0]['signals']['ecg']
print(len(ecg_first_record))  # length of the first record's ECG trace

8600
2816


# Preprocessing

In [4]:
# Run the project's preprocessing step on the loaded records, then build the
# model input array X and the label vector y from the result.
# NOTE(review): the meaning of the positional arguments 512 and 256 is not
# visible from this notebook — confirm against data_processing.preprocess_signals
# and consider passing them by keyword or via named constants.
data_filtered = preprocess_signals(data, 512, 256)
X, y = create_input_space(data_filtered)

(8600, 1408, 5)
(8600,)


In [5]:
# Value range of each signal, taken over all samples and all time steps,
# before normalisation.
num_signals = X.shape[2]  # size of the last axis: one entry per signal
per_signal_max = np.max(X, axis=(0, 1))
per_signal_min = np.min(X, axis=(0, 1))

for signal_idx in range(num_signals):
    global_max = per_signal_max[signal_idx]
    global_min = per_signal_min[signal_idx]
    print(f"Signal {signal_idx}: Global Max = {global_max}, Global Min = {global_min}")

Signal 0: Global Max = 4718.812958166116, Global Min = -2272.420349595169
Signal 1: Global Max = 18.741088911129566, Global Min = 0.5993188470456602
Signal 2: Global Max = 1602.2092519123194, Global Min = -1641.8367371633237
Signal 3: Global Max = 4170.338911821895, Global Min = -3100.581652393902
Signal 4: Global Max = 1855.0312371419202, Global Min = -1944.8529389968269


In [6]:
# Rescale X with the project's normalize_data helper.
# NOTE(review): this rebinds X, so the un-normalised array is no longer
# reachable afterwards and re-running this cell feeds already-normalised data
# back into normalize_data. A separate name (e.g. X_norm) would make the cell
# idempotent, but later cells would need to follow.
X = normalize_data(X)

In [7]:
# Repeat the per-signal range report on the normalised array to verify the
# rescaling.
num_signals = X.shape[2]  # size of the last axis: one entry per signal

for signal_idx in range(num_signals):
    channel = X[..., signal_idx]  # every sample and time step of this signal
    global_max = channel.max()
    global_min = channel.min()
    print(f"Signal {signal_idx}: Global Max = {global_max}, Global Min = {global_min}")

Signal 0: Global Max = 1.0000000000000002, Global Min = 0.0
Signal 1: Global Max = 1.0000000000000284, Global Min = 0.0
Signal 2: Global Max = 1.0000000000000002, Global Min = 0.0
Signal 3: Global Max = 1.0000000000000002, Global Min = 0.0
Signal 4: Global Max = 1.0000000000000002, Global Min = 0.0


In [8]:
# Expand the normalised dataset with the project's augmentation helper.
# NOTE(review): augmentation runs before the train/test split in the next
# cell, so samples derived from the same original recording can presumably end
# up in both splits and inflate the test score — confirm how augment_data
# derives its outputs and consider splitting first.
augmented_X, augmented_y = augment_data(X, y)

(77400, 1152, 5)
(77400,)


In [9]:
# Hold out 5% of the augmented samples for testing; random_state pins the
# shuffle so the split is reproducible.
# NOTE(review): the split is taken over augmented samples, so it is not
# grouped by original recording — verify this does not leak near-duplicates
# into the test set.
X_train, X_test, y_train, y_test = train_test_split(augmented_X, augmented_y, test_size=0.05, random_state=42)

In [10]:
# Confirm the sizes of the two splits (train first, then test).
for split in (X_train, X_test):
    print(split.shape)

(73530, 1152, 5)
(3870, 1152, 5)


# Transformer

In [11]:
# Number of distinct class labels. np.unique returns the array of labels
# itself, so take its length: the value is used as the output width of the
# classifier's final linear layer, which must be an integer.
# NOTE(review): assumes the labels are the consecutive integers
# 0..num_classes-1, as nn.CrossEntropyLoss requires — confirm.
num_classes = len(np.unique(y_train))

In [None]:
from sklearn.preprocessing import KBinsDiscretizer

# Quantization of ECG data: map every sample value to an integer bin index so
# it can later be used as a token id.
num_bins = 500  # number of uniform-width bins per signal

# KBinsDiscretizer only accepts 2-D (n_samples, n_features) input, so collapse
# the leading axes and treat the last axis (one entry per signal) as features.
num_channels = X_train.shape[-1]
X_train_flat = X_train.reshape(-1, num_channels)
X_test_flat = X_test.reshape(-1, num_channels)

# Initialize KBinsDiscretizer
kbin = KBinsDiscretizer(n_bins=num_bins, encode='ordinal', strategy='uniform')
# Learn the bin edges from the training split only, so nothing about the test
# split leaks into preprocessing. Test values outside the training range fall
# into the first/last bin when transformed.
kbin.fit(X_train_flat)

# Discretize, then restore the original array layout.
X_train_tokenized = kbin.transform(X_train_flat).reshape(X_train.shape)
X_test_tokenized = kbin.transform(X_test_flat).reshape(X_test.shape)
import torch
import torch.nn as nn
from transformers import BertModel, BertTokenizer
from torch.utils.data import DataLoader, TensorDataset

# Initialize a pre-trained BERT model and tokenizer
# NOTE(review): `tokenizer` is not used anywhere in the visible part of this
# notebook (the inputs are bin indices, not text) — confirm whether it is
# needed at all.
# `bert_model` is a module-level object that ECGClassifier picks up as its
# encoder.
model_name = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(model_name)
bert_model = BertModel.from_pretrained(model_name)


# Define a custom classifier model
class ECGClassifier(nn.Module):
    """BERT encoder followed by dropout and a linear classification head.

    Args:
        num_classes: Number of output classes (width of the logits).
        bert: Optional encoder module to use. When omitted, the module-level
            ``bert_model`` is used, which matches the previous behaviour.
            The encoder must expose ``config.hidden_size`` and return either
            a tuple ``(sequence_output, pooled_output)`` or an object with a
            ``pooler_output`` attribute.
    """

    def __init__(self, num_classes, bert=None):
        super().__init__()
        self.bert = bert_model if bert is None else bert
        self.dropout = nn.Dropout(0.1)
        # Read the hidden size from the encoder instead of hard-coding 768 so
        # the head matches whichever encoder is supplied.
        self.fc = nn.Linear(self.bert.config.hidden_size, num_classes)

    def forward(self, input_ids, attention_mask):
        """Return unnormalised class logits of shape (batch, num_classes)."""
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # Recent transformers versions return a dict-like output object;
        # tuple-unpacking it yields its *keys*, not tensors. Read the pooled
        # vector by attribute, falling back to position for tuple outputs.
        if hasattr(outputs, "pooler_output"):
            pooled_output = outputs.pooler_output
        else:
            pooled_output = outputs[1]
        pooled_output = self.dropout(pooled_output)
        logits = self.fc(pooled_output)
        return logits


# Convert tokenized data to PyTorch tensors
# torch.as_tensor with an explicit dtype casts whatever numeric dtype comes in
# (KBinsDiscretizer emits floating-point bin indices) to int64, whereas the
# legacy torch.LongTensor constructor does not cast a float array. It also
# accepts tensors, so re-running this cell is harmless.
X_train_tokenized = torch.as_tensor(X_train_tokenized, dtype=torch.long)
X_test_tokenized = torch.as_tensor(X_test_tokenized, dtype=torch.long)
y_train = torch.as_tensor(y_train, dtype=torch.long)
y_test = torch.as_tensor(y_test, dtype=torch.long)

# Create DataLoader for training and testing data
# Only the training loader shuffles; the test loader keeps dataset order.
train_data = TensorDataset(X_train_tokenized, y_train)
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
test_data = TensorDataset(X_test_tokenized, y_test)
test_loader = DataLoader(test_data, batch_size=32)

# Build the classifier together with its loss and optimiser.
LEARNING_RATE = 1e-5  # step size handed to AdamW

model = ECGClassifier(num_classes)

# Cross-entropy over the class logits; AdamW updates every model parameter.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)

# Training loop: one optimiser step per batch, mean batch loss reported per
# epoch.
# NOTE(review): BertModel expects input_ids shaped (batch, seq_len) with
# seq_len no larger than the model's maximum position count — confirm the
# tokenized windows produced upstream satisfy this.
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids, labels = batch
        optimizer.zero_grad()
        # Every position holds a real quantised sample: bin index 0 is the
        # lowest value bin, not padding, so nothing may be masked out.
        attention_mask = torch.ones_like(input_ids)
        logits = model(input_ids, attention_mask=attention_mask)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # max(..., 1) avoids a ZeroDivisionError when the loader yields no batches.
    avg_loss = total_loss / max(len(train_loader), 1)
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {avg_loss:.4f}")

# Evaluation: accuracy of the arg-max prediction over the held-out loader,
# with dropout disabled and gradients off.
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for batch in test_loader:
        input_ids, labels = batch
        # Bin index 0 is a valid quantised value rather than padding, so
        # attend to every position.
        attention_mask = torch.ones_like(input_ids)
        logits = model(input_ids, attention_mask=attention_mask)
        predicted = torch.argmax(logits, dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

# An empty test loader yields NaN instead of raising ZeroDivisionError.
accuracy = correct / total if total else float("nan")
print(f"Test Accuracy: {accuracy:.2%}")
