# ECG Prediction using PPG signals

In [None]:
import os
import sys
import torch
from torch import nn
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt

# Add the project root (the 'transformer' directory, i.e. the parent of 'scripts' and 'notebooks') to sys.path
project_root = os.path.abspath(os.path.join(os.getcwd(), ".."))
sys.path.append(project_root)

# Import the function
from scripts.basic_functions import *

In [None]:
## Data Loader
def data_loader(subject, action):
    """Load the finger recording of one subject performing one action.

    The file is read from ``../data/Finger/csv/s<subject>_<action>.csv``,
    relative to the current working directory. Its first row provides the
    column names; ``NA`` and the empty string are listed as NaN markers.

    Args:
        subject: Subject identifier, formatted into the file name.
        action: Action label, formatted into the file name.

    Returns:
        pandas.DataFrame holding the parsed CSV contents.
    """
    csv_path = f'../data/Finger/csv/s{subject}_{action}.csv'
    return pd.read_csv(csv_path, sep=',', header=0, na_values=['NA', ''])

In [None]:
# Sampling frequency handed to the bandpass filter
fs = 500

# Bandpass range. NOTE(review): the original comment says this range is meant
# for PPG, yet the same band is applied to the ECG column below — confirm.
lowcut = 0.4
highcut = 10

df_data = data_loader(subject=10, action='sit')

# .copy() gives each sub-frame its own data, so the column assignments below
# write to it directly rather than to a temporary selection of df_data
# (this is what pandas' SettingWithCopyWarning complains about).
df_ecg = df_data.iloc[:, [1]].copy()
df_ecg['ecg'] = bandpass_filter(df_ecg['ecg'], lowcut, highcut, fs, order=4)
print(df_ecg)

df_ppg = df_data.iloc[:, [6, 7, 8]].copy()
print(df_ppg)  # PPG channels before filtering

# Assumes positional columns 6-8 are named pleth_4..pleth_6; if they are not,
# these assignments append new columns instead of replacing — TODO confirm.
for channel in ('pleth_4', 'pleth_5', 'pleth_6'):
    df_ppg[channel] = bandpass_filter(df_data[channel], lowcut, highcut, fs, order=4)
df_ppg

In [None]:
# Pick the compute backend in order of preference: CUDA, then MPS, then CPU.
# NOTE(review): no later cell visible here moves the model or tensors to `device`.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

In [None]:
# ECG column as a float32 tensor; the trailing unsqueeze adds a feature axis: (N,) -> (N, 1).
ecg_tensor = torch.tensor(df_ecg['ecg'].values, dtype=torch.float32).unsqueeze(-1)

In [None]:
# Min-max scale every PPG column independently (MinMaxScaler with default settings).
# NOTE(review): the scaler is fitted on the full recording, i.e. before the
# train/validation split performed further down, so validation statistics
# influence the training inputs.
scaler = MinMaxScaler()
scaler.fit(df_ppg)
ppg_normalized = scaler.transform(df_ppg)
ppg_tensor = torch.tensor(ppg_normalized).float()
ppg_tensor

In [None]:
# Create sequences from the first `subset` samples of both signals.
# target_sequence is produced by the create_sequences call below, so no
# separate ECG slice is taken here.
subset = 1099

def create_sequences(ppg_data, ecg_data, sequence_length):
    """Cut two aligned signals into overlapping windows with a stride of one.

    Window ``k`` covers samples ``k`` to ``k + sequence_length - 1`` of both
    inputs. The number of windows is derived from ``ppg_data`` alone:
    ``len(ppg_data) - sequence_length + 1``.

    Args:
        ppg_data: Tensor indexed by sample along its first dimension.
        ecg_data: Tensor sliced with the same sample indices as ``ppg_data``.
        sequence_length: Number of samples per window.

    Returns:
        Tuple ``(ppg_windows, ecg_windows)``; each is the windows of the
        corresponding input stacked along a new leading dimension.
    """
    window_starts = range(len(ppg_data) - sequence_length + 1)
    ppg_windows = [ppg_data[start:start + sequence_length] for start in window_starts]
    ecg_windows = [ecg_data[start:start + sequence_length] for start in window_starts]
    return torch.stack(ppg_windows), torch.stack(ecg_windows)

