In [43]:
import csv
import os

import numpy as np
import pandas as pd
import torch
from torch.nn.utils.rnn import pad_sequence

In [83]:
import pickle

# Read the preprocessed training set from disk.
# NOTE(review): pickle.load can execute arbitrary code; only load files from a
# trusted source.
with open('data/processed_train_dataset.pkl', mode='rb') as pickle_file:
    result_data = pickle.load(pickle_file)

# One heartbeat sequence per recording, plus the matching labels.
X = result_data['heartbeat']
y = result_data['y']

In [84]:
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
import torch
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# --- Filter ------------------------------------------------------------------
# Convert every recording to a float32 tensor and drop recordings that are empty
# along dim 0 (some entries in X have length 0). Labels are filtered in lockstep
# so they stay aligned with the recordings that are kept.
filtered_X_tensors = []
filtered_y_tensors = []

for i, heartbeat_sequence in enumerate(X):
    tensor = torch.tensor(heartbeat_sequence, dtype=torch.float32)
    if tensor.size(0) > 0:
        filtered_X_tensors.append(tensor)
        filtered_y_tensors.append(y[i])

y_tensors = torch.tensor(filtered_y_tensors, dtype=torch.long)

# --- Split -------------------------------------------------------------------
# Split recording indices BEFORE fitting the scaler, so that validation data
# does not leak into the scaling statistics.
sample_indices = np.arange(len(filtered_X_tensors))
train_idx, val_idx = train_test_split(sample_indices, test_size=0.2, random_state=42)

# --- Scale -------------------------------------------------------------------
# Fit on the heartbeats of the training recordings only (stacked along dim 0),
# then apply the same transform to every recording separately.
scaler = StandardScaler()
scaler.fit(np.concatenate([filtered_X_tensors[i].numpy() for i in train_idx], axis=0))
filtered_X_tensors = [
    torch.tensor(scaler.transform(tensor.numpy()), dtype=torch.float32)
    for tensor in filtered_X_tensors
]

# --- Pad and mask ------------------------------------------------------------
# Recordings have different numbers of heartbeats, so pad to the longest one.
X_padded = pad_sequence(filtered_X_tensors, batch_first=True, padding_value=0.0)

# Build the mask from the true sequence lengths (True = real heartbeat) rather
# than from the padded values, so a real heartbeat that happens to be all zeros
# after scaling is never mistaken for padding.
sequence_lengths = torch.tensor([tensor.size(0) for tensor in filtered_X_tensors])
attention_mask = torch.arange(X_padded.size(1)).unsqueeze(0) < sequence_lengths.unsqueeze(1)

# --- Materialise the train / validation tensors --------------------------------
train_idx = torch.as_tensor(train_idx, dtype=torch.long)
val_idx = torch.as_tensor(val_idx, dtype=torch.long)

X_train, X_val = X_padded[train_idx], X_padded[val_idx]
y_train, y_val = y_tensors[train_idx], y_tensors[val_idx]
attention_train, attention_val = attention_mask[train_idx], attention_mask[val_idx]

## Transformer architecture

In [85]:
# Simple dataset
class ECGDataset(Dataset):
    """Index-aligned container of signals, labels and padding masks.

    Item ``idx`` is the tuple ``(signals[idx], labels[idx], masks[idx])``; the
    dataset length is the number of signals.
    """

    def __init__(self, signals, labels, masks):
        self.signals, self.labels, self.masks = signals, labels, masks

    def __len__(self):
        return len(self.signals)

    def __getitem__(self, idx):
        return self.signals[idx], self.labels[idx], self.masks[idx]

### Sinusoidal positional encoding
Work in progress. Sinusoidal positional encoding is supposedly better suited to sequential data.

In [86]:
import torch
import torch.nn as nn
import numpy as np

