# In [None]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import math

class PPGDataset(Dataset):
    """Sliding-window dataset of tokenized signal recordings for AF detection.

    ``folder_path`` must contain two sub-folders, ``AF`` and ``Non_AF``, each
    holding ``.csv`` files with a column named ``signal_type``. Every recording
    is resampled from ``original_rate`` to ``sample_rate``, min-max scaled to
    integer tokens in ``0..100`` and cut into overlapping windows of
    ``context_length`` tokens. Windows from ``AF`` are labelled 1 and windows
    from ``Non_AF`` are labelled 0.

    Args:
        folder_path: Directory containing the ``AF`` and ``Non_AF`` folders.
        signal_type: Name of the CSV column to read.
        context_length: Number of tokens per window.
        sample_rate: Target sampling rate in Hz.
        original_rate: Sampling rate of the CSV recordings in Hz.

    Raises:
        FileNotFoundError: If one of the class folders does not exist.
        KeyError: If a CSV file has no ``signal_type`` column.
        ValueError: If a recording contains non-finite values or a rate is
            not positive.
    """

    def __init__(self, folder_path, signal_type='PPG', context_length=500,
                 sample_rate=50, original_rate=125):
        self.data = []
        self.labels = []
        # AF recordings are loaded first (label 1), then Non_AF (label 0).
        for subfolder, label in (('AF', 1), ('Non_AF', 0)):
            self._load_class_folder(
                os.path.join(folder_path, subfolder),
                label,
                signal_type,
                context_length,
                sample_rate,
                original_rate,
            )

    def _load_class_folder(self, class_path, label, signal_type,
                           context_length, sample_rate, original_rate):
        """Append the windows of every CSV file in ``class_path`` with ``label``."""
        # Sorted so that the sample order does not depend on the file system.
        for filename in sorted(os.listdir(class_path)):
            if not filename.endswith('.csv'):
                continue
            df = pd.read_csv(os.path.join(class_path, filename))
            signal = df[signal_type].values
            signal = self.resample_signal(signal, sample_rate, original_rate)
            signal = self.tokenize_signal(signal)
            windows = self.create_sliding_windows(signal, context_length)
            self.data.extend(windows)
            self.labels.extend([label] * len(windows))

    def resample_signal(self, signal, sample_rate, original_rate=125):
        """Resample ``signal`` from ``original_rate`` to ``sample_rate`` Hz.

        Linear interpolation is used so that non-integer rate ratios (such as
        125 Hz -> 50 Hz, a ratio of 2.5) land on the requested rate. Plain
        integer-step slicing would floor the ratio to 2 and produce 62.5 Hz.
        No anti-aliasing filter is applied.

        Returns:
            A float array covering the same time span as the input.

        Raises:
            ValueError: If either rate is not positive.
        """
        if sample_rate <= 0 or original_rate <= 0:
            raise ValueError("sample_rate and original_rate must be positive")
        signal = np.asarray(signal, dtype=float)
        n_in = len(signal)
        if n_in == 0 or sample_rate == original_rate:
            return signal
        # Output sample k sits at input index k * original_rate / sample_rate;
        # keep every k whose position does not run past the last input sample.
        n_out = int((n_in - 1) * sample_rate // original_rate) + 1
        positions = np.arange(n_out) * (original_rate / sample_rate)
        return np.interp(positions, np.arange(n_in), signal)

    def tokenize_signal(self, signal):
        """Min-max scale ``signal`` to 0..100 and round to integer tokens.

        A constant signal has no range to scale, so it maps to all-zero tokens
        instead of dividing by zero. An empty signal yields an empty array.

        Raises:
            ValueError: If the signal contains NaN or infinite values, which
                would otherwise turn into out-of-range token ids.
        """
        signal = np.asarray(signal, dtype=float)
        if signal.size == 0:
            return np.zeros(0, dtype=int)
        signal_min, signal_max = signal.min(), signal.max()
        span = signal_max - signal_min
        if not np.isfinite(span):
            raise ValueError("signal contains NaN or infinite values")
        if span == 0:
            return np.zeros(signal.shape, dtype=int)
        scaled_signal = (signal - signal_min) / span * 100
        return np.round(scaled_signal).astype(int)

    def create_sliding_windows(self, signal, context_length, stride=50):
        """Return every full ``context_length`` window of ``signal``.

        Window starts are ``stride`` samples apart; a trailing remainder
        shorter than ``context_length`` is dropped.
        """
        last_start = len(signal) - context_length
        return [
            signal[start:start + context_length]
            for start in range(0, last_start + 1, stride)
        ]

    def __len__(self):
        """Return the total number of windows."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(tokens, label)`` as a long tensor and a float tensor."""
        tokens = torch.tensor(self.data[idx], dtype=torch.long)
        label = torch.tensor(self.labels[idx], dtype=torch.float)
        return tokens, label
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence-first tensor.

    The table ``pe`` has shape ``[max_len, 1, d_model]`` and is registered as
    a buffer, so it follows the module across devices without being trained.
    Even feature columns hold sines and odd columns hold cosines of the
    position scaled by geometrically spaced frequencies.

    Args:
        d_model: Size of the feature dimension (odd values are supported).
        max_len: Longest sequence length the table can serve.
    """

    def __init__(self, d_model, max_len=500):
        super().__init__()
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer cosine column than sine
        # column, so trim the frequencies to the number of odd columns.
        pe[:, 0, 1::2] = torch.cos(position * div_term[:d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(0)`` positions.

        Args:
            x: Tensor laid out as ``[seq_len, batch_size, d_model]``.

        Raises:
            RuntimeError: If ``seq_len`` exceeds the ``max_len`` of the table.
        """
        seq_len = x.size(0)
        if seq_len > self.pe.size(0):
            raise RuntimeError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(0)} "
                "of the positional encoding table"
            )
        return x + self.pe[:seq_len]

