In [19]:
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import numpy as np
import scipy
from scipy.signal import butter
import pickle
import glob
from torchviz import make_dot
import torch

import os
import sys
sys.path.append("/workspaces/src/rPPG-Toolbox")

from evaluation.gt_visualize import GTVisualizer
from neural_methods.model.HRClassifierQuantile import HRClassifierQuantile

global_output_folder = "graphics/"
save_extension = "svg"

f_res = 0.005
sampling_rate = 30.0

In [20]:
N = 1000
x_values = np.arange(0, N)
sinusoid = np.sin(x_values / 50)
random_shuffled = np.random.permutation(sinusoid)

fig, axs = plt.subplots(2, 1, figsize=(10, 6), sharex=True)

axs[0].plot(x_values, sinusoid, label='Original Sinusoid')
axs[0].set_title('Original Sinusoid')

axs[1].plot(x_values, random_shuffled, label='Randomly Shuffled Sinusoid')
axs[1].set_title('Randomly Shuffled Sinusoid')

plt.tight_layout()
plt.savefig(os.path.join(global_output_folder, f"sinusoid_comparison.{save_extension}"))
plt.close()


In [21]:
def plot_gaussian_kernel_showcase(gaussian_1_mean, gaussian_1_std, gaussian_2_mean, gaussian_2_std, gt_value_x):

    gaussian_kernel_x = np.arange(-3 * gaussian_1_std + gaussian_1_mean, 3 * gaussian_1_std + gaussian_1_mean, f_res)
    gaussian_kernel = -(1/(gaussian_1_std * np.sqrt(2 * np.pi))) * np.exp(-0.5 * ((gaussian_kernel_x - gaussian_1_mean) / gaussian_1_std) ** 2)
    gaussian_2_kernel = -(1/(gaussian_2_std * np.sqrt(2 * np.pi))) * np.exp(-0.5 * ((gaussian_kernel_x - gaussian_2_mean) / gaussian_2_std) ** 2)

    plt.plot(gaussian_kernel_x, gaussian_kernel, label='High uncertainty')
    plt.plot(gaussian_kernel_x, gaussian_2_kernel, label='Low uncertainty')

    if gt_value_x is not None:
        plt.axvline(x=gt_value_x, color='r', linestyle='--', label='Ground Truth Frequency')
        #plt.annotate(f'Data Likelihood: {data_likelihood:.3f}', xy=(gt_value_x, 1), xytext=(gaussian_mean + gaussian_std/2, 0.95),fontsize=12, color='red')
    plt.xlabel('Network Output')
    plt.ylabel('Data Likelihood')
    plt.title('DNN output as Gaussian Distribution')
    plt.legend()

In [22]:
gaussian_1_mean = 1.1
gaussian_1_std = 0.1
gaussian_2_mean = 1.1
gaussian_2_std = 0.05
gt_value_x = 1.0

plot_gaussian_kernel_showcase(gaussian_1_mean, gaussian_1_std, gaussian_2_mean, gaussian_2_std, gt_value_x)
plt.savefig(os.path.join(global_output_folder, f"gaussian_kernel_showcase_bad_prediction.{save_extension}"))
plt.close()

plot_gaussian_kernel_showcase(1.04, gaussian_1_std, 1.04, gaussian_2_std, gt_value_x)
plt.savefig(os.path.join(global_output_folder, f"gaussian_kernel_showcase_good_prediction.{save_extension}"))
plt.close()

plot_gaussian_kernel_showcase(1.04, gaussian_1_std, 1.04, gaussian_2_std, None)
plt.savefig(os.path.join(global_output_folder, f"gaussian_kernel_showcase.{save_extension}"))
plt.close()


In [23]:
alpha = 0.1

# Simulate labels and predictions for visualization
labels = np.linspace(-2, 2, 400)
# Wide interval
preds_lower_wide = np.full_like(labels, -1.5)
preds_upper_wide = np.full_like(labels, 1.5)
# Small interval
preds_lower_small = np.full_like(labels, -0.5)
preds_upper_small = np.full_like(labels, 0.5)

def quantile_loss(labels, preds_lower, preds_upper, alpha):
    loss_lower = np.maximum(alpha * (labels - preds_lower), (alpha - 1) * (labels - preds_lower))
    loss_upper = np.maximum(alpha * (preds_upper - labels), (alpha - 1) * (preds_upper - labels))
    return loss_lower + loss_upper

loss_wide = quantile_loss(labels, preds_lower_wide, preds_upper_wide, alpha)
loss_small = quantile_loss(labels, preds_lower_small, preds_upper_small, alpha)


