In [None]:
import time
import sys
import logging
import csv
import json
import os
import tempfile
import traceback
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import scipy
from imutils import face_utils
from face_utilities import Face_utilities
from signal_processing import Signal_processing, detect_peaks, plot_peaks, filter_ppg
from pyPPG.datahandling import load_data


# Configure the root logger at INFO level; the processing cell reports its
# progress through logging.info / logging.warning / logging.error.
logging.basicConfig(level=logging.INFO)

In [None]:
# Load the recordings table (indexed by column 'N') and add the result columns
# that the processing loop fills in for each recording.
data = pd.read_csv("assets/data.csv", index_col='N')
# `np.NaN` was removed in NumPy 2.0; `np.nan` is the spelling that works on
# every NumPy version.
data['HR_predicted'] = np.nan
# Fiducial timestamps are stored as JSON-encoded lists; default to an empty list.
data['points_sp'] = json.dumps([])
data['points_dn'] = json.dumps([])
# When True, frames and the aligned face are shown in OpenCV windows while processing.
display_video = True

# For every recording listed in `data`:
#   1. read the video, detect/align the face and extract the ROI colour signal
#      frame by frame, estimating a running FFT-based BPM;
#   2. feed the whole signal to pyPPG, filter it and detect fiducial points;
#   3. store the predicted heart rate and fiducial timestamps back into `data`
#      and dump the raw signal to assets/ecg_from_video/<Recording>.csv.
for index, row in data.iterrows():
    if pd.isna(row['Recording']):
        continue

    ###########################################################
    # Extract signal from face
    ###########################################################
    # Use the recording named in the table. (A leftover debug override used to
    # force every row onto the same hard-coded recording.)
    recording = row['Recording']
    logging.info('-'*50)
    logging.info(f"Reading video {index} ({recording})")

    # Locate the video file; skip the row instead of failing on an undefined
    # (or stale, from the previous iteration) extension when no file exists.
    video_path = None
    for extension in ("mp4", "mov"):
        candidate = f"assets/video/{recording}.{extension}"
        if os.path.exists(candidate):
            video_path = candidate
            break
    if video_path is None:
        logging.warning(f"No video file found for recording {recording}, skipping")
        continue

    cap = cv2.VideoCapture(video_path)
    video_fps = cap.get(cv2.CAP_PROP_FPS)  # frame rate reported for the video file
    if not cap.isOpened() or video_fps <= 0:
        # A zero frame rate would cause divisions by zero further down.
        logging.warning(f"Could not open {video_path} (reported fps: {video_fps}), skipping")
        cap.release()
        continue

    fu = Face_utilities()
    sp = Signal_processing()

    i = 0  # number of frames in which a face was found and processed
    t = time.time()

    # Sliding-window length (in frames) used for the running FFT estimate.
    BUFFER_SIZE = 100

    fps = 0  # sampling rate estimated from the buffered timestamps
    # Log progress roughly every 10 seconds of video; never let the modulus be 0.
    progress_every = max(1, int(video_fps * 10))

    times: list = []  # size: 1 x BUFFER_SIZE
    data_buffer: list = []  # size: 1 x BUFFER_SIZE
    data_buffer_all: list = []  # size: 1 x videoFrames
    filtered_data = []  # size: 1 x BUFFER_SIZE
    fft_of_interest = []
    freqs_of_interest = []

    bpm = 0
    bpm_all = []  # size: 1 x videoFrames

    # try/finally guarantees the capture and the OpenCV windows are released even
    # when processing is interrupted (e.g. KeyboardInterrupt) or raises.
    try:
        while True:
            # grab a frame -> face detection -> crop the face -> 68 facial landmarks -> get mask from those landmarks
            t0 = time.time()
            ret, frame = cap.read()

            if frame is None:
                break

            ret_process = fu.no_age_gender_face_process(frame, "68")

            if ret_process is None:
                # NOTE(review): skipped frames do not advance `i`, so the timestamps
                # below stay contiguous even though video time passed - confirm intended.
                logging.warning("No face detected")
                continue

            rects, face, shape, aligned_face, aligned_shape = ret_process

            # Measure the colour signal BEFORE drawing any overlay on `aligned_face`,
            # so the measurement does not depend on `display_video`.
            ROIs = fu.ROI_extraction(aligned_face, aligned_shape)
            green_val = sp.extract_color(ROIs)
            logging.debug(green_val)

            if display_video:
                (x, y, w, h) = face_utils.rect_to_bb(rects[0])
                cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 0, 255), 2)

                if len(aligned_shape) == 68:
                    # draw rectangle on right and left cheeks
                    cv2.rectangle(aligned_face, (aligned_shape[54][0], aligned_shape[29][1]),
                                  (aligned_shape[12][0], aligned_shape[33][1]), (0, 255, 0), 0)
                    cv2.rectangle(aligned_face, (aligned_shape[4][0], aligned_shape[29][1]),
                                  (aligned_shape[48][0], aligned_shape[33][1]), (0, 255, 0), 0)
                else:
                    cv2.rectangle(aligned_face, (aligned_shape[0][0], int((aligned_shape[4][1] + aligned_shape[2][1])/2)),
                                  (aligned_shape[1][0], aligned_shape[4][1]), (0, 255, 0), 0)

                    cv2.rectangle(aligned_face, (aligned_shape[2][0], int((aligned_shape[4][1] + aligned_shape[2][1])/2)),
                                  (aligned_shape[3][0], aligned_shape[4][1]), (0, 255, 0), 0)

                for (x, y) in aligned_shape:
                    cv2.circle(aligned_face, (x, y), 1, (0, 0, 255), -1)

            # for signal_processing
            data_buffer.append(green_val)
            data_buffer_all.append(green_val)

            times.append((1.0/video_fps)*i)

            L = len(data_buffer)
            if L <= BUFFER_SIZE:
                if display_video:
                    cv2.putText(frame, "Filling up buffer...", (30, int(frame.shape[0]*0.95)), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            else:
                data_buffer = data_buffer[-BUFFER_SIZE:]
                times = times[-BUFFER_SIZE:]
                # BUFFER_SIZE samples span BUFFER_SIZE - 1 sampling intervals.
                fps = float(BUFFER_SIZE - 1) / (times[-1] - times[0])

                detrended_data = sp.signal_detrending(data_buffer)
                interpolated_data = sp.interpolation(detrended_data, times)
                normalized_data = sp.normalization(interpolated_data)

                fft_of_interest, freqs_of_interest = sp.fft(normalized_data, fps)
                max_arg = np.argmax(fft_of_interest)
                bpm = freqs_of_interest[max_arg]
                if display_video:
                    cv2.putText(frame, "fps: {0:.2f}".format(fps), (30, int(frame.shape[0]*0.85)), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                    cv2.putText(frame, "HR_real: {0:.2f}".format(row['HR']), (30, int(frame.shape[0]*0.90)), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                    cv2.putText(frame, "HR_predicted: {0:.2f}".format(bpm), (30, int(frame.shape[0]*0.95)), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                filtered_data = sp.butter_bandpass_filter(interpolated_data, (bpm-20)/60, (bpm+20)/60, fps, order = 3)

            bpm_all.append(bpm)

            # display
            if display_video:
                cv2.imshow("frame", frame)
                cv2.imshow("face", aligned_face)
            i = i+1
            logging.debug("time of the loop number "+ str(i) +" : " + str(time.time()-t0))
            if i % progress_every == 0:
                logging.info(f"Seconds of video processed: {i / video_fps:.1f}")

            # waitKey to show the frame and break loop whenever 'q' is pressed
            if display_video:
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
    finally:
        cap.release()
        cv2.destroyAllWindows()

    ###########################################################
    # Extract fiducials from signal
    ###########################################################
    # Sanity checks: the sliding window must have been filled (more than
    # BUFFER_SIZE processed frames) and a plausible sampling rate estimated.
    assert len(data_buffer) == BUFFER_SIZE, f"expected a full buffer of {BUFFER_SIZE} samples, got {len(data_buffer)}"
    assert len(bpm_all) == len(data_buffer_all)
    assert fps > 20, f"estimated sampling rate too low: {fps}"
    # Load into PyPPG
    # TODO:
    # Dear PyPPG maintainers: please document that `load_data` expects a mat file
    # And that the mat file should contain a field `Data` with a 1d array, and a field `Fs` with the sampling rate.
    with tempfile.NamedTemporaryFile(suffix='.mat') as temp:
        scipy.io.savemat(temp.name, {
            'Data': data_buffer_all,
            'Fs': fps,
        })
        signal = load_data(data_path=temp.name, fs=0, start_sig=0, end_sig=-1, channel="Pleth", use_tk=True, print_flag=True)

    filtered = filter_ppg(signal, verbose=False)

    # Plot in time domain
    points_to_plot = 2000
    # NOTE(review): this truncation also shortens the signal handed to peak
    # detection below; the reason for the 740 offset is not documented - confirm.
    filtered = filtered[:points_to_plot-740]
    plt.plot(signal.v[:points_to_plot], label='Original signal')
    plt.plot(filtered[:points_to_plot], label='Filtered signal')
    plt.legend()
    plt.show()

    # peak detection
    signal.ppg = filtered
    try:
        fiducials = detect_peaks(signal)
        plot_peaks(signal, fiducials, 'Fiducial detection')
        assert isinstance(fiducials, pd.DataFrame)
        assert fiducials.shape[0] > 5
    except Exception as e:
        # Best effort: fall back to the FFT-based estimate when detection fails.
        logging.error(f'Excpetion at peak identification: {e}, {traceback.format_exc()}')
        fiducials = None

    ###########################################################
    # Analysis
    ###########################################################
    # Fallback estimate: mean of the running FFT-based BPM once the buffer was full.
    # TODO: reestimate fft from filtered signal
    fft_bpm = int(np.mean(bpm_all[BUFFER_SIZE:]))
    hr_predicted = fft_bpm
    median_bpm = 0
    points_sp = []
    points_dn = []

    if fiducials is not None:
        # Fiducial positions are sample indices; dividing by fps gives seconds.
        points_sp = (fiducials['sp']/fps).astype(float).tolist()
        points_dn = (fiducials['dn']/fps).astype(float).tolist()
        # Take bpm from the median number of samples between two consecutive
        # systolic peaks: beat period = samples / fps seconds, so
        # bpm = 60 / period = 60 * fps / samples.
        median_interval = fiducials['sp'].diff().median()
        if pd.notna(median_interval) and median_interval > 0:
            median_bpm = int(60 * fps / median_interval)
            hr_predicted = median_bpm
        else:
            logging.warning("No valid interval between systolic peaks, using the FFT-based estimate")

    data.loc[index, 'HR_predicted'] = hr_predicted
    data.loc[index, 'points_sp'] = json.dumps(points_sp)
    data.loc[index, 'points_dn'] = json.dumps(points_dn)

    logging.info("End of video. Total running time: " + str(time.time() - t))
    logging.info(f"We should have about {int(video_fps*30)} frames, and we have {len(bpm_all)}")
    logging.info(f"Final BPM before filtering: {int(bpm_all[-1])}")
    logging.info(f"Average BPM before filtering: {fft_bpm}")
    logging.info(f"Median BPM after filtering: {median_bpm}")
    logging.info(f"Real BPM: {row['HR']}")

    # Save the raw signal with a time index. Building the index from an integer
    # range guarantees exactly one timestamp per sample (a float-step arange can
    # produce one element too many).
    pd.DataFrame(
        data_buffer_all,
        index=pd.Series(np.arange(len(data_buffer_all)) / fps, name='time'),
        columns=['signal'],
    ).to_csv(f"assets/ecg_from_video/{recording}.csv")



INFO:root:--------------------------------------------------
INFO:root:Reading video 1 (15:50:04)


KeyboardInterrupt: 

In [None]:
# Show the table, including the HR_predicted / points_sp / points_dn columns
# written by the processing loop.
data

In [None]:
# Persist the table with the predictions. Every field is quoted
# (csv.QUOTE_ALL); points_sp / points_dn hold JSON-encoded lists.
data.to_csv("assets/data_predicted.csv", quoting=csv.QUOTE_ALL)

In [None]:
# Disabled scratch code, kept as a string literal so it is never executed:
# it reloads one saved signal CSV and passes it to pyPPG's `load_data` through
# a temporary .mat file. The trailing `;` suppresses the cell's output.
'''
wtf = pd.read_csv(f"assets/ecg_from_video/15:50:04.csv")
with tempfile.NamedTemporaryFile(suffix='.mat') as temp:
    scipy.io.savemat(temp.name, {
        'Data': wtf['signal'].values,
        'Fs': wtf['time'].shape[0] / (wtf['time'].iloc[-1] - wtf['time'].iloc[0]),
    })
    signal = load_data(data_path=temp.name, fs=0, start_sig=0, end_sig=-1, channel="Pleth", use_tk=True, print_flag=True)
''';