In [11]:
#ARVIS TECH Internship Programme
#September 2022

#Pulse

In [1]:
import cv2
import mediapipe as mp
import matplotlib.pyplot as plt
import numpy as np
import itertools
from scipy.signal import filtfilt, butter, lfilter, medfilt
from matplotlib import pyplot as plt 
import math
import pandas as pd
from scipy import stats

In [2]:
def polygon(image, coordinates, facial_landmarks):
    #returns the region in the frame as a polygon for the selected landmark points.

    height, width, _ = image.shape

    area = []
    for i in coordinates:
        pt1 = facial_landmarks.landmark[i]
        x = int(pt1.x * width)
        y = int(pt1.y * height)
        area.append([x,y])
        #cv2.circle(image, (x, y), 2, (100, 100, 0), -1)
        #cv2.putText(image, str(i), (x, y), 0, 0.5, (0, 0, 0))
        
    area = np.array(area)
    area = area.reshape((-1, 1, 2))
    #cv2.polylines(image,[area],True,(0,255,255))
    
    return area

In [3]:
def extract_roi(image, yanak_sag , yanak_sol, alin):
    #this function extracts ROI areas and put them on a black background from the base image

    mask = np.zeros(image.shape[:2], dtype="uint8")
    cv2.drawContours(mask, [yanak_sag], -1, (255, 255, 255), -1, cv2.LINE_AA)
    cv2.drawContours(mask, [yanak_sol], -1, (255, 255, 255), -1, cv2.LINE_AA)
    cv2.drawContours(mask, [alin], -1, (255, 255, 255), -1, cv2.LINE_AA)

    output = cv2.bitwise_and(image, image, mask=mask)
    return output

In [4]:
def green(image, area):
    #function to find green channel averages of given image and ROI

    mask_G = np.zeros(image.shape[:2], dtype="uint8")
    cv2.drawContours(mask_G, [area], -1, (255, 255, 255), -1, cv2.LINE_AA)
    mean = cv2.mean(image[:,:,1], mask=mask_G)
    #print(f"{txt}= {mean}")
    return mean

In [5]:
def update_signal(green_mean, green_arr, n_frames):
    #updates green channel array signal
    
    #if size of array is less than given limit, it appends the green channel mean to the signal array
    if len(green_arr) < n_frames:
        green_arr.append(green_mean)
        
    #else it keeps the size of the array constant by deleting an element from the beginning and appending an element to the end
    else:
        del green_arr[0]
        green_arr.append(green_mean)

In [6]:
def nabiz_bul(mean_list, fps, hz_bottom, hz_top):
    
    sampling_rate = fps
    raw_signal = np.squeeze(mean_list)
    
    
    #We applied the Z-score normalization method to the raw signal.
    #This process makes the mean of the raw signal 0 and its standard deviation 1.
    #General formula of Z-score normalization (signal - avg(signal))/std(signal).
    #The reason why we do normalization is to put all the values in the same range before putting the signal into the median filter.
    #so the median filter works better when trying to find the median value.
    normalized_signal = raw_signal - np.mean(raw_signal)#We used the formula which I mentioned above.
    normalized_signal = normalized_signal / np.std(normalized_signal)#We used the formula which I mentioned above.
    
    
    
    #We apply a median filter to the normalized signal.
    #The reason why we apply a median filter to the signal is to remove noise by preventing spikes in the signal.
    #smoothes out spikes in the signal
    med_signal = medfilt(normalized_signal, 5)#applying a median filter to the normalized signal (window size is 5)
    
    
    
    #A band-pass Butterworth filter is applied to the signal.
    #In this part, while we receive signals within the signal range we want, we eliminate the signals outside of the range.
    #for example, if you just want to measure 60 to 180 pulse range: 60/60=1Hz 180/60=3Hz(lower and upper limit in frequency)
    nyq = 0.5 * sampling_rate#We applied the nyquist frequency formula.(fn = 1/2Δt )
    low = hz_bottom / nyq#lower frequency limit for our band-pass filter
    high = hz_top / nyq#upper frequency limit for our band-pass filter
    b, a = butter(3, [low, high], btype = 'band')#Prepares a 3rd degree band-pass butterworth filter and returns the filter coefficients.
    filtered_signal = lfilter(b, a, med_signal)#filter the signal
    
    
    #we apply a hamming window to the signal
    #Window functions are multiplier functions of signal parts.
    #With the help of these functions, it is ensured that the middle parts of the signal parts are highlighted.
    #The parts near the beginning and end of the windows are extinguished. In this way, spectral leakage is attenuated.
    window = np.hamming(len(filtered_signal))#prepares the hamming window multiplier function
    windowed_signal = filtered_signal * window#The signal is multiplied by the multiplier function.
    
    
    
    #this is where FFT and Power spectrum are applied to the signal.
    #FFT Converts the signal from time domain to frequency domain.
    #The frequency domain is in the set of complex numbers.
    #We convert this set to the set of real numbers by multiplying it by its conjugate.
    #The frequency spectrum transferred to real numbers is called power spectrum.
    def nextpow(x):
        return 2 ** math.ceil(math.log2(x))

    NFFT = nextpow(len(windowed_signal) * sampling_rate)#Length of the transformed axis of the output (we applied 0 padding)
    freqDomain = np.fft.fft(windowed_signal, NFFT)#Calculates the one-dimensional discrete Fourier Transform (we move to the frequency range)
    conj = np.conjugate(freqDomain)#the conjugate of the frequency range is taken

    powerSpectrum = np.multiply(freqDomain, conj) / NFFT#The frequency range is multiplied by its conjugate to get the real number range.
    powerSpectrum = np.real(powerSpectrum)#even though complex values are eliminated the complex parts are still shown as 0j.The 0j portions are removed.

    freqInterestL = hz_bottom#predetermined lower frequency limit
    freqInterestH = hz_top#predetermined upper frequency limit

    freqs = np.linspace(0, sampling_rate, NFFT)#Divide the length from 0 to NFFT into steps of sampling_rate size.
    fRange = list()
    #Browse the sampled frequencies and add indexes to the list if the frequencies fall within the range of interest.
    for i in range(len(freqs)):
        if freqs[i] < freqInterestH and freqs[i] > freqInterestL:
            fRange.append(i + 1)
    #Return the selected indexes in Hz.
    HRange = list()
    for i in range(len(fRange)):
        idx = 60 * freqs[fRange[i]]
        HRange.append(idx)
    max_x = HRange[powerSpectrum[fRange].argmax()]#Get the highest value in HRange
    return max_x

