In [None]:
import torch  # PyTorch main package
import torch.nn as nn  # Neural network module from PyTorch
import torch.nn.functional as F  # Functional API for neural networks (like activation functions)
import pytorch_lightning as pl  # High-level framework built on top of PyTorch to automate the training loop
from torch.utils.data import DataLoader  # DataLoader helps in batching the data for efficient training
from Data_loader_3DCXNet import UBFC_LOADER  # Custom dataset class to load UBFC data
import scipy  # Library for scientific and technical computing
from scipy import signal  # Signal processing module from scipy
from pytorch_lightning.callbacks import ModelCheckpoint  # Callback for saving model checkpoints during training
from pytorch_lightning.loggers import TensorBoardLogger  # Logger to send data to TensorBoard for visualization
import numpy as np  # Library for numerical computing, handling arrays and matrices
import matplotlib.pyplot as plt  # Plotting library for visualizing graphs and data


In [None]:


def HR_FINDER(wave, fs):
    # Perform FFT on the wave signal and extract the first 500 frequency bins (positive frequencies)
    fftData = np.fft.fft(wave, 1000)[0:500]
    # Generate frequency values corresponding to the FFT bins
    hz = np.linspace(0, fs / 2, int(len(fftData)))
    # Calculate the power spectrum by squaring the magnitude of the FFT coefficients
    powerSpectrum = np.abs(fftData)**2
    # Find the index of the maximum power (dominant frequency)
    maxFreq = np.argmax(powerSpectrum)
    # Convert the frequency (Hz) to heart rate (bpm)
    HR = hz[maxFreq] * 60
    # Return the calculated heart rate
    return HR

def filtering(data, fs, lowerCutoff=1, higherCutoff=3, filterOrder=5):
    # Normalize the cutoff frequencies to the Nyquist frequency (half of the sampling rate)
    lowerCutoffDigital = lowerCutoff / (0.5 * fs)
    higherCutoffDigital = higherCutoff / (0.5 * fs)
    # Design a Butterworth bandpass filter with the specified order and cutoff frequencies
    b, a = signal.butter(filterOrder, [lowerCutoffDigital, higherCutoffDigital], btype='band', analog=False)
    # Apply the filter to the input data using zero-phase filtering (filtfilt)
    filtsignal = signal.filtfilt(b, a, data)
    # Return the filtered signal
    return filtsignal


In [None]:
# Define the path to the training dataset
train_path = r'D:\Physiosens_data\face\train'

# Define the path to the validation dataset
val_path = r'D:\Physiosens_data\face\val'


In [None]:
# Load the training dataset using the UBFC_LOADER function
train_data = UBFC_LOADER(train_path)

# Load the validation dataset using the UBFC_LOADER function
val_data = UBFC_LOADER(val_path)


In [None]:
# Create a DataLoader for the training data with specified batch size, dropping the last batch if it's incomplete, and using 8 workers for parallel data loading
train_loader = DataLoader(train_data, batch_size=10, drop_last=True, num_workers=8)

# Create a DataLoader for the validation data with a batch size of 10 and using 8 workers for parallel data loading
val_loader = DataLoader(val_data, batch_size=10, num_workers=8)


# Model

