
This Jupyter notebook builds a formatted machine learning dataset from rPPG video datasets, and then checks how effectively a machine learning model can be trained on it. The only processing applied to the videos is face extraction.

# Importing Libraries

In [1]:
#PyVHR Framework
from pyVHR.datasets.ubfc2 import UBFC2
from pyVHR.datasets.dataset import Dataset
from pyVHR.datasets.dataset import datasetFactory
from pyVHR.methods.base import methodFactory
from pyVHR.signals.video import Video


#Tensorflow/KERAS
import tensorflow as tf
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.callbacks import ModelCheckpoint
from tensorflow.python.keras.models import model_from_json
from tensorflow.python.keras.layers import ZeroPadding3D, Dense, Activation,Conv3D,MaxPooling3D,AveragePooling3D,Flatten,Dropout
from tensorflow.python.keras.utils import np_utils
from tensorflow.python.keras.models import model_from_json

# Copy / numpy / OpenCV
from copy import copy
import numpy as np
import cv2

In [2]:
def formating_data_test(video, imgs, start, end, step_x, step_y):
    """Cut one temporal clip into fixed-size, zero-mean spatial windows.

    The clip ``imgs[start:end]`` is scanned with a sliding window of
    ``IMAGE_HEIGHT`` rows by ``IMAGE_WIDTH`` columns. Every window that fits
    entirely inside the frames is copied, the rows of each of its frames are
    randomly permuted, its global mean is subtracted, and the result is
    stacked into the returned batch.

    Parameters
    ----------
    video : object
        Unused; kept so existing callers keep working. The spatial extent is
        read from ``imgs`` itself rather than from ``video.cropSize``.
    imgs : numpy.ndarray
        Array indexed as ``(frame, row, column, channel)``.
    start, end : int
        Frame range of the clip; ``end - start`` must equal ``LENGTH_VIDEO``.
    step_x, step_y : int
        Stride of the window along the column (x) and row (y) axes.

    Returns
    -------
    numpy.ndarray
        Shape ``(n_windows, LENGTH_VIDEO, IMAGE_HEIGHT, IMAGE_WIDTH, 1)``;
        ``n_windows`` is 0 when no window fits.

    Raises
    ------
    ValueError
        If the clip does not contain exactly ``LENGTH_VIDEO`` frames.
    """
    # Empty batch: returned as-is when nothing fits, and also used to keep the
    # result's dtype identical to what np.append-ing onto it would produce.
    empty_batch = np.zeros(shape=(0, LENGTH_VIDEO, IMAGE_HEIGHT, IMAGE_WIDTH, 1))

    clip = imgs[start:end]
    if clip.shape[0] != LENGTH_VIDEO:
        raise ValueError(
            "expected a clip of %d frames, got %d (start=%d, end=%d)"
            % (LENGTH_VIDEO, clip.shape[0], start, end)
        )

    # Axis 1 holds the rows (y / height), axis 2 the columns (x / width).
    height, width = clip.shape[1], clip.shape[2]

    windows = []
    # Only offsets where the whole window fits are generated, so no
    # after-the-fact shape check is needed and any stride is handled.
    for x1 in range(0, width - IMAGE_WIDTH + 1, step_x):
        for y1 in range(0, height - IMAGE_HEIGHT + 1, step_y):
            window = clip[:, y1:y1 + IMAGE_HEIGHT, x1:x1 + IMAGE_WIDTH, :].copy()

            # Randomise pixel locations frame by frame. np.random.shuffle only
            # permutes along the first axis, i.e. whole rows of each frame.
            for frame in window:
                np.random.shuffle(frame)

            # Centre the window on zero before it is fed to the network.
            windows.append(window - np.mean(window))

    if not windows:
        return empty_batch
    return np.concatenate((empty_batch, np.stack(windows, axis=0)), axis=0)

# Protocol for transforming a video into a machine learning dataset

In [3]:
# --- Sequence / window configuration -------------------------------------
# Number of frames in one training sequence.
LENGTH_VIDEO = 150
# Spatial size (in pixels) of the windows cut out of each face crop.
IMAGE_WIDTH = 25
IMAGE_HEIGHT = 25
# Number of channels of the network input.
IMAGE_CHANNELS = 1
# Frames per lapse; also used to convert a frame count into seconds.
RATE = 30
# Duration of one sequence, in whole seconds.
NB_SECOND = LENGTH_VIDEO // RATE
# Stride of the sliding window along each spatial axis.
step_x = 50
step_y = 50