sequence_length = 100  # samples per window

# Window the first `subset` samples of the PPG and ECG tensors with the same
# window length, so each PPG window is paired with the ECG window over the
# same sample indices.
input_sequences, target_sequence = create_sequences(
    ppg_data=ppg_tensor[:subset],
    ecg_data=ecg_tensor[:subset],
    sequence_length=sequence_length,
)

print(input_sequences.shape)
print(target_sequence.shape)

In [None]:
# Model inputs (PPG windows) and targets (ECG windows over the same samples).
# With subset=1099 and sequence_length=100 the expected shapes are
# (1000, 100, 3) for x_data and (1000, 100, 1) for y_data.
x_data, y_data = input_sequences, target_sequence

# Both tensors must contain the same number of windows.
assert len(x_data) == len(y_data), "Number of samples do not match!"

In [None]:
class PositionalEncoding(nn.Module):
    """Add a fixed sinusoidal positional encoding to a batch-first sequence.

    For position ``p`` and frequency index ``i`` the table stores
    ``sin(p * w_i)`` in column ``2i`` and ``cos(p * w_i)`` in column
    ``2i + 1``, with ``w_i = 10000 ** (-2i / d_model)``.

    Args:
        d_model: Size of the last (feature) dimension of the inputs. Odd
            values are supported; the last sine column then has no cosine
            partner.
        max_len: Longest sequence (dimension 1 of the input) that can be
            encoded.
    """

    def __init__(self, d_model, max_len = 100):
        super().__init__()
        self.d_model = d_model
        self.max_len = max_len

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # [max_len, 1]
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model)
        )  # one frequency per even column
        angles = position * div_term  # [max_len, number of even columns]

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # Only d_model // 2 odd columns exist (one fewer than the even columns
        # when d_model is odd), so reuse just the leading frequencies.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Non-persistent buffer: moves with the module on .to(device) while
        # staying out of state_dict, since the table is fully determined by
        # d_model and max_len.
        self.register_buffer("pe", pe.unsqueeze(0), persistent=False)  # [1, max_len, d_model]

    def forward(self, x):
        """Return ``x`` with the encoding of its first ``x.size(1)`` positions added.

        Args:
            x: Tensor whose dimension 1 is the sequence axis and whose last
                dimension has size ``d_model``.

        Raises:
            ValueError: If the sequence is longer than ``max_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.max_len:
            raise ValueError(
                f"Sequence length {seq_len} exceeds max_len={self.max_len}"
            )
        return x + self.pe[:, :seq_len, :]



In [None]:
# Try the positional encoding directly on the raw PPG windows.
d_model = input_sequences.size(-1)  # feature count of each window
positional_encoding = PositionalEncoding(d_model)
pe_input_seq = positional_encoding(input_sequences)
print("Output shape:", pe_input_seq.shape)
print("sample of pe:", pe_input_seq[0])

In [None]:
class TransformerTimeSeries(nn.Module):
    """Transformer encoder producing one output vector per input timestep.

    Inputs are batch-first, ``(batch, seq_len, input_dim)``, matching the
    layout PositionalEncoding indexes (sequence on dimension 1). The output
    has shape ``(batch, seq_len, output_dim)``.

    Args:
        input_dim: Number of input features per timestep.
        output_dim: Number of predicted values per timestep.
        num_layers: Number of stacked encoder layers.
        d_model: Width of the embedding and of the encoder.
        nhead: Number of attention heads.
        dim_feedforward: Hidden width of each layer's feed-forward block.
        dropout: Dropout probability inside the encoder layers.
    """

    def __init__(self, input_dim, output_dim, num_layers=4, d_model=32, nhead=4, dim_feedforward=128, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.embedding = nn.Linear(input_dim, d_model)
        # Uses PositionalEncoding's default max_len, which caps the sequence length.
        self.positional_encoding = PositionalEncoding(d_model)
        # batch_first=True: without it the encoder treats dimension 0 as the
        # sequence axis and would attend across the samples of a batch.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        self.fc_out = nn.Linear(d_model, output_dim)

    def forward(self, src):
        """Embed, position-encode, encode and project ``src``.

        Args:
            src: Tensor of shape ``(batch, seq_len, input_dim)``.

        Returns:
            Tensor of shape ``(batch, seq_len, output_dim)``.
        """
        # Scale the embedding by sqrt(d_model), not by the raw input width.
        src = self.embedding(src) * (self.d_model ** 0.5)
        src = self.positional_encoding(src)
        output = self.transformer_encoder(src)
        return self.fc_out(output)

In [None]:
# Hold out the last 20 % of the windows for validation.
train_ratio = 0.8
train_size = int(train_ratio * len(x_data))  # number of training windows
val_size = len(x_data) - train_size          # number of validation windows

# Plain slicing keeps the original order (no shuffling).
# NOTE(review): windows are created with a stride of one sample, so the first
# validation windows share most of their samples with the last training windows.
X_train, y_train = x_data[:train_size], y_data[:train_size]
X_val, y_val = x_data[train_size:], y_data[train_size:]

# Report the resulting shapes
print(f"x_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
print(f"x_val shape: {X_val.shape}, y_val shape: {y_val.shape}")


In [None]:
# Model initialization
d_model = 32  # Embedding dimension
input_dim = 3  # 3 PPG signals (red, green, IR)
output_dim = 1  # 1 ECG target per time step
nhead = 8  # Attention heads
num_layers = 4  # Number of transformer layers
batch_size = 32  # Batch size


model = TransformerTimeSeries(input_dim=input_dim, output_dim=output_dim, d_model=d_model, nhead=nhead, num_layers=num_layers)
# NOTE(review): this forward pass feeds windows that already had a positional
# encoding added in an earlier cell, and `output` is not used by any code
# visible here — it looks like a leftover smoke test; confirm before removing.
output = model(pe_input_seq)

# Loss function: Mean Squared Error for regression tasks
loss_fn = nn.MSELoss()

# Optimizer: Adam optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

num_epochs = 10  # Number of epochs to train

# Training loop
for epoch in range(num_epochs):
    model.train()  # Set model to training mode

    # Sum of per-sample losses over the epoch
    running_loss = 0.0

    # Iterate through the training data in batches
    for i in range(0, len(X_train), batch_size):
        # Get the current batch
        batch_X = X_train[i:i+batch_size]
        batch_y = y_train[i:i+batch_size]

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass through the model
        predictions = model(batch_X)

        # Calculate loss (MSE between predicted ECG and actual ECG)
        loss = loss_fn(predictions, batch_y)

        # Backward pass (compute gradients)
        loss.backward()

        # Update the weights
        optimizer.step()

        # loss is the mean over the batch; weight it by the batch size so that
        # dividing by the number of samples below yields a true per-sample
        # average, including when the last batch is smaller.
        running_loss += loss.item() * batch_X.size(0)

    # Average training loss per sample for the epoch
    avg_loss = running_loss / len(X_train)

    # Validation metrics
    model.eval()  # Set model to evaluation mode
    with torch.no_grad():
        val_predictions = model(X_val)
        val_loss = loss_fn(val_predictions, y_val).item()
    # val_loss is a Python float after .item(), so torch.sqrt cannot be used on it.
    val_rmse = val_loss ** 0.5

    print(f"Epoch {epoch+1}/{num_epochs} | Train Loss: {avg_loss:.4f} | Val RMSE: {val_rmse:.4f}")


In [None]:
# RMSE from the validation MSE of the last epoch. val_loss is a plain Python
# float (taken with .item()), so use the power operator, not torch.sqrt.
val_rmse = val_loss ** 0.5  # RMSE
