In [1]:
%reset -f

In [2]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
import time
import statistics
from tqdm import tqdm
from matplotlib import patches

from MINE.Log import Log
from MINE.Analysis import SessionAnalytics, ExperimentAnalytics
from MINE.StreamFilter import IStreamFilter, TimestampStreamFilter
from MINE.SessionFilters import ISessionFilter, ContainsStreamSessionFilter, ContainsMarkersSessionFilter
from MINE.StreamProcessing import StreamProcesses
from MINE.StreamOutput import StreamOutput
from numpy.typing import NDArray

In [3]:
class ProjectData:
    def __init__(self):
        self.video_dictionary: dict[str, VideoData] = {}
        self.participant_dictionary: dict[str, ParticipantData] = {}

class ParticipantData:
    def __init__(self):
        self.participant_id: str | None = None
        self.colour: str | None = None
        self.calibration_baseline_heartrate: float | None = None

class VideoData:
    def __init__(self):
        #Video Meta-Data
        self.video_id: str | None = None
        self.video_start_time: float | None = None
        self.video_end_time: float | None = None
        self.localised_video_start_time: float | None = None
        self.localised_video_end_time: float | None = None
        self.video_duration: float | None = None
        self.full_duration: float | None = None

        #Session Entries
        self.session_entries: dict[str, SessionVideoEntry] = {}

        #Analytics
        self.mean_baseline_heartrate: float | None = None
        self.mean_video_heartrate: float | None = None
        self.mean_baseline_deviation: float | None = None

class SessionVideoEntry:
    def __init__(self):
        #Data
        self.video_analytics: SessionAnalytics | None = None

        #Heart Rate Data
        self.baseline_heartrate: float | None = None
        self.video_mean_heartrate: float | None = None
        self.mean_baseline_deviation: float | None = None

        #Dataframes
        self.heartrate: pd.DataFrame | None = None
        self.heartrate_baseline_deviation: pd.DataFrame | None = None

In [4]:
def stream_filter() -> ContainsStreamSessionFilter:
    return ContainsStreamSessionFilter([
        "Marker",
        "PPG_GRN",
        "PPG_RED",
        "PPG_IR",
    ])

def marker_filter() -> ContainsMarkersSessionFilter:
    return ContainsMarkersSessionFilter("Marker", [
        "Video: Be a floater, Event: VideoStart",
        "Video: Be a floater, Event: VideoEnd",
        "Video: RNLI Respect the water “Ladbible short film”, Event: VideoStart",
        "Video: RNLI Respect the water “Ladbible short film”, Event: VideoEnd",
        "Video: “Evans story”, Event: VideoStart",
        "Video: “Evans story”, Event: VideoEnd",
        "Video: “Little girl being rescued by RNLI”, Event: VideoStart",
        "Video: “Little girl being rescued by RNLI”, Event: VideoEnd",
        "Video: Alfie’s phone, Event: VideoStart",
        "Video: Alfie’s phone, Event: VideoEnd",
        "Video: “Float to Live”, Event: VideoStart",
        "Video: “Float to Live”, Event: VideoEnd",
        "Video: Respect the Water via the NWSF\u202f‘make the right call’, Event: VideoStart",
        "Video: Respect the Water via the NWSF\u202f‘make the right call’, Event: VideoEnd",
        "Video: “Seaside safety song”, Event: VideoStart",
        "Video: “Seaside safety song”, Event: VideoEnd",
        "Video: RNLI the breath test, Event: VideoStart",
        "Video: RNLI the breath test, Event: VideoEnd",
        "Video: RNLI Christmas bed-time story, Event: VideoStart",
        "Video: RNLI Christmas bed-time story, Event: VideoEnd",
        "Video: “RNLI: The heart-breaking story of Liam Hall”, Event: VideoStart",
        "Video: “RNLI: The heart-breaking story of Liam Hall”, Event: VideoEnd",
    ])

