In [8]:
import importlib
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
from scipy.interpolate import interp1d
from scipy.signal import welch
import matplotlib.dates as mdates
from mpl_toolkits.axes_grid1 import host_subplot
import mpl_toolkits.axisartist as AA
import matplotlib.animation as animation
from matplotlib.animation import FuncAnimation
from datetime import datetime, timedelta
import threading
import time
from collections import defaultdict


import BrainBitPython.BrainBitDemo.neuro_impl.emotions_bipolar_controller
import BrainBitPython.BrainBitDemo.neuro_impl.emotions_monopolar_controller
import BrainBitPython.BrainBitDemo.neuro_impl.spectrum_controller

from BrainBitPython.BrainBitDemo.neuro_impl.emotions_bipolar_controller import EmotionBipolar
from BrainBitPython.BrainBitDemo.neuro_impl.emotions_monopolar_controller import EmotionMonopolar
from BrainBitPython.BrainBitDemo.neuro_impl.spectrum_controller import SpectrumController

importlib.reload(BrainBitPython.BrainBitDemo.neuro_impl.emotions_bipolar_controller)
importlib.reload(BrainBitPython.BrainBitDemo.neuro_impl.emotions_monopolar_controller)
importlib.reload(BrainBitPython.BrainBitDemo.neuro_impl.spectrum_controller)

import hrvanalysis

mpl.rcParams['animation.ffmpeg_path'] = r'C:\Program Files\FFmpeg\bin\ffmpeg.exe'

%matplotlib widget

In [9]:
# Running Procedure

# Add data csv's to the folders in 'Data'

# See Options Below
# Setup however require for the current data

# Make sure you see 'Python 3.X.X' in the top right of this window
# If it says 'select kernal' or smth, click it and select python

# See Toolbar at the top of the window (Code, Markdown, Run All, Restart)
# Start each run by hitting 'Restart'
# If anything gives an error, try hitting restart first (confirm restart if needed)

# Hit 'Run All'

# Runs can probably take up to a few minutes depending on the volume of data
# You can see how it has completed on each code block's bottom left with the check mark and time it took to run

# Each blocks output will show below itself, but if you just need the final video, it will automatically show up in the Data/Exports folder with the name you gave

In [10]:
# Options

# How much time should be shown at any one time on the chart (Default 60 seconds)
LiveChartWindowSizeInSeconds = 60

# Heart Rate File in the Data/Watch/ folder, no extention needed (ex: 'Checkme O2 Max _20250716192137')
HeartRate_CSVFileName = 'Checkme O2 Max _20250716192137'

# The interval timing for the Heart Rate & EEG Data (Every _ seconds, default 2 seconds)
MeasuringIntervalInSeconds = 2

# Export timespans, This lets you set when you want the export to start in terms of UTC time (ex. to match up with eeg recording) and how long to run it for
# Workflow for this would be get the EEG startime from OBS recording, and total run duration, and then export a heartrate video to match perfectly  

# Time that you want the export to start at, make sure it is UTC time (ex: '2025-07-21 23:45:00')
ExportWindowStartTime = '2025-07-16 19:21:37'

# The (Minutes:Seconds) duration you want the export to be
ExportWindowDuration = '20:00'

# Export File Name for Video Recording of Chart Animation (ex: 'Brett Chart')
ChartExportFileName = "Brett Chart"

# Width and Height for the Video, more for getting the ratio of W/H (Ex. Width: 15, Height: 5)
ExportWidth = 15
ExportHeight = 5


In [11]:
# Options for EEG Data

# EEG File in the Data/EEG/ folder, no extention needed (ex: '15.07.2025_17-42 - 15.07.2025_17-42')
EEG_CSVFileName = '15.07.2025_17-42 - 15.07.2025_17-42'

# Start time for the EEG Data, make sure it is UTC time (ex: '2025-07-21 23:45:00')
EEG_UTCStartTime = '2025-07-21 23:45:00'

# Not fully implemented yet
EEG_Enabled = False

In [12]:
rawDf = pd.read_csv(f'Data/Watch/{HeartRate_CSVFileName}.csv')

# Convert Time column to datetime
rawDf['Time'] = pd.to_datetime(rawDf['Time'], format='%H:%M:%S %b %d %Y')

# Convert duration to timedelta
minutes, seconds = map(int, ExportWindowDuration.split(":"))
duration = timedelta(minutes=minutes, seconds=seconds)

ExportWindowStartTimeDT = datetime.strptime(ExportWindowStartTime, "%Y-%m-%d %H:%M:%S")

ExportWindowEndTime = ExportWindowStartTimeDT + duration

filtered_df = rawDf[(rawDf['Time'] >= ExportWindowStartTimeDT) & (rawDf['Time'] <= ExportWindowEndTime)].copy()

full_time_index = pd.date_range(start=ExportWindowStartTimeDT, end=ExportWindowEndTime, freq='2S')

# Set Timestamp as index
filtered_df.set_index('Time', inplace=True)

# Reindex to full time range
filled_df = filtered_df.reindex(full_time_index)