In [9]:
def find_pulse(path = 0, plot_signal = False):
    cap = cv2.VideoCapture(path)

    #finds the fps of the video
    (major_ver, minor_ver, subminor_ver) = (cv2.__version__).split('.')
    if int(major_ver)  < 3 :
        fps = cap.get(cv2.cv.CV_CAP_PROP_FPS)
    else :
        fps = cap.get(cv2.CAP_PROP_FPS)
    
    font = cv2.FONT_HERSHEY_SIMPLEX 

    # Face Mesh
    mp_face_mesh = mp.solutions.face_mesh
    face_mesh = mp_face_mesh.FaceMesh()

    
    #array to store green channel means of each frame   
    green_arr = []
    
    #array to store pulse values   
    nabiz = []


    # Create figure and subplot
    if plot_signal:
        fig = plt.figure()
        ax = fig.add_subplot(1, 1, 1)
        
    #frame counter
    ctr = 0
    
    while True:
        ret, image = cap.read()
        if ret is not True:
            break

        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # Facial landmarks
        result = face_mesh.process(rgb_image)
        
        #mediapipe crashes in case it cannot find a face to detect therefore try/except blocks used here
        try:

            for facial_landmarks in result.multi_face_landmarks:
                
                #left cheek landmark points 
                yanak_sol_arr = [143, 123, 147, 213, 138, 135, 210, 202, 57, 92, 36, 101, 118, 31, 143]
                yanak_sol = polygon(image, yanak_sol_arr, facial_landmarks)

            for facial_landmarks in result.multi_face_landmarks:
                
                #right cheek landmark points 
                yanak_sag_arr = [372, 352, 376, 433, 367, 364, 430, 422, 287, 322, 266, 330, 347, 261, 372]
                yanak_sag = polygon(image, yanak_sag_arr, facial_landmarks)

            for facial_landmarks in result.multi_face_landmarks:

                #forehead landmark points 
                alin_arr = [54, 103, 67, 109, 10, 338, 297, 332, 284, 298, 296, 9, 66, 68,]
                alin = polygon(image, alin_arr, facial_landmarks)

            #extracts ROI areas from the base image
            image = extract_roi(image, yanak_sag , yanak_sol, alin)
            
            #the green channel average of each ROI areas
            mean_yanak_sag = green(image, yanak_sag)[0]
            mean_yanak_sol = green(image, yanak_sol)[0]
            mean_alin = green(image, alin)[0]
            
            #mean green channel value of all ROI areas combined
            mean_raw = (mean_yanak_sag + mean_yanak_sol + mean_alin)/3
            
            #updates green channel array
            #the limit of the array is 10 seconds.
            update_signal(mean_raw, green_arr, n_frames=fps*10)
            
            #if size of green_array channel is greater than 3 seconds of frame size
            #Without the if block, it would try to find a pulse value for an array that was not large enough
            #that would cause an error.
            if len(green_arr) >= fps*3:
                
                #finds pulse from the green channel signal
                nabiz.append(nabiz_bul(green_arr, fps=fps, hz_bottom=1.0, hz_top=2.75))
                
                #putting the last element of the pulse on the frame
                #If a smoother transition between pulse values is desired, 
                #the average of the last 3 seconds pulse values can be putted on the frane.
                cv2.putText(image,"PULSE: " + str(round(nabiz[-1])), (150, 30), font, 1, (100, 255, 0), 3, cv2.LINE_AA)

            # putting the FPS count on the frame
            cv2.putText(image, "FPS: " + str(int(fps)), (7, 30), font, 1, (100, 255, 0), 3, cv2.LINE_AA)
            
            #plotting the green array signal
            if plot_signal:
                if ctr % (fps/2) == 0:
                    ax.cla()
                    ax.plot(green_arr)
                    display(fig)    
                    clear_output(wait = True)

        #if mediapipe cannot detects face it prints "No Face Detected" to the screen
        except:
            text = "No Face Detected"
            coordinates = (128,128)
            fontScale = 1
            color = (255,0,255)
            thickness = 2
            blank = np.zeros(image.shape[:2], dtype="uint8")
            image = cv2.putText(blank, text, coordinates, font, fontScale, color, thickness, cv2.LINE_AA)

        cv2.imshow("roi areas", image)
        
        ctr += 1
        if cv2.waitKey(1) == ord("q"):
            break   

    cap.release()
    cv2.destroyAllWindows()

In [10]:
path = 0
find_pulse(path=path, plot_signal=False)