In [None]:
class Transformer_ppg(nn.Module):
    def __init__(self):
        super().__init__()
        
        # First 3D Convolution Layer: Extract features from the input signal (3D convolution)
        self.cnn1 = nn.Sequential(
            nn.Conv3d(in_channels=3, out_channels=16, kernel_size=(3, 3, 3), padding=(1, 1, 1)),  # 3D Convolution
            nn.BatchNorm3d(16),  # Batch Normalization
            nn.ReLU(),  # ReLU activation
            nn.MaxPool3d(kernel_size=(1, 2, 2)),  # Max Pooling
        )

        # Second 3D Convolution Layer
        self.cnn2 = nn.Sequential(
            nn.Conv3d(in_channels=16, out_channels=32, kernel_size=(3, 3, 3), padding=(1, 1, 1)),  # 3D Convolution
            nn.BatchNorm3d(32),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=(1, 2, 2)),  # Max Pooling
        )

        # Third 3D Convolution Layer
        self.cnn3 = nn.Sequential(
            nn.Conv3d(in_channels=32, out_channels=64, kernel_size=(3, 3, 3), padding=(1, 1, 1)),  # 3D Convolution
            nn.BatchNorm3d(64),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=(1, 2, 2)),  # Max Pooling
        )
        
        # Global Average Pooling to reduce the output to a single vector
        self.global_pool = nn.AdaptiveAvgPool1d(output_size=(1))
        
        # Max Pooling for 1D signal
        self.maxpool = nn.MaxPool1d(kernel_size=2, stride=2, padding=0)

        # First 1D Convolution Block
        self.conv1 = nn.Sequential(
            nn.Conv1d(in_channels=1, out_channels=64, kernel_size=3, stride=1, padding=1),  # 1D Convolution
            nn.BatchNorm1d(64),  # Batch Normalization
            nn.ReLU(),
            nn.Conv1d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1),  # Another 1D Convolution
            nn.BatchNorm1d(64),
            nn.ReLU(),
        )

        # First Transformer Encoder Block (to model sequential data)
        encoder_layer1 = nn.TransformerEncoderLayer(d_model=150, nhead=1, batch_first=True)
        self.transformer_model1 = nn.TransformerEncoder(encoder_layer1, num_layers=2)

        # Second 1D Convolution Block
        self.conv2 = nn.Sequential(
            nn.Conv1d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1),  # 1D Convolution
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Conv1d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1),  # Another 1D Convolution
            nn.BatchNorm1d(64),
            nn.ReLU(),
        )

        # Second Transformer Encoder Block
        encoder_layer2 = nn.TransformerEncoderLayer(d_model=150, nhead=1, batch_first=True)
        self.transformer_model2 = nn.TransformerEncoder(encoder_layer2, num_layers=2)

        # Third 1D Convolution Block
        self.conv3 = nn.Sequential(
            nn.Conv1d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1),  # 1D Convolution
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Conv1d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1),  # Another 1D Convolution
            nn.BatchNorm1d(64),
            nn.ReLU(),
        )

        # Third Transformer Encoder Block
        encoder_layer3 = nn.TransformerEncoderLayer(d_model=150, nhead=1, batch_first=True)
        self.transformer_model3 = nn.TransformerEncoder(encoder_layer3, num_layers=2)

        # Final 1D Convolution Layer to output the final signal
        self.end = nn.Conv1d(in_channels=64, out_channels=1, kernel_size=3, stride=1, padding=1)
        
        # Dropout for regularization
        self.drop_out = nn.Dropout(p=0.5)
        self.relu = nn.ReLU()

    def forward(self, face):
        # Process the face input through the CNN layers
        forehead = self.cnn1(face)
        forehead = self.cnn2(forehead)
        forehead = self.cnn3(forehead)
        
        # Permute the dimensions for global pooling
        forehead = forehead.permute(0, 2, 1, 3, 4)
        b, c, _, _, _ = forehead.shape
        forehead = forehead.reshape(b, c, -1)  # Flatten the spatial dimensions
        forehead = self.global_pool(forehead).squeeze(-1)  # Global Average Pooling
        forehead = forehead.unsqueeze(1)  # Add extra dimension for conv layers
        
        # Pass the forehead features through the first convolutional and transformer layers
        signal = self.conv1(forehead)
        signal = self.transformer_model1(signal)
        signal = self.drop_out(signal)
        signal = self.maxpool(signal)
        signal = F.interpolate(signal, scale_factor=2)

        # Pass the features through the second convolutional and transformer layers
        signal = self.conv2(signal)
        signal = self.drop_out(signal)
        signal = self.transformer_model2(signal)
        signal = self.maxpool(signal)
        signal = F.interpolate(signal, scale_factor=2)

        # Pass the features through the third convolutional and transformer layers
        signal = self.conv3(signal)
        signal = self.transformer_model3(signal)
        signal = self.maxpool(signal)
        signal = F.interpolate(signal, scale_factor=2)
        signal = self.drop_out(signal)

        # Final convolution layer to obtain the output signal
        signal = self.end(signal)
        
        # Return the final output signal
        return torch.squeeze(signal)


