In [None]:
import pandas as pd
from glob import glob
from pyedflib import highlevel as reader
from matplotlib import pyplot as plt
import torch
import pickle
# Load one recording; read_edf hands back the channel samples, the per-channel
# headers and the file-level header, in that order.
BDF_PATH = '/storage/HCI/Sessions/59/Part_1_Trial18_taggingImages1.bdf'
signals, signal_headers, header = reader.read_edf(BDF_PATH)

In [None]:
# Inspect the shape of the loaded `signals` object.
signals.shape

In [None]:
# Collect the channel labels in file order, then show where the three EXG
# channels sit so they can be addressed by index later on.
signal_labels = [hdr["label"] for hdr in signal_headers]
print(signal_labels,f"len:{len(signal_labels)}")
print("=============================")
for exg_label in ("EXG1", "EXG2", "EXG3"):
    print(signal_labels.index(exg_label))

In [None]:
# Display the per-channel header dictionaries returned by read_edf.
signal_headers

In [None]:
# Preview a 3 s window of one EXG channel, starting 30 s into the recording.
#
# The header lookup and the sample lookup now share a single channel index.
# Previously the sample frequency was read from header 32 while the samples
# came from channel 33, which is only correct if both channels happen to share
# a sample rate.
# NOTE(review): 33 keeps the channel that was actually plotted before; confirm
# against the EXG indices printed above that this is the intended channel.
hr_channel = 33
hr_freq = signal_headers[hr_channel]["sample_frequency"]
print(hr_freq)
hr_skip = int(hr_freq * 30)  # samples to skip (30 s)
hr_dur = int(hr_freq * 3)    # samples to show (3 s)
hr_data = signals[hr_channel][hr_skip:hr_skip + hr_dur]
# x axis in seconds relative to the start of the window.
plt.plot([i/hr_freq for i in range(len(hr_data))],hr_data)
plt.xlabel("time (s)");

In [None]:
# Plot channel 46 (presumably the Status/trigger channel - TODO confirm) with
# 29 s trimmed from both ends; the x axis is in seconds from the trimmed start.
status_channel = 46
event_freq = signal_headers[status_channel]["sample_frequency"]
event_skip = int(event_freq * 29)
event_data = signals[status_channel][event_skip:-event_skip]
event_seconds = [sample / event_freq for sample in range(len(event_data))]
plt.plot(event_seconds, event_data)

In [None]:
# Record every sample index at which the event signal differs from the sample
# before it. The first sample is compared against the initial None, so index 0
# is recorded as well.
timestamps = []
prev = None
for sample_idx, value in enumerate(event_data):
    if value == prev:
        continue
    timestamps.append(sample_idx)
    prev = value
# Span between the second recorded change and the second-to-last one, in seconds.
print(f"stimuate duration: {(timestamps[-2] - timestamps[1])/event_freq}")


In [None]:
import os
from src.datasets import SessionMeta,RPPG

# Build the metadata object for one session and dump its attributes.
test = SessionMeta(
    "./datasets/hci/Sessions/521/",
    cropped_dir="./datasets/hci/cropped_faces/521/",
)
vars(test)

In [None]:
# Start from the dataset's default configuration and override a few fields.
c = RPPG.get_default_config()
# presumably seconds (it is later multiplied by a sample frequency) - TODO confirm
c.clip_duration = 10
# Frame count is derived from the clip duration: 5 frames per unit of duration.
c.num_frames = c.clip_duration * 5
c.train_ratio = 1.0
# c.meta_folder=None

In [None]:
# from src.models import Detector
# import pandas as pd
# import torch
# from src.models import Detector
# from main import get_config,init_accelerator,set_seed
# from torchinfo import summary

# config = get_config("./configs/rppg.yml")

# # initialize accelerator and trackers (if enabled)
# accelerator = init_accelerator(config)
# model = Detector(config.model, 50, accelerator)

In [None]:
# Instantiate the dataset from the configuration built above.
x = RPPG(c)

In [None]:
# x.save_meta()

In [None]:
# Number of session metadata entries held by the dataset.
len(x.session_metas)

In [None]:
# Fetch sample 100 as a dict and pull the fields of interest into notebook
# variables (note the dict key is "mask" while the variable is `masks`).
r = x.get_dict(100)
frames, label, hr_data, masks, measures, wd = (
    r[key] for key in ("frames", "label", "hr_data", "mask", "measures", "wd")
)

In [None]:
# Shape of the frame stack for the fetched sample.
frames.shape

In [None]:
import torchvision.transforms as T
# Per-channel normalisation statistics (these match the published CLIP
# preprocessing constants).
norm_mean = (0.48145466, 0.4578275, 0.40821073)
norm_std = (0.26862954, 0.26130258, 0.27577711)

# Resize -> centre crop to 224 -> float32 -> normalise.
# NOTE(review): other cells index `frames` as channels-last images, whereas these
# tensor transforms operate on the trailing (C, H, W) dimensions - confirm the
# layout of `frames` before relying on this output.
trans = T.Compose([
    T.Resize(224, interpolation=T.InterpolationMode.BICUBIC),
    T.CenterCrop(224),
    T.ConvertImageDtype(torch.float32),
    T.Normalize(norm_mean, norm_std),
])
trans(frames)

In [None]:
# Beats-per-minute entry of the measures dict for the fetched sample.
measures["bpm"]