# Reset index and rename
filled_df.reset_index(inplace=True)
filled_df.rename(columns={'index': 'Time'}, inplace=True)
filled_df.fillna(0, inplace=True)

df = filled_df

# Remove non-numeric pulse entries if needed
df = df[df['Pulse Rate'] != '--']
df['Pulse Rate'] = pd.to_numeric(df['Pulse Rate'], errors='coerce')
df.dropna(subset=['Pulse Rate'], inplace=True)

# Convert BPM to RR intervals in milliseconds
# RR(ms) = (60 / HR)
df['RR'] = 60 / df['Pulse Rate']

def compute_rmssd(rr_intervals):
    diffs = np.diff(rr_intervals)
    squared_diffs = diffs ** 2
    return np.sqrt(np.mean(squared_diffs))

# 10-point rolling RMSSD (adjust based on sampling rate)
df['RMSSD'] = df['RR'].rolling(window=10).apply(compute_rmssd, raw=True)

average_rmssd = df['RMSSD'].mean()
df['Above Avg HRV'] = df['RMSSD'] > average_rmssd

  full_time_index = pd.date_range(start=ExportWindowStartTimeDT, end=ExportWindowEndTime, freq='2S')


In [13]:
# Create triple y-axis layout
fig = plt.figure(figsize=(ExportWidth, ExportHeight))
host = host_subplot(111, axes_class=AA.Axes)
plt.subplots_adjust(left=0.4, right=0.5)

par1 = host.twinx()      # Right axis for Heart Rate
par2 = host.twinx()      # Motion layer

# 📌 Offset heart rate axis further to the right
par1.axis["right"].toggle(all=True)
par2.axis["right"] = par2.get_grid_helper().new_fixed_axis(loc="right", axes=par2, offset=(60, 0))
par2.axis["right"].toggle(all=True)

# ⏰ Format x-axis ticks
host.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))

# 🟦 Plot RMSSD on left y-axis
p1, = host.plot(df['Time'], df['RMSSD'], color='steelblue', label='HRV (RMSSD)', linewidth=1)
host.axhline(df['RMSSD'].mean(), color='gray', linestyle='--')
host.set_ylabel('HRV (ms)', color='steelblue')
host.tick_params(axis='y', labelcolor='steelblue')

# 🔴 Heart Rate on second y-axis
p2, = par1.plot(df['Time'], df['Pulse Rate'], color='crimson', linestyle='--', label='Heart Rate (BPM)', linewidth=1)
par1.set_ylabel('Heart Rate (BPM)', color='crimson')
par1.tick_params(axis='y', labelcolor='crimson')

# 🟧 Motion as third line without visible axis
p3, = par2.plot(df['Time'], df['Motion'], color='darkorange', label='Motion', alpha=0.5, linewidth=1)
par2.axis["right"].label.set_visible(False)
par2.axis["right"].major_ticklabels.set_visible(False)
par2.axis["right"].major_ticks.set_visible(False)

# 🧭 Legend
host.legend(handles=[p1, p2, p3], loc='upper left')
plt.title('HRV (RMSSD) with Heart Rate and Motion Overlay')
plt.tight_layout(pad=2.0)
#plt.show()

# Calculate static limits based on full dataset
rmssd_min, rmssd_max = df['RMSSD'].min() - 0.005, df['RMSSD'].max() + 0.005
pulse_min, pulse_max = 0, 200
motion_min, motion_max = df['Motion'].min() - 1, df['Motion'].max() + 1

# Apply fixed limits to each axis
host.set_ylim(rmssd_min, rmssd_max)
par1.set_ylim(pulse_min, pulse_max)
par2.set_ylim(motion_min, motion_max)

# Animation update function
def update(frame):
    start_time = df['Time'].iloc[0] + pd.Timedelta(seconds=MeasuringIntervalInSeconds * frame)
    end_time = start_time + pd.Timedelta(seconds=LiveChartWindowSizeInSeconds)
    window_df = df[(df['Time'] >= start_time) & (df['Time'] <= end_time)]

    if window_df.empty:
        return p1, p2, p3

    # Update plot data
    p1.set_data(window_df['Time'], window_df['RMSSD'])
    p2.set_data(window_df['Time'], window_df['Pulse Rate'])
    p3.set_data(window_df['Time'], window_df['Motion'])

    # Adjust x-axis
    host.set_xlim(start_time, end_time)

    return p1, p2, p3

print(f'Creating Animation with Window Size: {LiveChartWindowSizeInSeconds}sec and Data Increments of {MeasuringIntervalInSeconds}sec. Video Length: {len(df) * 1.0 / MeasuringIntervalInSeconds / 60.0:.2f}min')

# Create animation
ani = FuncAnimation(fig, update, frames=len(df), interval=MeasuringIntervalInSeconds * 1000)
#plt.show()

# Flag to control the loop
exporting = True

def show_progress():
    max_width = len("Generating...")  # fixed width to overwrite trailing dots
    dots = 0
    while exporting:
        message = "Generating" + "." * (dots % 4)
        padded = message.ljust(max_width)  # pad with spaces
        print(f"\r{padded}", end="", flush=True)
        dots += 1
        time.sleep(0.5)