class SinusoidalPositionalEncoding(nn.Module):
    """Adds fixed sine/cosine positional encodings to a batch-first sequence.

    Even feature columns hold ``sin(position * frequency)`` and odd columns hold
    ``cos(position * frequency)``. The table is precomputed for ``max_len``
    positions and stored as a (non-trainable) buffer.

    Args:
        d_model: Size of the feature dimension. Both even and odd sizes work.
        max_len: Largest sequence length the module can encode.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        positional_encoding = torch.zeros(max_len, d_model)

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(np.log(10000.0) / d_model))

        positional_encoding[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer cosine column than sine columns,
        # so trim the frequency vector to the number of odd columns.
        positional_encoding[:, 1::2] = torch.cos(position * div_term[: d_model // 2])

        # Leading singleton dim lets the table broadcast over the batch.
        positional_encoding = positional_encoding.unsqueeze(0)

        self.register_buffer('positional_encoding', positional_encoding)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Args:
            x: Tensor whose dim 1 is the sequence dimension and whose last
                dim has size ``d_model``.

        Raises:
            ValueError: If the sequence is longer than ``max_len``.
        """
        seq_len = x.size(1)
        max_len = self.positional_encoding.size(1)
        if seq_len > max_len:
            raise ValueError(
                f'Sequence length {seq_len} exceeds the maximum supported length {max_len}'
            )
        return x + self.positional_encoding[:, :seq_len, :]

### Transformer layer
Custom transformer layer.
Like the positional encoding above, this is still a work in progress and is not yet fully integrated into the transformer (an earlier attempt did not work, for reasons not yet understood).