# --- Available outputs ---------------------------------------------------
# 75 evenly spaced heart-rate values from 55 to 240 (2.5 apart).
HEART_RATES = np.linspace(55, 240, 75)
NB_CLASSES = HEART_RATES.shape[0]

#
# Generate xtest & ytest from one video
#

def extractDataFromVideo(videoFilename, GTFilename):
    """Turn one video and its ground-truth file into sequences and labels.

    The ground-truth BPM series is quantised onto the ``HEART_RATES`` grid,
    the faces are cropped from the video, and a clip of ``LENGTH_VIDEO``
    frames is taken every ``RATE`` frames. Each clip is cut into spatial
    windows by ``formating_data_test`` and every window receives the one-hot
    label of the BPM value ``NB_SECOND // 2`` entries after the clip's index.

    Relies on the module-level ``dataset`` object (for ``readSigfile``) and
    on the configuration constants of this notebook.

    Parameters
    ----------
    videoFilename : str
        Path of the video, passed to ``Video``.
    GTFilename : str
        Path of the ground-truth signal, passed to ``dataset.readSigfile``.

    Returns
    -------
    xtest : numpy.ndarray
        Shape ``(n, LENGTH_VIDEO, IMAGE_HEIGHT, IMAGE_WIDTH, 1)``.
    ytest : numpy.ndarray
        Shape ``(n, NB_CLASSES + 1)``, one one-hot row per sequence.
    """
    # --- Ground truth -----------------------------------------------------
    sigGT = dataset.readSigfile(GTFilename)
    bpmGT, _timesGT = sigGT.getBPM(NB_SECOND)

    # Quantise the BPM values onto the HEART_RATES grid (class 0 is the
    # lowest rate, consecutive classes are one grid step apart).
    hr_min = HEART_RATES[0]
    hr_step = HEART_RATES[1] - HEART_RATES[0]
    bpm = np.round((np.round(bpmGT) - hr_min) / hr_step)
    # Keep the class inside the HEART_RATES index range. Without this a rate
    # below the grid produces a negative index, which Python would silently
    # wrap around to one of the highest classes.
    bpm = np.clip(bpm, 0, NB_CLASSES - 1)

    # --- Face extraction --------------------------------------------------
    video = Video(videoFilename)
    video.getCroppedFaces(detector='dlib', extractor='skvideo')
    video.setMask(typeROI='skin_adapt', skinThresh_adapt=0.22)

    n_frames = video.numFrames

    # Crops with fewer than 3 channels are treated as single-channel.
    n_channels = 1 if video.cropSize[2] < 3 else video.cropSize[2]

    # imgs holds the whole video as (frame, row, column, 1), scaled by 1/255.
    imgs = np.zeros(shape=(n_frames, video.cropSize[0], video.cropSize[1], 1))
    for j in range(n_frames):
        frame = video.faces[j] / 255
        if n_channels == 3:
            frame = frame[:, :, 1]      # only the G component is currently used
        imgs[j] = np.expand_dims(frame, 2)

    # --- Sequences and labels ---------------------------------------------
    # Row k of the identity matrix is the one-hot vector of class k. The
    # width NB_CLASSES + 1 matches the label arrays used by the notebook.
    one_hot = np.eye(NB_CLASSES + 1)
    label_offset = NB_SECOND // 2

    # Start from empty arrays so the result is well-shaped even when the
    # video is too short to yield a single clip.
    x_parts = [np.zeros(shape=(0, LENGTH_VIDEO, IMAGE_HEIGHT, IMAGE_WIDTH, 1))]
    y_parts = [np.zeros(shape=(0, NB_CLASSES + 1))]

    for lapse in range(n_frames // RATE):
        start = lapse * RATE
        end = start + LENGTH_VIDEO
        label_pos = lapse + label_offset
        # Stop when the clip would run past the video or when the ground
        # truth has no value left for it.
        if end > n_frames or label_pos >= len(bpm):
            break

        xtemp = formating_data_test(video, imgs, start, end, step_x, step_y)
        gt = one_hot[int(bpm[label_pos])][np.newaxis, :]

        x_parts.append(xtemp)
        # Every window of the clip shares the clip's label.
        y_parts.append(np.repeat(gt, xtemp.shape[0], axis=0))

    # Single concatenation instead of growing the arrays inside the loop.
    xtest = np.concatenate(x_parts, axis=0)
    ytest = np.concatenate(y_parts, axis=0)
    return xtest, ytest


# Applying the transformation on UBFC2

In [None]:
dataset = datasetFactory("UBFC2")

# Indices of the videos that are left out of the dataset.
blacklist = [0, 15, 18, 21]

# Per-video results are collected here and concatenated once at the end;
# concatenating inside the loop would recopy the whole dataset per video.
# The leading empty arrays keep the result well-shaped if nothing is added.
x_parts = [np.zeros(shape=(0, LENGTH_VIDEO, IMAGE_HEIGHT, IMAGE_WIDTH, 1))]
y_parts = [np.zeros(shape=(0, NB_CLASSES + 1))]

# For each video in the dataset
for i in range(len(dataset.videoFilenames)):
    print ("video :" + str(i))
    if i in blacklist:
        continue
    xtest, ytest = extractDataFromVideo(dataset.videoFilenames[i], dataset.sigFilenames[i])
    x_parts.append(xtest)
    y_parts.append(ytest)

xtrain = np.concatenate(x_parts, axis=0)
ytrain = np.concatenate(y_parts, axis=0)
del x_parts, y_parts  # release the per-video copies

# Every sequence must have exactly one label.
assert xtrain.shape[0] == ytrain.shape[0]

# Mix the sequences (same permutation for inputs and labels)
indices = np.arange(xtrain.shape[0])
np.random.shuffle(indices)
xtrain = xtrain[indices]
ytrain = ytrain[indices]

# save
np.savez('./dataUBFC2.npz', a=xtrain, b=ytrain)
print(np.shape(xtrain))
print(np.shape(ytrain))








# Division into 1 training dataset and 1 validation dataset

In [None]:
# 90% -> training & 10% -> validation
pct = 0.9

# Read each array once and close the archive afterwards: indexing an open
# .npz archive reloads the array from disk on every access.
with np.load('./dataUBFC2.npz') as data:
    sequences = data['a']
    labels = data['b']

sizeDataset = sequences.shape[0]
sizeTrainData = int(sizeDataset * pct)

# The saved arrays are already shuffled, so a contiguous split is enough.
xtrain = sequences[:sizeTrainData]
xvalidation = sequences[sizeTrainData:]

ytrain = labels[:sizeTrainData]
yvalidation = labels[sizeTrainData:]

np.savez('./dataSplitedUBFC2.npz', a=xtrain, b=ytrain, c=xvalidation, d=yvalidation)

print(np.shape(xtrain))
print(np.shape(ytrain))
print(np.shape(xvalidation))
print(np.shape(yvalidation))


In [None]:
# Summarise the training sequences (shape, dtype, value range) rather than
# dumping the raw 5-D array, which is too large to read.
print(xtrain.shape, xtrain.dtype, xtrain.min(), xtrain.max())

# Testing the dataset by training a model

In [5]:
# DEFINE MODEL
model = Sequential()

# Feature extraction: a single 3D convolution whose kernel covers all but 2
# frames and all but 5 pixels of each spatial axis of the input sequence.
model.add(Conv3D(filters=32,
                 kernel_size=(LENGTH_VIDEO - 2, IMAGE_HEIGHT - 5, IMAGE_WIDTH - 5),
                 input_shape=(LENGTH_VIDEO, IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_CHANNELS)))
model.add(MaxPooling3D(pool_size=(2, 2, 2)))
model.add(Activation('relu'))
model.add(Dropout(0.2))

model.add(Flatten())

# Classification: one softmax output per one-hot label column.
model.add(Dense(512, activation='relu'))
model.add(Dropout(0.2))
model.add(Dense(NB_CLASSES + 1, activation='softmax'))

model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Evaluate on the held-out split after every epoch so that the reported
# accuracy reflects unseen sequences and not only the training data. The
# History object is kept for later inspection of the curves.
history = model.fit(xtrain, ytrain,
                    validation_data=(xvalidation, yvalidation),
                    epochs=20, batch_size=16, verbose=1)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<tensorflow.python.keras.callbacks.History at 0x1fb2daba780>

The model can learn from this data despite the small size of the dataset. Indeed, the accuracy is higher than 7%, whereas uniform random guessing over the 76 output classes would give about 1.3%, so the model performs better than chance. It therefore makes sense to enrich our synthetic dataset with this dataset in the hope of improving the results.

In [31]:
# Sanity check: stacking a (2, 2) array onto a (1, 2, 2) batch along axis 0.
xtest = np.zeros((1, 2, 2))
xtest2 = np.zeros((2, 2))
# Give the 2-D array a leading batch axis so both operands have rank 3.
xtest3 = xtest2[np.newaxis, ...]
xtest = np.concatenate((xtest, xtest3), axis=0)
print(xtest.shape)

(2, 2, 2)