In [None]:
# Pearson correlation loss between two signals
def Pearson_loss(signal1, signal2):
    # Calculate the mean of both signals
    mean_signal1 = torch.mean(signal1)
    mean_signal2 = torch.mean(signal2)
    
    # Numerator: Sum of the product of deviations of both signals from their means
    num = torch.sum((signal1 - mean_signal1) * (signal2 - mean_signal2))
    
    # Denominators: Sum of the squared deviations from the mean for each signal
    dem1 = torch.sqrt(torch.sum((signal1 - mean_signal1) ** 2))
    dem2 = torch.sqrt(torch.sum((signal2 - mean_signal2) ** 2))
    
    # Pearson correlation coefficient (normalized)
    val = num / (dem1 * dem2)
    
    return val

# Function to compute the complete Pearson correlation loss between two sets of signals
def Complete_Pearson(SIGNAL1, SIGNAL2):
    loss = 0
    
    # Ensure both input signal sets have the same number of signals
    assert SIGNAL1.shape[0] == SIGNAL2.shape[0], 'error: signals count mismatch'
    
    # Iterate over each signal pair and accumulate the Pearson loss
    for i in range(SIGNAL1.shape[0]):
        signal1 = SIGNAL1[i]
        signal2 = SIGNAL2[i]
        
        # Add the Pearson loss for this signal pair to the total loss
        loss += Pearson_loss(signal1, signal2)
    
    return loss


In [None]:
# PyTorch Lightning model class for the transformer-based PPG model
class Transformer_model(pl.LightningModule):
    def __init__(self):
        super().__init__()
        
        # Initialize the Transformer_ppg model
        self.model = Transformer_ppg()
        
        # Loss function (Mean Squared Error Loss)
        self.mse = nn.MSELoss()
        
        # Optimizer (Adam optimizer)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-3)
        
    def forward(self, face):
        # Forward pass through the model
        ypred = self.model(face)
        return ypred
    
    def training_step(self, batch, batch_index):
        # Get the inputs and ground truth for training
        face, ppg = batch
        
        # Get the model predictions
        ypred = self(face)
        
        # Calculate the loss: MSE loss and subtract Pearson loss
        loss = self.mse(ypred, ppg) - Complete_Pearson(ypred, ppg)
        
        # Log the training loss
        self.log('Training_loss', loss)
        
        return loss
    
    def validation_step(self, batch, batch_index):
        # Get the inputs and ground truth for validation
        face, ppg = batch
        
        # Get the model predictions
        ypred = self(face)
        
        # Calculate the loss: MSE loss and subtract Pearson loss
        loss = self.mse(ypred, ppg) - Complete_Pearson(ypred, ppg)
        
        ytest = []
        ground = []
        
        # Loop through the predictions and ground truth to calculate HR (Heart Rate)
        for pred, LABEL in zip(ypred, ppg):
            pred = torch.squeeze(pred)  # Remove extra dimensions
            pred = pred.cpu().detach().numpy()  # Convert to NumPy for further processing
            
            # Apply signal filtering to the prediction
            pred = filtering(data=pred, fs=30, lowerCutoff=.7, higherCutoff=3, filterOrder=5)
            
            # Find the heart rate from the filtered prediction
            hr2 = HR_FINDER(pred, 30)
            
            # Detrend the ground truth signal and calculate its HR
            LABEL = scipy.signal.detrend(LABEL.cpu().detach().numpy())
            labelhr = HR_FINDER(LABEL, 30)
            
            # Append both predicted and ground truth HRs
            ground.append(labelhr)
            ytest.append(hr2)
        
        # Calculate Mean Absolute Error (MAE) between the predicted and ground truth HRs
        MAE = 0
        for i in range(len(ground)):
            MAE += abs(ytest[i] - ground[i])
        MAE = MAE / i
        
        # Log the MAE for validation
        self.log('Val loss', MAE)
        
        return loss
    
    def configure_optimizers(self):
        # Return the optimizer for training
        return [self.optimizer]