plt.figure(figsize=(10, 6))
plt.plot(labels, loss_wide, label='Wide Interval')
plt.plot(labels, loss_small, label='Small Interval')
plt.xlabel('Network Output')
plt.ylabel('Quantile Loss')
plt.title('Quantile Loss for Wide and Small Prediction Intervals')
plt.legend()
plt.savefig(os.path.join(global_output_folder, f"quantile_loss_intervals.{save_extension}"))
plt.close()

In [24]:
def read_from_pickle_0_flat(data, data_to_load="predictions", normalize=True):

    if isinstance(data, str):
        with open(data, "rb") as f:
            data = pickle.load(f)

    data_dict = data[data_to_load]
    data_list = []

    for key in data_dict.keys():
        for i in range(len(data_dict[key])):
            curr_data = np.array(data_dict[key][i].cpu().numpy())

            if normalize:
                curr_data = (curr_data - np.mean(curr_data)) / np.std(curr_data)

            data_list.append(curr_data)

    return np.array(data_list).reshape(len(data_dict.keys()), 1, -1)

In [25]:
results_file = "/workspaces/src/rPPG-Toolbox/runs/DeepStab/test/VitalVideos_and_UBFC_SizeW72_SizeH72_ClipLength160_DataTypeDiffNormalized_Standardized_DataAugNone_LabelTypeDiffNormalized_Crop_faceTrue_BackendY5F_Large_boxTrue_Large_size1.5_Dyamic_DetTrue_det_len30_Median_face_boxFalse_DNone_amp10.0/saved_test_outputs/VitalLens_Physnet_best_VitalVideos_and_UBFC_outputs.pickle"

sampling_rate = 30.0  # Assuming a sampling rate of 30 Hz
index_to_use = 0  # Index of the data to visualize

labels = read_from_pickle_0_flat(results_file, data_to_load="labels", normalize=True)
predictions = read_from_pickle_0_flat(results_file, data_to_load="predictions", normalize=True)
print(f"Labels shape: {labels.shape}, Predictions shape: {predictions.shape}")
labels = labels[index_to_use, 0, :]
predictions = predictions[index_to_use, 0, :]

fft_label = np.fft.fft(labels)
freq_label = np.fft.fftfreq(len(labels), d=1/sampling_rate)
fft_prediction = np.fft.fft(predictions)
freq_prediction = np.fft.fftfreq(len(predictions), d=1/sampling_rate)