# Start background thread
progress_thread = threading.Thread(target=show_progress)
progress_thread.start()

ani.save(f'Data/Exports/{ChartExportFileName}.mp4', writer='ffmpeg', fps=1.0 / MeasuringIntervalInSeconds)

# Stop the progress indicator
exporting = False
progress_thread.join()

print(f'\nAnimation Exported to: {ChartExportFileName}.mp4')

plt.close(fig)

Creating Animation with Window Size: 60sec and Data Increments of 2sec. Video Length: 5.01min
Generating...
Animation Exported to: Brett Chart.mp4


In [14]:
if(EEG_Enabled):
    eeg_df = pd.read_csv(f'Data/EEG/{EEG_CSVFileName}.csv')

    #O1,O2,T3,T4,O1_T3,O2_T4
    eeg_df['O1'] = pd.to_numeric(eeg_df['O1'], errors='coerce')
    eeg_df['O2'] = pd.to_numeric(eeg_df['O2'], errors='coerce')
    eeg_df['T3'] = pd.to_numeric(eeg_df['T3'], errors='coerce')
    eeg_df['T4'] = pd.to_numeric(eeg_df['T4'], errors='coerce')
    eeg_df['O1_T3'] = pd.to_numeric(eeg_df['O1_T3'], errors='coerce')
    eeg_df['O2_T4'] = pd.to_numeric(eeg_df['O2_T4'], errors='coerce')

    start_time = datetime.strptime(EEG_UTCStartTime, "%Y-%m-%d %H:%M:%S")
    timestamps = [start_time + timedelta(seconds=MeasuringIntervalInSeconds * i) for i in range(len(eeg_df))]
    eeg_df['Timestamp'] = timestamps

    eeg_df = eeg_df[:100]

    #print(eeg_df)

    spectrumController = SpectrumController()

    # Create empty lists to store results per row
    spectrum_results = defaultdict(list)  # e.g., {'O1': [...], 'O2': [...]}
    waves_results = defaultdict(lambda: defaultdict(list))  # e.g., {'O1': {'delta_raw': [...], ...}, ...}

    count = 0

    def __processed_spectrum(spectrum, channel):
        #print(count)
        spectrum_results[channel].append(spectrum)

    def __processed_waves(waves_data, channel):
        #print(count)
        for attr, value in waves_data.__dict__.items():
            waves_results[channel][attr].append(value)


    for _, row in eeg_df.iterrows():
        single_row_df = pd.DataFrame([row])  # Wrap row in a DataFrame

        # Assign callbacks
        spectrumController.processedSpectrum = __processed_spectrum
        spectrumController.processedWaves = __processed_waves

        # Process the row
        spectrumController.process_data(single_row_df)

        count += 1

    # Add spectrum results
    #for channel in ['O1', 'O2', 'T3', 'T4']:
    #    eeg_df[f'{channel}_spectrum'] = spectrum_results[channel]

    # Add waves results
    #for channel in ['O1', 'O2', 'T3', 'T4']:
    #    for wave_attr in waves_results[channel]:
    #        col_name = f'{channel}_{wave_attr}'
    #        eeg_df[col_name] = waves_results[channel][wave_attr]

    start_time = datetime.strptime(EEG_UTCStartTime, "%Y-%m-%d %H:%M:%S") + timedelta(seconds=MeasuringIntervalInSeconds)
    timestamps = [start_time + timedelta(seconds=MeasuringIntervalInSeconds * i) for i in range(len(waves_results))]
    waves_results['Timestamp'] = timestamps

    spectrumController.clear()

    print(waves_results)

    print(f'EEG Data Length: {len(eeg_df)}')
    print('Spectrum length:')
    print({channel: len(values) for channel, values in spectrum_results.items()})
    print('Wave length:')
    print({
        channel: {wave_type: len(values) for wave_type, values in wave_dict.items()}
        for channel, wave_dict in waves_results.items()
    })

    # Set up wave bands and channels
    wave_bands = ['delta_rel', 'theta_rel', 'alpha_rel', 'beta_rel', 'gamma_rel']
    channels = ['O1', 'O2', 'T3', 'T4']

    # Create subplots
    fig, axes = plt.subplots(nrows=4, ncols=1, figsize=(14, 10), sharex=True)
    fig.suptitle("Relative EEG Wave Composition Over Time", fontsize=16)

    # Plot each channel
    for i, channel in enumerate(channels):
        ax = axes[i]
        
        # Extract wave data for this channel
        stacked_data = eeg_df[[f'{channel}_{band}' for band in wave_bands]]
        
        # Plot stacked area chart
        ax.stackplot(eeg_df['Timestamp'], stacked_data.T, labels=wave_bands)
        ax.set_ylabel(channel)
        ax.legend(loc='upper right', fontsize=8)
        ax.grid(True)

    # Final touches
    plt.xlabel("Time")
    plt.tight_layout(rect=[0, 0, 1, 0.96])
    plt.show()