In [None]:
# Initialize a ModelCheckpoint callback to save the best model based on validation loss
checkpoint = ModelCheckpoint(
    monitor='Val loss',  # Monitor the validation loss metric for model checkpointing
    save_top_k=1,        # Save only the best model (with the lowest validation loss)
    mode='min'           # Save the model when the validation loss is minimized
)


In [None]:
model=Transformer_model()# creating a instance of the model



In [None]:
# Initialize the PyTorch Lightning trainer with the following settings:
trainer = pl.Trainer(
    accelerator='auto',  # Automatically choose the best accelerator (GPU/CPU) based on availability
    logger=TensorBoardLogger(save_dir='.\logs_test'),  # Use TensorBoard for logging, save logs in the './logs_test' directory
    log_every_n_steps=10,  # Log the training progress every 10 steps
    callbacks=checkpoint,  # Add the ModelCheckpoint callback to save the best model based on validation loss
    max_epochs=100  # Set the maximum number of training epochs to 100
)


  trainer=pl.Trainer(accelerator='auto',logger=TensorBoardLogger(save_dir='.\logs_test'),log_every_n_steps=10,
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


In [None]:
# Start the training process with the following settings:
trainer.fit(
    model,            # The model to train (e.g., your Transformer model)
    train_loader,     # The DataLoader for the training data
    val_loader        # The DataLoader for the validation data
)


You are using a CUDA device ('NVIDIA GeForce RTX 4060 Laptop GPU') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type            | Params | Mode 
--------------------------------------------------
0 | model | Transformer_ppg | 4.4 M  | train
1 | mse   | MSELoss         | 0      | train
--------------------------------------------------
4.4 M     Trainable params
0         Non-trainable params
4.4 M     Total params
17.522    Total estimated model params size (MB)
109       Modules in train mode
0         Modules in eval mode


Sanity Checking: |                                                                               | 0/? [00:00<…

C:\Users\Aravind\AppData\Roaming\Python\Python312\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:419: Consider setting `persistent_workers=True` in 'val_dataloader' to speed up the dataloader worker initialization.


# Loading the weight of best model

In [None]:
# Load the trained model from a checkpoint
model = Transformer_model.load_from_checkpoint(
    r'D:\Lab_work\transformer_ppg\pycharm\logs\lightning_logs\version_3\checkpoints\epoch=29-step=1080.ckpt',  # Path to the checkpoint file
    map_location=torch.device('cuda')  # Load the model onto the GPU
)

# Set the model to evaluation mode
model = model.eval()  # This switches the model to evaluation mode (disables dropout, batch norm updates, etc.)


C:\Users\Aravind\AppData\Roaming\Python\Python312\site-packages\pytorch_lightning\utilities\migration\utils.py:56: The loaded checkpoint was produced with Lightning v2.5.1.post0, which is newer than your current Lightning version: v2.4.0


In [None]:
import time

# Initialize lists to store results
ytest = []         # List to store predicted HR values (hr2)
ground = []        # List to store ground truth HR values
time_req = []      # List to store the time taken for each prediction
count = 0          # Counter for the number of samples processed

# Iterate over the validation data
for face, ppg in val_data:
    count += 1                          # Increment the count for each sample
    face = face.unsqueeze(0).to('cuda')  # Add batch dimension and move to GPU

    # Perform prediction without tracking gradients (evaluation mode)
    with torch.no_grad():
        tic = time.time()  # Start the timer for measuring prediction time
        pred = model(face)  # Make prediction
        toc = time.time()   # End the timer after prediction
        time_req.append(toc - tic)  # Store the time taken for this prediction
    
    # Process the predicted signal
    pred = torch.squeeze(pred)  # Remove unnecessary dimensions
    pred = pred.cpu().detach().numpy()  # Move the result back to CPU and convert to numpy
    pred = filtering(data=pred, fs=30, lowerCutoff=.7, higherCutoff=3, filterOrder=5)  # Apply filtering

    # Process the filtered signal using HeartPy to get HR
    working_data, measures = hp.process(pred, 30)
    hr1 = measures['bpm']  # Extract the HR value from HeartPy's measures
    
    # Use the custom HR_FINDER function to calculate HR from the filtered signal
    hr2 = HR_FINDER(pred, 30)
    HR = [hr1, hr2]  # Store both HR values

    # Detrend the ground truth PPG signal and calculate HR
    label = scipy.signal.detrend(ppg.detach().numpy())  # Remove trend from the PPG signal
    labelhr = HR_FINDER(label, 30)  # Calculate HR from the ground truth signal

    # Append ground truth and predicted HR values to the respective lists
    ground.append(HR_FINDER(label, 30))
    ytest.append(hr2)


In [None]:
predict_hr_fft_all = np.array(ytest)
gt_hr_fft_all = np.array(ground)
num_test_samples = len(predict_hr_fft_all)
METRIC=[]
for metric in ['MAE','RMSE','MAPE','Pearson', 'SNR']:
    if metric == "MAE":
        MAE_FFT = np.mean(np.abs(predict_hr_fft_all - gt_hr_fft_all))
        standard_error = np.std(np.abs(predict_hr_fft_all - gt_hr_fft_all)) / np.sqrt(num_test_samples)
        METRIC.append(MAE_FFT)
        print("FFT MAE (FFT Label): {0} +/- {1}".format(MAE_FFT, standard_error))
    elif metric == "RMSE":
        RMSE_FFT = np.sqrt(np.mean(np.square(predict_hr_fft_all - gt_hr_fft_all)))
        standard_error = np.std(np.square(predict_hr_fft_all - gt_hr_fft_all)) / np.sqrt(num_test_samples)
        print("FFT RMSE (FFT Label): {0} +/- {1}".format(RMSE_FFT, standard_error))
        METRIC.append(RMSE_FFT)
    elif metric == "MAPE":
        MAPE_FFT = np.mean(np.abs((predict_hr_fft_all - gt_hr_fft_all) / gt_hr_fft_all)) * 100
        standard_error = np.std(np.abs((predict_hr_fft_all - gt_hr_fft_all) / gt_hr_fft_all)) / np.sqrt(num_test_samples) * 100
        print("FFT MAPE (FFT Label): {0} +/- {1}".format(MAPE_FFT, standard_error))
        METRIC.append(MAPE_FFT)
    elif metric == "Pearson":
        Pearson_FFT = np.corrcoef(predict_hr_fft_all, gt_hr_fft_all)
        correlation_coefficient = Pearson_FFT[0][1]
        standard_error = np.sqrt((1 - correlation_coefficient**2) / (num_test_samples - 2))
        METRIC.append(Pearson_FFT)
        print("FFT Pearson (FFT Label): {0} +/- {1}".format(correlation_coefficient, standard_error))

FFT MAE (FFT Label): 0.9485451859825879 +/- 0.15459194275004323
FFT RMSE (FFT Label): 2.1006766238633543 +/- 2.0123552006328307
FFT MAPE (FFT Label): 1.0302089595994937 +/- 0.19692249313961707
FFT Pearson (FFT Label): 0.9929127488430216 +/- 0.009869588673605907