freq_label = freq_label[:len(freq_label)//2]
fft_label = fft_label[:len(fft_label)//2]
freq_prediction = freq_prediction[:len(freq_prediction)//2]
fft_prediction = fft_prediction[:len(fft_prediction)//2]

# Get the first and second harmonics of the ground truth HR in Hz
first_harmonic_freq = freq_label[np.argmax(np.abs(fft_label))]
second_harmonic_freq = 2 * first_harmonic_freq
deviation = 6 / 60  # 6 beats/min converted to Hz (1 Hz = 60 beats/min)

lower_interval_start = first_harmonic_freq - deviation
lower_interval_end = first_harmonic_freq + deviation
upper_interval_start = second_harmonic_freq - deviation
upper_interval_end = second_harmonic_freq + deviation

print(f"Ground Truth Frequency: {first_harmonic_freq:.2f} Hz")
print(f"First Harmonic Frequency: {second_harmonic_freq:.2f} Hz")
print(f"Lower Interval: [{lower_interval_start:.2f}, {lower_interval_end:.2f}] Hz")
print(f"Upper Interval: [{upper_interval_start:.2f}, {upper_interval_end:.2f}] Hz")


#plt.plot(freq_label, np.abs(fft_label), label='FFT of Label')
plt.plot(freq_prediction, np.abs(fft_prediction), label='FFT of Prediction')
plt.xlim(0, 5)
plt.xlabel('Frequency (Hz)')
plt.ylabel('Magnitude')
plt.legend()
plt.savefig(os.path.join(global_output_folder, f"fft_prediction.{save_extension}"))
plt.close()

Labels shape: (32, 1, 800), Predictions shape: (32, 1, 800)
Ground Truth Frequency: 0.94 Hz
First Harmonic Frequency: 1.88 Hz
Lower Interval: [0.84, 1.04] Hz
Upper Interval: [1.77, 1.98] Hz


In [26]:
%matplotlib inline

[b, a] = butter(1, [0.6 / sampling_rate * 2, 3.3 / sampling_rate * 2], btype='bandpass')
predictions_filt = scipy.signal.filtfilt(b, a, np.double(predictions))
labels_filt = scipy.signal.filtfilt(b, a, np.double(labels))

fft_label_filt = np.fft.fft(labels_filt)
freq_label_filt = np.fft.fftfreq(len(labels_filt), d=1/sampling_rate)
fft_prediction_filt = np.fft.fft(predictions_filt)
freq_prediction_filt = np.fft.fftfreq(len(predictions_filt), d=1/sampling_rate)

freq_label_filt = freq_label_filt[:len(freq_label_filt)//2]
fft_label_filt = fft_label_filt[:len(fft_label_filt)//2]
freq_prediction_filt = freq_prediction_filt[:len(freq_prediction_filt)//2]
fft_prediction_filt = fft_prediction_filt[:len(fft_prediction_filt)//2]

#plt.plot(freq_label_filt, np.abs(fft_label_filt), label='FFT of Filtered Label')
plt.plot(freq_prediction_filt, np.abs(fft_prediction_filt), label='FFT of Filtered Prediction')
plt.xlim(0, 5)
plt.xlabel('Frequency (Hz)')
plt.ylabel('Magnitude')
plt.legend()
plt.savefig(os.path.join(global_output_folder, f"fft_prediction_filt.{save_extension}"))
plt.close()


In [27]:
autocorr = np.correlate(labels_filt, labels_filt, mode='full')
autocorr = autocorr[autocorr.size // 2:]  # Keep only non-negative lags
frequency = np.arange(0, len(autocorr)) / sampling_rate

plt.plot(frequency, autocorr)
plt.xlim(0, 5)
plt.title('Autocorrelation of PPG Signal')
plt.xlabel('Frequency (Hz)')
plt.ylabel('Autocorrelation')
plt.savefig(os.path.join(global_output_folder, f"autocorrelation_ppg_signal.{save_extension}"))
plt.close()

time = np.arange(0, len(labels_filt)) / sampling_rate
plt.plot(time, labels_filt)
plt.xlim(0, 5)
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
plt.title('Filtered PPG Signal')
plt.savefig(os.path.join(global_output_folder, f"filtered_ppg_signal.{save_extension}"))
plt.close()

In [28]:
%matplotlib inline

#plt.plot(freq_label_filt, np.abs(fft_label_filt), label='FFT of Filtered Label')
plt.plot(freq_prediction_filt, np.abs(fft_prediction_filt), label='FFT of Filtered Prediction')
plt.axvline(x=lower_interval_start, color='r', linestyle='--', label='Ground Truth Frequency')
plt.axvline(x=lower_interval_end, color='r', linestyle='--')
plt.axvline(x=upper_interval_start, color='g', linestyle='--', label='First Harmonic')
plt.axvline(x=upper_interval_end, color='g', linestyle='--')
plt.xlim(0, 5)
plt.xlabel('Frequency (Hz)')
plt.ylabel('Magnitude')
plt.legend()
plt.savefig(os.path.join(global_output_folder, f"fft_prediction_filt_with_snr_intervals.{save_extension}"))
plt.close()

In [29]:
def get_raw_data_vital_videos(data_path):
    """Returns data directories under the path(For Preprocessed Vital Videos dataset)."""
    data_dirs = glob.glob(data_path + os.sep + "*.mp4")
    if not data_dirs:
        raise ValueError("data paths empty!")
    dirs = list()
    for data_dir in data_dirs:
        folder_name = os.path.basename(data_dir)
        subject = folder_name.split("_")[0]
        index = int(folder_name.split("_")[1].split(".")[0])
        gt_file_name = os.path.join(data_path, f"{subject}.json")
        gt_visualizer = GTVisualizer(gt_file_name, folder_name)

        location = gt_visualizer.get_location()['location']

        # Check if the corresponding JSON file exists
        json_file = os.path.join(data_path, f"{subject}.json")
        if not os.path.exists(json_file):
            raise ValueError(f"JSON file {json_file} does not exist for subject {subject}. Skipping this directory.")

        # Append the directory information to the list
        dirs.append({"index": index, "path": data_dir, "subject": subject, "location": location})
    
    dirs = sorted(dirs, key=lambda x: (x['location'] + x['path']))

    for i in range(len(dirs)):
        dirs[i]['index'] = i

    return dirs

In [30]:
def plot_pie_chart(data_list, title, filename):
    print(data_list)
    labels = list(set(data_list))
    counts = [data_list.count(label) for label in labels]
    plt.figure(figsize=(6, 6))
    plt.pie(counts, labels=labels, autopct='%1.1f%%', startangle=140, textprops={'fontsize': 20})
    plt.title(title, fontsize=25)
    plt.savefig(os.path.join(global_output_folder, filename))
    plt.close()


In [31]:

def plot_histogram(data_list, title, filename):
    data_list = [int(age) for age in data_list if age.isdigit()]  # Filter out non-numeric ages
    plt.figure(figsize=(10, 6))
    plt.hist(data_list, bins=10, edgecolor='black')
    plt.title(title, fontsize=25)
    plt.xlabel('Age', fontsize=20)
    plt.ylabel('Frequency', fontsize=20)
    plt.xticks(fontsize=15)
    plt.yticks(fontsize=15)
    plt.savefig(os.path.join(global_output_folder, filename))
    plt.close()


In [32]:

def evaluate_dataset_stats(data_dirs, dataset_name="Test"):
    """Evaluate dataset statistics."""
    num_samples = len(data_dirs)
    print(f"Number of samples in the dataset: {num_samples}")

    genders_list = []
    ages_list = []
    fitzpatricks_list = []
    locations_list = []

    # Example: Print first 5 entries
    for curr_dir in data_dirs:
        curr_video_path = curr_dir['path']
        curr_json_path = os.path.join(os.path.dirname(curr_video_path), f"{curr_dir['subject']}.json")

        gt_visualizer = GTVisualizer(curr_json_path, curr_video_path)
        location = gt_visualizer.get_location()['location']
        fitzpatrick = gt_visualizer.get_fitzpatrick()
        age = gt_visualizer.get_age()
        gender = gt_visualizer.get_gender()

        genders_list.append(gender.strip())
        ages_list.append(age.strip())
        fitzpatricks_list.append(fitzpatrick.strip())
        locations_list.append(location.strip())


    plot_pie_chart(genders_list, 'Gender Distribution', f'{dataset_name}_gender_distribution_pie.{save_extension}')
    plot_pie_chart(fitzpatricks_list, 'Fitzpatrick Skin Type Distribution', f'{dataset_name}_fitzpatrick_distribution_pie.{save_extension}')
    plot_pie_chart(locations_list, 'Location Distribution', f'{dataset_name}_location_distribution_pie.{save_extension}')
    plot_histogram(ages_list, 'Age Distribution', f'{dataset_name}_age_distribution_pie.{save_extension}')


In [33]:

dataset_path = "/mnt/data/vitalVideos"
train_split = range(0, 141)  # Example split, adjust as needed
valid_split = range(173, 202)  # Example split, adjust as needed
test_split = range(141, 173)  # Example split, adjust as needed

location_names = ["B", "KL", "KLL", "KU", "KUL", "T", "WZC"]
location_splits = {
    "B": range(0, 14),
    "KL": range(14, 31),
    "KLL": range(31, 141),
    "KU": range(141, 155),
    "KUL": range(155, 173),
    "T": range(173, 188),
    "WZC": range(188, 202)
}

data_dirs = get_raw_data_vital_videos(dataset_path)
data_dirs_train = [d for d in data_dirs if d['index'] in train_split]
data_dirs_valid = [d for d in data_dirs if d['index'] in valid_split]
data_dirs_test = [d for d in data_dirs if d['index'] in test_split]

evaluate_dataset_stats(data_dirs_test, dataset_name="VitalVideos_Test")
evaluate_dataset_stats(data_dirs_train, dataset_name="VitalVideos_Train")
evaluate_dataset_stats(data_dirs_valid, dataset_name="VitalVideos_Valid")

plt.pie([len(locations_split) for locations_split in location_splits.values()],
        labels=location_names,
        #autopct='%1.1f%%',
        startangle=140,
        textprops={'fontsize': 20})
plt.savefig(os.path.join(global_output_folder, f"location_distribution_pie.{save_extension}"))
plt.close()

Number of samples in the dataset: 32
['F', 'F', 'F', 'F', 'M', 'M', 'F', 'F', 'M', 'M', 'F', 'F', 'F', 'F', 'F', 'F', 'M', 'M', 'M', 'M', 'F', 'F', 'M', 'M', 'M', 'M', 'M', 'M', 'M', 'M', 'M', 'M']
['4', '4', '2', '2', '2', '2', '3', '3', '5', '5', '5', '5', '2', '2', '4', '4', '2', '2', '4', '4', '3', '3', '2', '2', '3', '3', '1', '1', '3', '3', '2', '2']
['KU', 'KU', 'KU', 'KU', 'KU', 'KU', 'KU', 'KU', 'KU', 'KU', 'KU', 'KU', 'KU', 'KU', 'KUL', 'KUL', 'KUL', 'KUL', 'KUL', 'KUL', 'KUL', 'KUL', 'KUL', 'KUL', 'KUL', 'KUL', 'KUL', 'KUL', 'KUL', 'KUL', 'KUL', 'KUL']
Number of samples in the dataset: 141
['M', 'M', 'F', 'F', 'M', 'M', 'M', 'M', 'M', 'M', 'M', 'M', 'F', 'F', 'F', 'F', 'F', 'F', 'M', 'M', 'M', 'M', 'F', 'F', 'F', 'M', 'M', 'F', 'F', 'M', 'M', 'F', 'F', 'F', 'F', 'M', 'M', 'F', 'F', 'F', 'F', 'M', 'M', 'M', 'M', 'M', 'M', 'M', 'M', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'M', 'M', 'F', 'F', 'F', 'F', 'M', 'M', 'M', 'M', 'F', 'F', 'M', 'M', 'F', 'F', 'F', 'F', 'M', 'M', 'M', '