class TransformerModel(nn.Module):
    """Transformer encoder that maps a token sequence to one binary logit.

    Tokens are embedded, combined with sinusoidal position encodings, passed
    through a stack of encoder layers, and the representation at the last
    sequence position is projected to a single logit (no sigmoid is applied).

    Args:
        vocab_size: Number of distinct token ids accepted by the embedding.
        d_model: Embedding / encoder feature size; must be divisible by
            ``nhead``.
        nhead: Number of attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
        dropout: Dropout probability used inside the encoder layers.
        max_len: Longest sequence length the positional encoding supports.
    """

    def __init__(self, vocab_size=101, d_model=4, nhead=4, num_layers=2, dropout=0.2,
                 max_len=500):
        super().__init__()

        # Token embedding
        self.token_embedding = nn.Embedding(vocab_size, d_model)

        # Positional encoding; max_len is configurable so the model can be
        # used with context lengths other than the default 500.
        self.positional_encoding = PositionalEncoding(d_model, max_len=max_len)

        # Transformer encoder (sequence-first layout, batch_first left at default)
        encoder_layers = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers=num_layers)

        # Classification head
        self.fc = nn.Linear(d_model, 1)

        # NOTE(review): this layer is created but never applied in forward();
        # it is kept so the attribute stays available. Confirm whether it was
        # meant to be applied to the embeddings.
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return one logit per sequence.

        Args:
            x: Long tensor of token ids, shape ``[batch_size, seq_len]``.

        Returns:
            Tensor of shape ``[batch_size]`` holding raw logits.
        """
        x = self.token_embedding(x)  # [batch_size, seq_len, d_model]
        x = x.permute(1, 0, 2)  # [seq_len, batch_size, d_model]
        x = self.positional_encoding(x)

        # Transformer encoding
        x = self.transformer_encoder(x)

        # Take the last token and classify
        x = x[-1, :, :]  # [batch_size, d_model]
        x = self.fc(x)  # [batch_size, 1]

        # Drop the trailing singleton dimension so the output matches the target shape
        return x.squeeze(-1)  # [batch_size]

def train_model(dataset_path, epochs=5, batch_size=2, learning_rate=3e-4):
    """Train a ``TransformerModel`` on the dataset at ``dataset_path``.

    The windows produced by ``PPGDataset`` are split 80/20 at random into a
    training and a validation set. After every epoch the mean training loss,
    mean validation loss and validation accuracy are printed.

    NOTE(review): the split is done per window, and windows cut from the same
    recording overlap, so near-duplicate windows can end up on both sides of
    the split. Validation metrics are therefore likely optimistic; consider
    splitting by recording.

    Args:
        dataset_path: Directory passed to ``PPGDataset``.
        epochs: Number of passes over the training set.
        batch_size: Mini-batch size for both loaders.
        learning_rate: AdamW learning rate.

    Returns:
        The trained model, left in eval mode when ``epochs`` > 0.

    Raises:
        ValueError: If the dataset is too small to yield a non-empty training
            and validation split.
    """
    # Create dataset
    full_dataset = PPGDataset(dataset_path)

    # Split into train and test sets
    train_size = int(0.8 * len(full_dataset))
    test_size = len(full_dataset) - train_size
    if train_size == 0 or test_size == 0:
        raise ValueError(
            f"dataset at {dataset_path!r} has {len(full_dataset)} window(s); "
            "at least 2 are required for a train/validation split"
        )
    train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size])

    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    # Initialize model, loss, and optimizer
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = TransformerModel().to(device)
    criterion = nn.BCEWithLogitsLoss()  # combines sigmoid and BCELoss, so the model emits logits
    optimizer = optim.AdamW(model.parameters(), lr=learning_rate)

    for epoch in range(epochs):
        # Training pass
        model.train()
        total_loss = 0.0

        for batch_x, batch_y in train_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)

            optimizer.zero_grad()
            outputs = model(batch_x)  # logits, shape [batch_size]
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        # Validation pass
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for batch_x, batch_y in test_loader:
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)

                outputs = model(batch_x)
                val_loss += criterion(outputs, batch_y).item()

                # Outputs are logits: probability > 0.5 is equivalent to logit > 0.
                predicted = (outputs > 0).float()
                correct += (predicted == batch_y).sum().item()
                total += batch_y.size(0)

        # Print epoch statistics
        print(f'Epoch {epoch+1}/{epochs}')
        print(f'Training Loss: {total_loss/len(train_loader):.4f}')
        print(f'Validation Loss: {val_loss/len(test_loader):.4f}')
        print(f'Validation Accuracy: {100 * correct/total:.2f}%\n')

    return model

# Usage example
if __name__ == '__main__':
    # Make CUDA kernel launches synchronous so device-side errors are
    # reported at the call that caused them.
    os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

    # Train on the 'Dataset' folder relative to the working directory.
    dataset_path = 'Dataset'
    trained_model = train_model(dataset_path)

    # Persist only the learned weights (state dict), not the module object.
    weights_file = 'mimic_transformer_model.pth'
    torch.save(trained_model.state_dict(), weights_file)