def participant_files() -> pd.DataFrame:
    return pd.DataFrame(columns=["Participant_ID", "File_Path"], data=[
        ["P01",    "/mnt/Data/Raw/RNLI Data/participant 1.xdf"],
        ["P02",    "/mnt/Data/Raw/RNLI Data/participant 2.xdf"],
        ["P03",    "/mnt/Data/Raw/RNLI Data/participant 3.xdf"],
        ["P04.0",  "/mnt/Data/Raw/RNLI Data/participant 4.0.xdf"],
        ["P04.1",  "/mnt/Data/Raw/RNLI Data/participant 4.1.xdf"],
        ["P05",             "/mnt/Data/Raw/RNLI Data/participant 5.xdf"],
        ["P06",             "/mnt/Data/Raw/RNLI Data/participant 6.xdf"],
        ["P07",             "/mnt/Data/Raw/RNLI Data/participant 7.xdf"],
        ["P08",             "/mnt/Data/Raw/RNLI Data/participant 8.xdf"],
        ["P09",             "/mnt/Data/Raw/RNLI Data/participant 9.xdf"],
        ["P10",    "/mnt/Data/Raw/RNLI Data/participant 10.xdf"],
        ["P11",    "/mnt/Data/Raw/RNLI Data/participant 11.xdf"],
        ["P12",             "/mnt/Data/Raw/RNLI Data/participant 12.xdf"],
        ["P13",             "/mnt/Data/Raw/RNLI Data/participant 13.xdf"],
        ["P14",             "/mnt/Data/Raw/RNLI Data/participant 14.xdf"],
        ["P15",    "/mnt/Data/Raw/RNLI Data/participant 15.xdf"],
        ["P16",    "/mnt/Data/Raw/RNLI Data/participant 16.xdf"]
    ])

In [5]:
_project_data: ProjectData = ProjectData()

_experiment_analytics: ExperimentAnalytics = ExperimentAnalytics.create_from_paths(participant_files())
_experiment_analytics: ExperimentAnalytics = _experiment_analytics.get_filtered_subset([
    marker_filter(),
    stream_filter()
])