In [None]:
# Display the "mask" field of the fetched sample.
masks

In [None]:
import numpy as np

In [None]:
# Compare the number of heart-rate samples with the number of frames in the clip.
print(len(hr_data),len(frames))

In [None]:
# Show the first frame of the clip.
plt.imshow(frames[0])

In [None]:
# Lay the first 30 frames side by side in one horizontal strip.
# Joining along the width axis produces the same image as the previous
# stack + reshape((150, -1, 3)) for 150-pixel-high frames, but no longer
# hard-codes the frame height or channel count.
frame_strip = np.concatenate([np.asarray(frame) for frame in frames[:30]], axis=1)
plt.imshow(frame_strip)

In [None]:
# Plot the heart-rate signal returned with the fetched sample.
plt.plot(hr_data)

In [None]:
import heartpy as hp
# Draw heartpy's plot for the fetched sample.
hp.plotter(wd, measures)
# List every computed measure as "name: value".
for name, value in measures.items():
    print('%s: %f' %(name, value))

In [None]:
from tqdm import tqdm
# Smoke-test the dataset: index every item and report (rather than abort on)
# any index whose loading raises, together with the error.
for sample_idx in tqdm(range(len(x))):
    try:
        x[sample_idx]
    except Exception as err:
        print(sample_idx,err)

In [None]:
import math

import heartpy as hp
import numpy as np
from src.datasets import resample

# Step-by-step replay of a single clip lookup against the dataset object `x`,
# so that every intermediate value can be inspected in the notebook.
self = x
idx = 199

# Locate the session that contains clip `idx`: the first entry of
# stack_session_clips that is greater than idx.
# (presumably stack_session_clips holds cumulative clip counts - TODO confirm)
session_idx = next(i for i, clip_end in enumerate(self.stack_session_clips) if idx < clip_end)
session_meta = self.session_metas[session_idx]
# Offset of the clip inside its session: clips preceding it in the same session
# multiplied by the clip duration.
session_offset_duration = (idx - (0 if session_idx == 0 else self.stack_session_clips[session_idx-1]))*self.clip_duration

# heart rate data processing
signals, signal_headers, _ = reader.read_edf(session_meta.bdf_path,ch_names=["EXG1","EXG2","EXG3","Status"])

# These values do not depend on the channel, so compute them once.
# - the heart-rate signal sample frequency
hr_sample_freq = session_meta.session_hr_sample_freq
# - the amount of samples to skip, including the session's starting offset and the clip offset.
hr_sample_offset = session_meta.flag_hr_beg_sample + int(session_offset_duration*hr_sample_freq)
# - the amount of samples for the duration of a clip
hr_clip_samples = int(hr_sample_freq*self.clip_duration)

_hr_datas = []
for hr_channel_idx in range(3):
    try:
        # The session metadata and the file header must agree on the sample rate.
        assert int(hr_sample_freq) == int(signal_headers[hr_channel_idx]["sample_frequency"])
        # - fetch heart rate data of clip duration
        _hr_data = signals[hr_channel_idx][hr_sample_offset:hr_sample_offset + hr_clip_samples]
        # - preprocess the signal with heartpy's notch filter.
        _hr_data = hp.filter_signal(_hr_data, cutoff = 0.05, sample_rate = hr_sample_freq, filtertype='notch')
        # - min-max scale the signal into the range [0, 3.4].
        _hr_data = (_hr_data - _hr_data.min())/(_hr_data.max()-_hr_data.min()) * 3.4
        # - upsample the signal by a factor of 10
        _hr_data = resample(_hr_data, len(_hr_data) * 10)
        # - process the signal: get measurements (sample rate scaled to match the upsampling).
        _wd, _measures = hp.process(hp.scale_data(_hr_data), hr_sample_freq * 10)
        # - nan/error check
        # if(_measures["bpm"] > 180 or _measures["bpm"] < 41):
            # continue

        # Reject the channel if any measure is NaN. isinstance is required here:
        # numpy float scalars are not exactly `float`, so the previous
        # `type(v) == float` test let NaN-valued numpy results through.
        has_nan = any(
            isinstance(v, (float, np.floating)) and math.isnan(v)
            for v in _measures.values()
        )
        if not has_nan:
            # - save for comparison.
            _hr_datas.append((_hr_data,_measures,_wd))
    except Exception as e:
        # Best effort: report the failing channel and move on to the next one.
        print(f"Error occur during heart rate analysis for index {idx}:{e}")

if(len(_hr_datas) == 0):
    raise Exception(f"Unable to process the ERG data for index {idx}")

# Keep the channel whose measurement has the smallest sdnn.
best_pair = min(_hr_datas, key=lambda candidate: candidate[1]["sdnn"])
hr_data,measures,wd = best_pair[0], best_pair[1], best_pair[2]
bpm = measures["bpm"]
# Label: a unit-variance Gaussian density evaluated at k = 0..179 and centred on
# (bpm - 41); its peak value is 1/sqrt(2*pi), roughly 0.399.
label = torch.tensor([1/(pow(2*math.pi,0.5))*pow(math.e,(-pow((k-(bpm-41)),2)/2)) for k in range(180)])

hp.plotter(wd, measures)
#display computed measures
for measure in measures.keys():
    print('%s: %f' %(measure, measures[measure]))