In [87]:
# Custom transformer layer arch.
# Currently not in use/work in progress
class CustomTransformerEncoderLayer(nn.Module):
    """Post-norm transformer encoder layer for batch-first input.

    Applies multi-head self-attention and a two-layer ReLU feed-forward
    network, each followed by dropout, a residual connection and LayerNorm.

    Args:
        d_model: Feature size of the input and output.
        num_heads: Number of attention heads.
        ff_dim: Hidden size of the feed-forward network.
        dropout: Dropout probability used in attention and on both sub-layer
            outputs.
    """

    def __init__(self, d_model, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(embed_dim=d_model, num_heads=num_heads, dropout=dropout, batch_first=True)
        # feedforward
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, d_model)
        )
        # normalization
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        # dropout
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, src_key_padding_mask=None, src_mask=None, is_causal=False):
        """Encode ``x``.

        ``src_mask`` and ``is_causal`` are accepted so the layer can be stacked
        inside ``nn.TransformerEncoder``, which passes them as keyword
        arguments to every layer.

        Args:
            x: Batch-first input whose last dim has size ``d_model``.
            src_key_padding_mask: Optional mask of positions to ignore as
                attention keys; forwarded as ``key_padding_mask``.
            src_mask: Optional attention mask; forwarded as ``attn_mask``.
            is_causal: Accepted for interface compatibility only; any causal
                masking must be supplied through ``src_mask``.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # Self-attention with residual connection and LayerNorm
        attn_output, _ = self.self_attn(
            x, x, x, attn_mask=src_mask, key_padding_mask=src_key_padding_mask
        )
        x = x + self.dropout(attn_output)
        x = self.norm1(x)

        # Feed-forward with residual connection and LayerNorm
        ff_output = self.feed_forward(x)
        x = x + self.dropout(ff_output)
        x = self.norm2(x)

        return x

### Transformer architecture

In [88]:
import torch
import torch.nn as nn

# Transformer arch
class TransformerClassifier(nn.Module):
    """Transformer encoder followed by masked pooling and an MLP classifier.

    Each heartbeat is linearly embedded, sinusoidal positional encodings are
    added, the sequence is passed through a stack of transformer encoder
    layers, the non-padded positions are pooled into one vector per sequence
    and that vector is classified.

    Args:
        heartbeat_size: Number of features per heartbeat (input size of the
            embedding layer).
        num_classes: Number of output logits.
        d_model: Embedding / encoder feature size.
        num_heads: Number of attention heads per encoder layer.
        num_layers: Number of encoder layers.
        ff_dim: Hidden size of each encoder layer's feed-forward network.
        dropout: Dropout probability used throughout the model.
        max_seq_len: Longest sequence the positional encoding supports.
        pooling: ``'mean'`` (default) or ``'max'``; how encoder outputs are
            reduced over the sequence dimension.

    Raises:
        ValueError: If ``pooling`` is neither ``'mean'`` nor ``'max'``.
    """

    def __init__(
        self,
        heartbeat_size,
        num_classes,
        d_model=64,
        num_heads=4,
        num_layers=2,
        ff_dim=256,
        dropout=0.1,
        max_seq_len=1000,
        pooling='mean',
    ):
        super().__init__()
        if pooling not in ('mean', 'max'):
            raise ValueError(f"pooling must be 'mean' or 'max', got {pooling!r}")
        self.pooling = pooling

        self.embedding = nn.Linear(heartbeat_size, d_model)
        self.dropout = nn.Dropout(dropout)

        # Positional encoding
        # TODO: investigate other approaches (e.g. a learned nn.Embedding)?
        self.positional_encoding = SinusoidalPositionalEncoding(d_model, max_len=max_seq_len)

        # Transformer encoder
        # TODO: when implemented, replace with custom encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=num_heads, dim_feedforward=ff_dim, dropout=dropout, batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # TODO: more layers?
        # Classifier
        self.classifier = nn.Sequential(
            nn.Linear(d_model, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, num_classes)
        )

    def forward(self, x, mask):
        """Compute class logits.

        Args:
            x: Input of shape ``(batch, seq_len, heartbeat_size)``.
            mask: Boolean tensor of shape ``(batch, seq_len)``; True marks real
                (non-padded) positions.

        Returns:
            Logits of shape ``(batch, num_classes)``.
        """
        x = self.embedding(x)
        x = self.positional_encoding(x)
        x = self.dropout(x)

        # The encoder expects True for positions to IGNORE, hence the inversion.
        # NOTE(review): a sequence whose mask is entirely False has no valid
        # attention keys and presumably yields NaNs -- confirm upstream
        # filtering guarantees at least one real position per sequence.
        x = self.transformer_encoder(x, src_key_padding_mask=~mask)

        # Reduce the sequence dimension to a single vector per sequence.
        # TODO: possibly other poolings would be better
        if self.pooling == 'max':
            x = self.max_pooling(x, mask)
        else:
            x = self.mean_pooling(x, mask)

        return self.classifier(x)

    def mean_pooling(self, x, mask):
        """Average ``x`` over dim 1, counting only positions where mask is set."""
        mask = mask.unsqueeze(-1).float()
        x = x * mask
        sum_embeddings = x.sum(dim=1)
        sum_mask = mask.sum(dim=1)
        sum_mask = sum_mask + 1e-8  # avoid div by 0
        x = sum_embeddings / sum_mask
        return x

    def max_pooling(self, x, mask):
        """Take the feature-wise maximum of ``x`` over dim 1, ignoring masked-out positions."""
        # Masked-out positions get a very negative value so they never win the max.
        padded = mask.unsqueeze(-1) == 0  # (batch, seq_len, 1), broadcasts over features
        x = x.masked_fill(padded, -1e9)
        x, _ = torch.max(x, dim=1)

        return x


In [89]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score, f1_score

# Train on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'Using device: {device}')

def train_epoch(model, dataloader, criterion, optimizer):
    """Run one optimisation pass over ``dataloader``.

    Batches are moved to the module-level ``device``. The model is put in
    training mode and updated once per batch.

    Args:
        model: Called as ``model(signals, masks)``; returns per-class scores.
        dataloader: Yields ``(signals, labels, masks)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped after every batch.

    Returns:
        ``(epoch_loss, epoch_acc)``: the batch-size-weighted loss sum divided by
        ``len(dataloader.dataset)``, and the accuracy over all batches.
    """
    model.train()
    total_loss = 0.0
    predicted, expected = [], []

    for batch_signals, batch_labels, batch_masks in dataloader:
        batch_signals = batch_signals.to(device)
        batch_labels = batch_labels.to(device)
        batch_masks = batch_masks.to(device)

        optimizer.zero_grad()
        logits = model(batch_signals, batch_masks)
        batch_loss = criterion(logits, batch_labels)
        batch_loss.backward()
        optimizer.step()

        # Weight each batch's loss by its size before accumulating.
        total_loss += batch_loss.item() * batch_signals.size(0)

        predicted.extend(logits.argmax(dim=1).cpu().numpy())
        expected.extend(batch_labels.cpu().numpy())

    epoch_loss = total_loss / len(dataloader.dataset)
    return epoch_loss, accuracy_score(expected, predicted)

def validate_epoch(model, dataloader, criterion):
    """Evaluate ``model`` on ``dataloader`` without updating it.

    Batches are moved to the module-level ``device``. The model is put in
    evaluation mode and gradients are disabled.

    Args:
        model: Called as ``model(signals, masks)``; returns per-class scores.
        dataloader: Yields ``(signals, labels, masks)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.

    Returns:
        ``(epoch_loss, epoch_acc, epoch_f1)``: the batch-size-weighted loss sum
        divided by ``len(dataloader.dataset)``, the accuracy, and the
        micro-averaged F1 score over all batches.
    """
    model.eval()
    total_loss = 0.0
    predicted, expected = [], []

    with torch.no_grad():
        for batch_signals, batch_labels, batch_masks in dataloader:
            batch_signals = batch_signals.to(device)
            batch_labels = batch_labels.to(device)
            batch_masks = batch_masks.to(device)

            logits = model(batch_signals, batch_masks)
            batch_loss = criterion(logits, batch_labels)

            # Weight each batch's loss by its size before accumulating.
            total_loss += batch_loss.item() * batch_signals.size(0)

            predicted.extend(logits.argmax(dim=1).cpu().numpy())
            expected.extend(batch_labels.cpu().numpy())

    epoch_loss = total_loss / len(dataloader.dataset)
    epoch_acc = accuracy_score(expected, predicted)
    epoch_f1 = f1_score(expected, predicted, average='micro')
    return epoch_loss, epoch_acc, epoch_f1

Using device: cuda


### Train and evaluation

In [90]:
# Wrap the train / validation tensors in datasets.
train_dataset = ECGDataset(signals=X_train, labels=y_train, masks=attention_train)
val_dataset = ECGDataset(signals=X_val, labels=y_val, masks=attention_val)

# Only the training batches are shuffled; validation order stays fixed.
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=32, shuffle=False)

In [91]:
# Model dimensions: input features per heartbeat and number of target classes.
heartbeat_size = 180
num_classes = 4

# Build the classifier and move it to the selected device in one step.
model = TransformerClassifier(heartbeat_size, num_classes).to(device)

criterion = nn.CrossEntropyLoss()
# weight_decay adds regularisation through the optimizer
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-4, weight_decay=1e-5)

In [92]:
num_epochs = 50

# Alternate one training pass and one validation pass per epoch, logging both.
for epoch in range(num_epochs):
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer)
    val_loss, val_acc, val_f1 = validate_epoch(model, val_loader, criterion)

    epoch_report = (
        f'Epoch {epoch+1}/{num_epochs}',
        f'Train Loss: {train_loss:.4f}  Train Acc: {train_acc:.4f}',
        f'Val   Loss: {val_loss:.4f}  Val   Acc: {val_acc:.4f}  Val F1: {val_f1:.4f}',
    )
    print(*epoch_report, sep='\n')

Epoch 1/50
Train Loss: 1.0628  Train Acc: 0.5591
Val   Loss: 1.0019  Val   Acc: 0.5765  Val F1: 0.5765
Epoch 2/50
Train Loss: 0.9170  Train Acc: 0.6197
Val   Loss: 0.9330  Val   Acc: 0.6098  Val F1: 0.6098
Epoch 3/50
Train Loss: 0.8776  Train Acc: 0.6329
Val   Loss: 0.8944  Val   Acc: 0.6255  Val F1: 0.6255
Epoch 4/50
Train Loss: 0.8478  Train Acc: 0.6385
Val   Loss: 0.8756  Val   Acc: 0.6343  Val F1: 0.6343
Epoch 5/50
Train Loss: 0.8269  Train Acc: 0.6412
Val   Loss: 0.8422  Val   Acc: 0.6412  Val F1: 0.6412
Epoch 6/50
Train Loss: 0.8083  Train Acc: 0.6508
Val   Loss: 0.8239  Val   Acc: 0.6402  Val F1: 0.6402
Epoch 7/50
Train Loss: 0.7896  Train Acc: 0.6606
Val   Loss: 0.8118  Val   Acc: 0.6402  Val F1: 0.6402
Epoch 8/50
Train Loss: 0.7780  Train Acc: 0.6648
Val   Loss: 0.8000  Val   Acc: 0.6471  Val F1: 0.6471
Epoch 9/50
Train Loss: 0.7623  Train Acc: 0.6677
Val   Loss: 0.8160  Val   Acc: 0.6441  Val F1: 0.6441
Epoch 10/50
Train Loss: 0.7505  Train Acc: 0.6729
Val   Loss: 0.8169  Val

## Run predictions on test data

In [104]:
import pickle

# Read the preprocessed test set from disk.
# NOTE(review): pickle.load can execute arbitrary code; only load files from a
# trusted source.
with open('data/processed_test_dataset.pkl', mode='rb') as pickle_file:
    result_data = pickle.load(pickle_file)

X_test = result_data['heartbeat']
# Sanity check: the number of test recordings should be 3411.
print(len(X_test))

3411


In [105]:
import torch
from torch.nn.utils.rnn import pad_sequence

# Convert every test recording (one heartbeat sequence each) to a float32 tensor.
X_test_tensors = [
    torch.tensor(heartbeat_sequence, dtype=torch.float32)
    for heartbeat_sequence in X_test
]

# Every test recording needs a prediction, so empty recordings cannot simply be
# dropped as in training. Fail early with a clear message instead of an obscure
# error from the scaler or the padding step.
empty_indices = [i for i, tensor in enumerate(X_test_tensors) if tensor.size(0) == 0]
if empty_indices:
    raise ValueError(
        f'Test recordings without heartbeats at indices {empty_indices}; '
        'they cannot be scaled or padded'
    )

# Apply the scaler that was fitted earlier on the training data.
X_test_tensors = [
    torch.tensor(scaler.transform(tensor.numpy()), dtype=torch.float32)
    for tensor in X_test_tensors
]

# Pad all sequences to the length of the longest sequence in the test set.
X_test_padded = pad_sequence(X_test_tensors, batch_first=True, padding_value=0.0)

# Build the mask from the true sequence lengths (True = real heartbeat) rather
# than from the padded values, so a real heartbeat that happens to be all zeros
# after scaling is never mistaken for padding.
test_lengths = torch.tensor([tensor.size(0) for tensor in X_test_tensors])
attention_mask_test = torch.arange(X_test_padded.size(1)).unsqueeze(0) < test_lengths.unsqueeze(1)

In [106]:
# Create Dataset class for the test data
class ECGTestDataset(Dataset):
    """Index-aligned container of unlabeled signals and their padding masks.

    Item ``idx`` is the tuple ``(signals[idx], masks[idx])``; the dataset
    length is the number of signals.
    """

    def __init__(self, signals, masks):
        self.signals, self.masks = signals, masks

    def __len__(self):
        return len(self.signals)

    def __getitem__(self, idx):
        return self.signals[idx], self.masks[idx]

# Wrap the padded test signals and masks; keep the original order (no shuffling)
# so predictions line up with the test recordings.
test_dataset = ECGTestDataset(signals=X_test_padded, masks=attention_mask_test)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)

In [107]:
# Inference only: switch the model to evaluation mode and disable gradients.
model.eval()

all_preds = []

with torch.no_grad():
    for signals, masks in test_loader:
        # Forward pass on the selected device.
        outputs = model(signals.to(device), masks.to(device))

        # The predicted class is the index of the highest score.
        all_preds.extend(outputs.argmax(dim=1).cpu().numpy())

# Collect the per-sample predictions into one numpy array.
predictions = np.array(all_preds)

print("Predictions:", predictions)
# length should be 3411
print(len(predictions))

Predictions: [0 2 0 ... 0 0 1]
3411


In [108]:
# Generate one id per test signal: its position in the prediction array.
ids = np.arange(len(predictions))

# Create a DataFrame for submission
submission_df = pd.DataFrame({
    'id': ids,
    'y': predictions
})

# Save the DataFrame to a CSV file, creating the output directory first so the
# write does not fail on a fresh checkout.
submission_path = './data/predictions/test_preds.csv'
os.makedirs(os.path.dirname(submission_path), exist_ok=True)
submission_df.to_csv(submission_path, index=False)