[94m[ Message ][0m Processing: P01
[94m[ Message ][0m Processing: P02
[94m[ Message ][0m Processing: P03


Stream 21: Calculated effective sampling rate 0.0000 Hz is different from specified rate 50.0000 Hz.
Stream 20: Calculated effective sampling rate 59.8759 Hz is different from specified rate 50.0000 Hz.


[94m[ Message ][0m Processing: P04.0


Stream 23: Calculated effective sampling rate 0.0000 Hz is different from specified rate 50.0000 Hz.
Stream 12: Calculated effective sampling rate 59.8216 Hz is different from specified rate 50.0000 Hz.
Stream 10: Calculated effective sampling rate -0.0001 Hz is different from specified rate 50.0000 Hz.


[94m[ Message ][0m Processing: P04.1


Stream 2: Calculated effective sampling rate 59.8672 Hz is different from specified rate 50.0000 Hz.
Stream 15: Calculated effective sampling rate 0.0000 Hz is different from specified rate 50.0000 Hz.


[94m[ Message ][0m Processing: P05


Stream 15: Calculated effective sampling rate 59.8903 Hz is different from specified rate 50.0000 Hz.
Stream 25: Calculated effective sampling rate 0.0000 Hz is different from specified rate 50.0000 Hz.


[94m[ Message ][0m Processing: P06


Stream 19: Calculated effective sampling rate 59.8990 Hz is different from specified rate 50.0000 Hz.


[94m[ Message ][0m Processing: P07


Stream 5: Calculated effective sampling rate 59.8858 Hz is different from specified rate 50.0000 Hz.


[94m[ Message ][0m Processing: P08


Stream 7: Calculated effective sampling rate 59.8891 Hz is different from specified rate 50.0000 Hz.


[94m[ Message ][0m Processing: P09


Stream 3: Calculated effective sampling rate 59.8815 Hz is different from specified rate 50.0000 Hz.


[94m[ Message ][0m Processing: P10


Stream 20: Calculated effective sampling rate 59.8872 Hz is different from specified rate 50.0000 Hz.


[94m[ Message ][0m Processing: P11


Stream 5: Calculated effective sampling rate 59.8941 Hz is different from specified rate 50.0000 Hz.


[94m[ Message ][0m Processing: P12


Stream 12: Calculated effective sampling rate 59.9389 Hz is different from specified rate 50.0000 Hz.


[94m[ Message ][0m Processing: P13


Stream 5: Calculated effective sampling rate 59.9016 Hz is different from specified rate 50.0000 Hz.


[94m[ Message ][0m Processing: P14


Stream 3: Calculated effective sampling rate 59.9293 Hz is different from specified rate 50.0000 Hz.


[94m[ Message ][0m Processing: P15


Stream 10: Calculated effective sampling rate 20.6733 Hz is different from specified rate 25.0000 Hz.
Stream 6: Calculated effective sampling rate 21.4140 Hz is different from specified rate 25.0000 Hz.
Stream 12: Calculated effective sampling rate 20.6327 Hz is different from specified rate 25.0000 Hz.
Stream 11: Calculated effective sampling rate 20.6268 Hz is different from specified rate 25.0000 Hz.
Stream 19: Calculated effective sampling rate 59.9221 Hz is different from specified rate 50.0000 Hz.
Stream 9: Calculated effective sampling rate 20.6154 Hz is different from specified rate 25.0000 Hz.
Stream 15: Calculated effective sampling rate 20.9101 Hz is different from specified rate 25.0000 Hz.
Stream 18: Calculated effective sampling rate 22.2426 Hz is different from specified rate 25.0000 Hz.


[94m[ Message ][0m Processing: P16


Stream 8: Calculated effective sampling rate 59.9187 Hz is different from specified rate 50.0000 Hz.


[94m[ Message ][0m Skipping session 'P02' as it does not contain marker 'Video: Be a floater, Event: VideoStart'.
[94m[ Message ][0m Skipping session 'P04.0' as it does not contain marker 'Video: Be a floater, Event: VideoStart'.
[94m[ Message ][0m Skipping session 'P04.1' as it does not contain marker 'Video: Be a floater, Event: VideoStart'.
[94m[ Message ][0m Filtering sessions containing streams: ['Marker', 'PPG_GRN', 'PPG_RED', 'PPG_IR']
[94m[ Message ][0m Skipping session 'P01' as it does not contain stream 'PPG_GRN'.


In [6]:
def heartrate_calibration(session_analytics: SessionAnalytics, participant_data: ParticipantData, sample_duration: float = 20, sampling_step_count: float = 5):
    marker_pairs = session_analytics.get_paired_markers("Marker", "Start", "End")
    heartbeat_calibration_row = marker_pairs[marker_pairs["Marker"].str.contains("Heartbeat")]

    start_of_heartbeat_calibration = heartbeat_calibration_row.iloc[0]["Start Timestamp"]
    end_of_heartbeat_calibration = heartbeat_calibration_row.iloc[0]["End Timestamp"]
    calibration_duration = end_of_heartbeat_calibration - start_of_heartbeat_calibration

    calibration_analytics: SessionAnalytics = session_analytics.get_filtered_subset([
        TimestampStreamFilter(start_of_heartbeat_calibration, end_of_heartbeat_calibration)
    ])

    calibration_analytics.localise_timestamps(start_of_heartbeat_calibration)
    peak_annotations_stream = calibration_analytics.stream_data_dictionary["PPG_GRN_Filtered_Peak_Annotations"]

    #region [ Calculate Heartrate Averages ]
    min_bpm: float = float("inf")
    max_bpm: float = float("-inf")

    values: NDArray[float] = np.empty(0)
    timestamps: NDArray[float] = np.empty(0)

    sample_time = sample_duration / 2
    while sample_time < (calibration_duration - (sample_duration / 2)):
        peaks_subset = peak_annotations_stream[
            (peak_annotations_stream["Timestamp"] > (sample_time - (sample_duration / 2))) &
            (peak_annotations_stream["Timestamp"] < (sample_time + (sample_duration / 2)))
        ]

        bpm = len(peaks_subset) * (60 / sample_duration)

        if bpm < min_bpm: min_bpm = bpm
        if bpm > max_bpm: max_bpm = bpm

        values = np.append(values, bpm)
        timestamps = np.append(timestamps, sample_time)

        sample_time = sample_time + sampling_step_count

    bpm_dataframe = pd.DataFrame({
        "Timestamp": timestamps,
        "Value": values
    })
    #endregion

    #region [ Calculate Calibrated Baseline ]
    baseline_heartrate = bpm_dataframe["Value"].mean()
    #endregion

    participant_data.calibration_baseline_heartrate = baseline_heartrate

def participant_analysis():
    colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
    _project_data.participant_dictionary = {}
    for index, row in tqdm(_experiment_analytics.analytics_dataframe.iterrows(), total=_experiment_analytics.analytics_dataframe.shape[0], desc="Preparing Sessions for Analysis"):
        session_analytics: SessionAnalytics = row["Analysis_Object"]
        session_id: str = session_analytics.file_name

        participant_data = _project_data.participant_dictionary[session_id] = ParticipantData()
        participant_data.colour = colors[index % len(colors)]

        #region [ Process Streams ]
        session_analytics.localise_timestamps()

        StreamProcesses.butterworth_filter(session_analytics, "PPG_GRN", "PPG_GRN_Filtered")
        StreamProcesses.detect_ppg_peaks(session_analytics, "PPG_GRN_Filtered", "PPG_GRN_Filtered_Peak_Annotations")
        #StreamProcesses.generate_peak_interval_duration_stream(session_analytics, "PPG_GRN_Filtered_Peak_Annotations", "PPG_GRN_Peak_Intervals")
        #StreamProcesses.generate_interval_differences_stream(session_analytics, "PPG_GRN_Peak_Intervals", "PPG_GRN_Peak_Interval_Differences")

        #StreamProcesses.generate_rmssd_stream(session_analytics, "PPG_GRN_Peak_Interval_Differences", "PPG_GRN_RMSSD")
        #StreamProcesses.generate_sdnn_stream(session_analytics, "PPG_GRN_Peak_Interval_Differences", "PPG_GRN_SDNN")

        #StreamProcesses.create_vectors_from_component_streams(session_analytics, "ACC_X", "ACC_Y", "ACC_Z", "ACC_Vector")
        #StreamProcesses.calculate_magnitudes_from_vector_stream(session_analytics, "ACC_Vector", "ACC_Magnitude")

        #StreamProcesses.create_vectors_from_component_streams(session_analytics, "GYRO_X", "GYRO_Y", "GYRO_Z", "GYRO_Vector")
        #StreamProcesses.calculate_magnitudes_from_vector_stream(session_analytics, "GYRO_Vector", "GYRO_Magnitude")
        #endregion

        #region [ Calibration ]
        heartrate_calibration(session_analytics, participant_data)
        break
        #endregion

participant_analysis()

Preparing Sessions for Analysis:   0%|          | 0/13 [00:06<?, ?it/s]


In [11]:
for index, row in tqdm(_experiment_analytics.analytics_dataframe.iterrows(), total=_experiment_analytics.analytics_dataframe.shape[0], desc="Preparing Sessions for Analysis"):
    session_analytics: SessionAnalytics = row["Analysis_Object"]

    sdnn = StreamProcesses.get_sdnn_sample_from_annotated_ppg(session_analytics, "PPG_GRN_Filtered_Peak_Annotations", 10, 40)
    rmssd = StreamProcesses.get_rmssd_sample_from_annotated_ppg(session_analytics, "PPG_GRN_Filtered_Peak_Annotations", 10, 40)

    print(f"SDNN: {sdnn}")
    print(f"RMSDD: {rmssd}")
    break

Preparing Sessions for Analysis:   0%|          | 0/13 [00:00<?, ?it/s]

SDNN: 115.83987118522622
RMSDD: 196.7238342526725





In [None]:
def retrieve_heartrate_data(video_data: VideoData, session_video_entry: SessionVideoEntry, sample_duration: float = 20, sampling_step_count: float = 5):
    video_analytics: SessionAnalytics = session_video_entry.video_analytics
    peak_annotations_stream = video_analytics.stream_data_dictionary["PPG_GRN_Filtered_Peak_Annotations"]

    #region [ Calculate Heartrate Averages ]
    min_bpm: float = float("inf")
    max_bpm: float = float("-inf")

    values: NDArray[float] = np.empty(0)
    timestamps: NDArray[float] = np.empty(0)

    sample_time = (video_data.localised_video_start_time -60 + (sample_duration / 2))
    while sample_time < (video_data.localised_video_end_time - (sample_duration / 2)):
        peaks_subset = peak_annotations_stream[
            (peak_annotations_stream["Timestamp"] > (sample_time - (sample_duration / 2))) &
            (peak_annotations_stream["Timestamp"] < (sample_time + (sample_duration / 2)))
        ]

        bpm = len(peaks_subset) * (60 / sample_duration)

        if bpm < min_bpm: min_bpm = bpm
        if bpm > max_bpm: max_bpm = bpm

        values = np.append(values, bpm)
        timestamps = np.append(timestamps, sample_time)

        sample_time = sample_time + sampling_step_count

    bpm_dataframe = pd.DataFrame({
        "Timestamp": timestamps,
        "Value": values
    })
    #endregion

    #region [ Calculate Baseline ]
    baseline_sample_subset = bpm_dataframe[
        (bpm_dataframe["Timestamp"] > -30) &
        (bpm_dataframe["Timestamp"] < 0)
    ]

    baseline_heartrate = baseline_sample_subset["Value"].mean()
    #endregion

    #region [ Calculate Average Video Heartrate ]
    video_mean_sample_subset = bpm_dataframe[
        (bpm_dataframe["Timestamp"] > 0) &
        (bpm_dataframe["Timestamp"] < video_data.video_duration)
    ]

    video_mean_heartrate = video_mean_sample_subset["Value"].mean()
    #endregion

    #region [ Calculate Heartrate Baseline Deviation ]
    baseline_deviation_dataframe = pd.DataFrame({
        "Timestamp": bpm_dataframe["Timestamp"],
        "Value": pd.Series(bpm_dataframe["Value"]).apply(lambda value: value / baseline_heartrate)
    })

    mean_baseline_deviation:float = video_mean_heartrate / baseline_heartrate
    #endregion

    session_video_entry.baseline_heartrate = baseline_heartrate
    session_video_entry.video_mean_heartrate = video_mean_heartrate
    session_video_entry.mean_baseline_deviation = mean_baseline_deviation

    session_video_entry.heartrate = bpm_dataframe
    session_video_entry.heartrate_baseline_deviation = baseline_deviation_dataframe

def add_session_to_video_entry(video_data: VideoData, session_analytics: SessionAnalytics):
    session_video_entry: SessionVideoEntry = SessionVideoEntry()
    session_video_entry.video_analytics = session_analytics.get_filtered_subset([
        TimestampStreamFilter(video_data.video_start_time - 60, video_data.video_end_time)
    ])

    session_video_entry.video_analytics.localise_timestamps(video_data.video_start_time)

    #region [ Process Video Data ]
    retrieve_heartrate_data(video_data, session_video_entry)
    #endregion

    video_data.session_entries[session_video_entry.video_analytics.file_name] = session_video_entry

def video_analysis():
    _project_data.video_dictionary = {}
    for session_analytics in tqdm(_experiment_analytics.analytics_dataframe["Analysis_Object"], total=_experiment_analytics.analytics_dataframe.shape[0], desc="Generating Video Entries"):

        #region [ Get Paired Markers ]
        marker_pairs = session_analytics.get_paired_markers("Marker", "VideoStart", "VideoEnd")

        """ Remove the tutorial videos from the analytics."""
        marker_pairs = marker_pairs[~marker_pairs['Marker'].str.contains('panda', case=False, na=False)]
        marker_pairs = marker_pairs[~marker_pairs['Marker'].str.contains('spiders', case=False, na=False)]

        """ Remove the first video from the analytics."""
        marker_pairs = marker_pairs[1:]
        #endregion

        for video_index, row in marker_pairs.iterrows():
            #region [ Initialise Video Entry ]
            if row["Marker"] not in _project_data.video_dictionary:
                video_data = VideoData()
                video_data.video_id = row["Marker"]
                video_data.video_start_time = row["Start Timestamp"]
                video_data.video_end_time = row["End Timestamp"]
                video_data.localised_video_start_time = 0
                video_data.localised_video_end_time = row["End Timestamp"] - row["Start Timestamp"]
                video_data.video_duration = row["End Timestamp"] - row["Start Timestamp"]
                video_data.full_duration = row["End Timestamp"] - row["Start Timestamp"] + 60

                _project_data.video_dictionary[row["Marker"]] = video_data
            else:
                video_data: VideoData = _project_data.video_dictionary[row["Marker"]]
            #endregion

            add_session_to_video_entry(video_data, session_analytics)

video_analysis()

In [None]:
def process_experiment_averages():
    for key, video_data in tqdm(_project_data.video_dictionary.items(), total= len(_project_data.video_dictionary), desc="Generating Global Dictionaries"):
        video_data.mean_baseline_heartrate = statistics.mean([entry.baseline_heartrate for entry in video_data.session_entries.values()])
        video_data.mean_video_heartrate = statistics.mean([entry.video_mean_heartrate for entry in video_data.session_entries.values()])
        video_data.mean_baseline_deviation = statistics.mean([entry.mean_baseline_deviation for entry in video_data.session_entries.values()])

process_experiment_averages()

In [None]:
def plot_heartrate_subfigure(video_data: VideoData, axis: plt.Axes):
    for keys, session_video_entry in video_data.session_entries.items():
        heartrate_dataframe = session_video_entry.heartrate

        colour = _project_data.participant_dictionary[keys].colour
        axis.plot(heartrate_dataframe["Timestamp"], heartrate_dataframe["Value"], c = colour)

    axis.axhline(y=video_data.mean_baseline_heartrate, color="red", linestyle="--", alpha=0.5)
    axis.axhline(y=video_data.mean_video_heartrate, color="gray", linestyle="--", alpha=0.5)

    axis.title.set_text("Averaged Heartrate")
    axis.set_ylim(50, 100)

    axis.fill_between([-40, 0], axis.get_ylim()[0], axis.get_ylim()[1], color='red', alpha=0.1)

def plot_heartrate_deviation_subfigure(video_data: VideoData, axis: plt.Axes):
    for keys, session_video_entry in video_data.session_entries.items():
        heartrate_baseline_deviation_dataframe = session_video_entry.heartrate_baseline_deviation

        colour = _project_data.participant_dictionary[keys].colour
        axis.plot(heartrate_baseline_deviation_dataframe["Timestamp"], heartrate_baseline_deviation_dataframe["Value"], c = colour)

    axis.axhline(y=1, color="red", linestyle="--", alpha=0.5)
    axis.axhline(y=video_data.mean_video_heartrate / video_data.mean_baseline_heartrate, color="gray", linestyle="--", alpha=0.5)

    axis.title.set_text("Averaged Heartrate Deviation from Baseline")

    axis.fill_between([-40, 0], axis.get_ylim()[0], axis.get_ylim()[1], color='red', alpha=0.1)

def plot_gyroscope_subfigure(video_data: VideoData, axis: plt.Axes):
    for keys, session_video_entry in video_data.session_entries.items():
        gyroscope_magnitude_stream = session_video_entry.video_analytics.stream_data_dictionary["GYRO_Magnitude"]

        colour = _project_data.participant_dictionary[keys].colour
        axis.plot(gyroscope_magnitude_stream["Timestamp"], gyroscope_magnitude_stream["Value"], c = colour)

    axis.title.set_text("Gyroscope Magnitude")

def plot_accelerometer_subfigure(video_data: VideoData, axis: plt.Axes):
    for keys, session_video_entry in video_data.session_entries.items():
        accelerometer_magnitude_stream = session_video_entry.video_analytics.stream_data_dictionary["ACC_Magnitude"]

        colour = _project_data.participant_dictionary[keys].colour
        axis.plot(accelerometer_magnitude_stream["Timestamp"], accelerometer_magnitude_stream["Value"], c = colour)

    axis.title.set_text("Accelerometer Magnitude")

def plot_markers(video_data: VideoData, axes: list[plt.Axes]):
    for axis in axes:
        axis.axvline(x=0, color="black", linestyle="--", alpha=0.5)

        text_position: float = axis.get_ylim()[0]

        axis.text(-60, text_position, "Calibration Start", horizontalalignment='center', verticalalignment='center')
        axis.text(0, text_position, "Video Start", horizontalalignment='center', verticalalignment='center')
        axis.text(video_data.localised_video_end_time, text_position, "Video End", horizontalalignment='center', verticalalignment='center')

def plot_heartrate_variability_subfigure(video_data: VideoData, axis: plt.Axes):
    for keys, session_video_entry in video_data.session_entries.items():
        heartrate_variability_stream = session_video_entry.video_analytics.stream_data_dictionary["PPG_GRN_HRV"]

        colour = _project_data.participant_dictionary[keys].colour
        axis.plot(heartrate_variability_stream["Timestamp"], heartrate_variability_stream["Value"], c = colour)
    axis.title.set_text("Heart Rate Variability")

def generate_per_video_figures():
    figure_rows = 5
    video_index = 0
    for key, video_data in tqdm(_project_data.video_dictionary.items(), total=len(_project_data.video_dictionary), desc="Generating Video Figures"):
        video_index += 1
        figure, axes = plt.subplots(nrows=figure_rows, ncols=1, figsize=(video_data.full_duration * 0.5, figure_rows * 4), dpi=300)

        plot_heartrate_subfigure(video_data, axes[0])
        plot_heartrate_deviation_subfigure(video_data, axes[1])
        plot_heartrate_variability_subfigure(video_data, axes[2])
        plot_gyroscope_subfigure(video_data, axes[3])
        plot_accelerometer_subfigure(video_data, axes[4])

        plot_markers(video_data, axes)

        for axis in axes: axis.set_xlim(-60, video_data.localised_video_end_time)

        os.makedirs(f"/mnt/Data/Analysis/Exports/Global Analysis", exist_ok=True)
        plt.savefig(f"/mnt/Data/Analysis/Exports/Global Analysis/{video_index} Video Data.png")
        plt.close(figure)

#generate_per_video_figures()

In [None]:
def generate_global_heartrate_calibration_to_video_calibration_figure():
    calibration = ["Calibration"]
    videos = list(_project_data.video_dictionary.keys())

    participant_ids = list(_project_data.participant_dictionary.keys())
    video_ids = calibration + videos

    values = np.empty((len(video_ids), len(participant_ids)))


    for participant_index in range(len(participant_ids)):
        participant: str = participant_ids[participant_index]

        participant_data: ParticipantData = _project_data.participant_dictionary[participant]

        values[0, participant_index] = participant_data.calibration_baseline_heartrate

    for video_index in range(len(videos)):
        for participant_index in range(len(participant_ids)):
            video: str = videos[video_index]
            participant: str = participant_ids[participant_index]

            video_data: VideoData = _project_data.video_dictionary[video]
            participant_entry: SessionVideoEntry = video_data.session_entries[participant] if participant in video_data.session_entries else None

            values[video_index + 1, participant_index] = participant_entry.baseline_heartrate if participant_entry else -1


    figure, axis = plt.subplots(figsize=(16, 8), dpi=300)
    im = axis.imshow(values, cmap="viridis")

    for i in range(len(video_ids)):
        for j in range(len(participant_ids)):
            text = f"{int(values[i, j])}"  # or use f"{values[i, j]:.2f}" for floats
            axis.text(j, i, text, ha="center", va="center", color="white")

    row = 0
    rect = patches.Rectangle(
        (0 - 0.5, row - 0.5),  # x, y
        len(participant_ids),                          # width
        1,                                 # height
        linewidth=1,
        edgecolor='black',
        facecolor='none'
    )

    axis.add_patch(rect)

    axis.set_xticks(range(len(participant_ids)), labels=participant_ids, rotation=45, ha="right", rotation_mode="anchor")
    axis.set_yticks(range(len(video_ids)), labels=video_ids)

    os.makedirs(f"/mnt/Data/Analysis/Exports/Global Analysis", exist_ok=True)
    plt.savefig(f"/mnt/Data/Analysis/Exports/Global Analysis/Calibration Comparison.png")
    plt.close(figure)

#generate_global_heartrate_calibration_to_video_calibration_figure()

In [None]:
def retrieve_baseline_deviation_data(videos_keys: list[str], participants_keys: list[str]) -> NDArray[float]:
    baseline_deviation: NDArray[float] = np.zeros((len(videos_keys), len(participants_keys) + 1))

    for video_index in range(len(videos_keys)):
        for participant_index in range(len(participants_keys)):
            video: str = videos_keys[video_index]
            participant: str = participants_keys[participant_index]

            video_data: VideoData = _project_data.video_dictionary[video]
            participant_entry: SessionVideoEntry = video_data.session_entries[participant] if participant in video_data.session_entries else None
            baseline_deviation[video_index, participant_index] = participant_entry.mean_baseline_deviation if participant_entry else -1

    for video_index in range(len(videos_keys)):
        video: str = videos_keys[video_index]
        video_data: VideoData = _project_data.video_dictionary[video]

        baseline_deviation[video_index, -1] = video_data.mean_baseline_deviation

    return baseline_deviation

def display_cell_text(data: NDArray[float], axis: plt.Axes):
    for i in range(data.shape[0]):
        for j in range(data.shape[1]):
            text = f"{data[i, j]:.2f}"  # or use f"{values[i, j]:.2f}" for floats
            axis.text(j, i, text, ha="center", va="center", color="white")

def highlight_global_cells(participants: list[str], videos: list[str], axis: plt.Axes):
    column = len(participants) - 1
    row = 0
    rect = patches.Rectangle(
        (column - 0.5, row - 0.5),  # x, y
        1,                          # width
        len(videos),                                 # height
        linewidth=1,
        edgecolor='black',
        facecolor='none'
    )

    axis.add_patch(rect)

def generate_global_heartrate_deviation_figure():
    videos = list(_project_data.video_dictionary.keys())
    participants = list(_project_data.participant_dictionary.keys())
    participants_with_global = participants + ["Global"]

    data = retrieve_baseline_deviation_data(videos, participants)

    figure, axis = plt.subplots(figsize=(16, 8), dpi=300)
    im = axis.imshow(data, cmap="viridis")

    display_cell_text(data, axis)
    highlight_global_cells(participants_with_global, videos, axis)

    axis.set_xticks(range(len(participants_with_global)), labels=participants_with_global, rotation=45, ha="right", rotation_mode="anchor")
    axis.set_yticks(range(len(videos)), labels=videos)

    os.makedirs(f"/mnt/Data/Analysis/Exports/Global Analysis", exist_ok=True)
    plt.savefig(f"/mnt/Data/Analysis/Exports/Global Analysis/Heartrate Deviation.png")
    plt.close(figure)

#generate_global_heartrate_deviation_figure()

In [None]:
def plot_participant_heartrate_variability(session_video_entry: SessionVideoEntry, key: str, axis: plt.Axes):
    heartrate_variability_stream = session_video_entry.video_analytics.stream_data_dictionary["PPG_GRN_RMSSD"]
    #heartrate_variability_stream = session_video_entry.video_analytics.stream_data_dictionary["PPG_GRN_SDNN"]

    millisecond_values = heartrate_variability_stream["Value"]

    colour = _project_data.participant_dictionary[key].colour
    axis.plot(heartrate_variability_stream["Timestamp"], millisecond_values, c = colour)
    axis.title.set_text(f"Participant: {key}")

def plot_markers(video_data: VideoData, axes: list[plt.Axes]):
    for axis in axes:
        axis.axvline(x=0, color="black", linestyle="--", alpha=0.5)

        text_position: float = axis.get_ylim()[0]

        axis.text(-60, text_position, "Calibration Start", horizontalalignment='center', verticalalignment='center')
        axis.text(0, text_position, "Video Start", horizontalalignment='center', verticalalignment='center')
        axis.text(video_data.localised_video_end_time, text_position, "Video End", horizontalalignment='center', verticalalignment='center')

def generate_per_video_heartrate_variability():
    figure_rows = len(_project_data.participant_dictionary.keys())
    video_index = 0
    for video_key, video_data in tqdm(_project_data.video_dictionary.items(), total=len(_project_data.video_dictionary), desc="Generating Video Figures"):
        video_index += 1
        figure, axes = plt.subplots(nrows=figure_rows, ncols=1, figsize=(video_data.full_duration * 0.25, figure_rows * 2), dpi=300)

        figure.suptitle(video_key)

        for index, session_key in enumerate(_project_data.participant_dictionary.keys()):
            if session_key not in video_data.session_entries: continue
            session_video_entry: SessionVideoEntry = video_data.session_entries[session_key]
            plot_participant_heartrate_variability(session_video_entry, session_key, axes[index])

        plot_markers(video_data, axes)

        for axis in axes: axis.set_xlim(-60, video_data.localised_video_end_time)

        os.makedirs(f"/mnt/Data/Analysis/Exports/Global Analysis", exist_ok=True)
        plt.savefig(f"/mnt/Data/Analysis/Exports/Global Analysis/RMSSD {video_index} Video Data.png")
        plt.close(figure)

        return

generate_per_video_heartrate_variability()