<a href="https://colab.research.google.com/github/MahmoodAbdali79/Face-ani-spoofing/blob/main/benchmark/evaluation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
from keras.preprocessing.image import img_to_array
from rPPG.rPPG_Extracter import *
from rPPG.rPPG_lukas_Extracter import *
from keras.models import model_from_json
import tensorflow as tf
from random import seed
# from random import randint
from random import sample

# Seed Python's `random` module so the `sample(...)` frame selections used below are reproducible.
seed(1)

In [None]:
# Target size passed to cv2.resize in make_pred before a face crop is fed to the model.
dim = (128,128)

In [None]:
# Haar-cascade frontal-face detector shared by the frame-extraction functions below.
cascPath = './cv2_model/haarcascade_frontalface_default.xml'
faceCascade = cv2.CascadeClassifier(cascPath)

In [None]:
def extract_frames(path):
  """
  Save every 5th frame that contains a detected face, for each video under `path`.

  `path` is expected to hold one sub-folder per recording session, each containing
  video files. Frames are written to `{path}/frames` and named
  `{folder}_{videoname}_{x},{y},{w},{h}_{label}.jpg`, where (x, y, w, h) is the first
  face box returned by the cascade and label is '1' for videos named 1, 2 or HR_1
  and '0' for every other video.

  NOTE: the file name carries no frame index, so two saved frames of the same video
  with an identical face box map to the same file and the later one overwrites the
  earlier one.

  parameters:
    path : main folder of videos

  return:
    None; the selected frames are written to disk
  """
  out_dir = f'{path}/frames'
  # Only real sub-folders are video folders; the output folder itself is skipped so
  # that re-running the extraction does not try to open saved frames as videos.
  folders = [
    entry for entry in os.listdir(path)
    if entry != 'frames' and os.path.isdir(f'{path}/{entry}')
  ]
  # exist_ok lets the cell be re-run without failing on the existing folder.
  os.makedirs(out_dir, exist_ok=True)

  for folder in folders:
    print(f'exracting foder {folder} ...')
    videos = os.listdir(f'{path}/{folder}')
    for video in videos:
      name = video.split('.')[0]
      print(name, ' ...')
      label = '1' if name in ('1', '2', 'HR_1') else '0'
      cap = cv2.VideoCapture(f'{path}/{folder}/{video}')

      try:
        i = 0
        counter = 0
        while True:
          ret, frame = cap.read()
          if not ret: break
          i += 1
          # Only every 5th frame is a candidate; skip the face detector for the rest.
          if i % 5 != 0:
            continue

          # detect face
          gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
          faces = faceCascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5)
          if len(faces) == 0:
            continue

          x, y, w, h = faces[0]
          print(faces[0], len(faces[0]), f'counter: {counter+1}')
          cv2.imwrite(f'{out_dir}/{folder}_{name}_{x},{y},{w},{h}_{label}.jpg', frame)
          counter += 1
      finally:
        cap.release()

In [None]:
# Extract face frames from the videos in 'test_release' into 'test_release/frames'.
extract_frames('test_release')

In [None]:
def get_rppg_pred(frame):
    """
    Compute the rPPG signal for a single face crop.

    A fresh `rPPG_Extracter` is created on every call, `measure_rPPG` is run once on
    `frame` with the classifier enabled and an empty sub-ROI, and the extractor's
    `rPPG` attribute is returned transposed.

    parameters:
      frame : image (face crop) handed to rPPG_Extracter.measure_rPPG

    return:
      np.transpose(...) of the extractor's `rPPG` attribute
    """
    use_classifier = True
    sub_roi = []

    rPPG_extracter = rPPG_Extracter()
    # NOTE(review): the original constructed an rPPG_Lukas_Extracter and discarded it.
    # The call is kept in case construction has side effects -- confirm and remove.
    rPPG_Lukas_Extracter()

    # The previous timestamp / resampling bookkeeping was dead code: its lists were
    # re-created on every call and the resampling flag was hard-coded to False.
    rPPG_extracter.measure_rPPG(frame, use_classifier, sub_roi)
    return np.transpose(rPPG_extracter.rPPG)

In [None]:
def make_pred(li):
    """
    Run the loaded model on one (face image, rPPG signal) pair.

    parameters:
      li : two-element sequence [face image, rPPG signal]

    return:
      the output of model.predict for the resized image batch and the rPPG signal
    """
    face_img, rppg_signal = li
    # Resize to the module-level `dim` and add a leading batch axis.
    resized = cv2.resize(face_img, dim)
    batch = np.expand_dims(img_to_array(resized), axis=0)
    return model.predict([batch, rppg_signal])

In [None]:
def prediction_test(path):
  """
  Predict all extracted frames stored in the folder `{path}/frames`.

  Each file name is expected to look like
  `{folder}_{videoname}_{x},{y},{w},{h}_{label}.jpg`; the face box is parsed from the
  name, the face is cropped, and the crop plus its rPPG signal are fed to the model.

  parameter:
    path : main folder of videos (must contain a `frames` sub-folder)

  return:
    result: predicted classes for all frames
    label: real classes of all frames
  """
  result = []
  label = []
  ma_frames = os.listdir(f'{path}/frames')

  for counter, file_name in enumerate(ma_frames):
    print(f'{counter}:', file_name)
    im = cv2.imread(f'{path}/frames/{file_name}')

    parts = file_name.split('_')
    # High-resolution video names ('HR_1', ...) contain an underscore themselves,
    # which shifts the face-box field one position to the right.
    box_field = parts[3] if 'HR' in file_name else parts[2]
    x, y, w, h = map(int, box_field.split(','))
    sub_img = im[y:y+h, x:x+w]

    rppg_s = get_rppg_pred(sub_img).T
    pred = make_pred([sub_img, rppg_s])
    result.append(pred)
    # The label is the last '_'-separated field, just before the extension.
    label.append(int(parts[-1].split('.')[0]))

  result = np.array(result).reshape(len(result), -1)
  # NOTE(review): argmin (not argmax) selects the class with the smallest model
  # output -- presumably the model's output order is inverted relative to the labels; confirm.
  result = tf.math.argmin(result, axis=1)
  label = tf.convert_to_tensor(label)

  return result, label
  


def PredictSepratedClass(class_folder):
  """
  Predict the frames of one attack/real class stored under `benchmark_type/{class_folder}`.

  File names are expected to end in `..._{x},{y},{w},{h}_{label}_{frameindex}.jpg`.

  parameter:
    class_folder : single class folder

  return:
    result: predicted classes for all frames
    label: real classes of all frames
  """
  predictions = []
  truths = []

  for idx, file_name in enumerate(os.listdir('benchmark_type/'+class_folder)):
    print(f'Counter {idx} ... ')
    img = cv2.imread(f'benchmark_type/{class_folder}/{file_name}')

    fields = file_name.split('_')
    # 'HR_x' video names add one extra '_'-separated field before the face box.
    box_text = fields[3] if 'HR' in file_name else fields[2]
    x, y, w, h = list(map(int, box_text.split(',')))
    face_crop = img[y:y+h, x:x+w]

    rppg_signal = get_rppg_pred(face_crop).T
    predictions.append(make_pred([face_crop, rppg_signal]))
    # The label is the second-to-last field; the last one is the frame index.
    truths.append(int(fields[-2]))

  flat = np.array(predictions).reshape(len(predictions), -1)
  result = tf.math.argmin(flat, axis=1)
  label = tf.convert_to_tensor(truths)

  return result, label




In [None]:
# Load the model architecture from its JSON description; the context manager
# closes the file even if reading fails.
with open('../RGB_rPPG_merge_softmax_.json', 'r') as json_file:
    loaded_model_json = json_file.read()
model = model_from_json(loaded_model_json)

# Load the trained weights and compile the model.
model.load_weights("../RGB_rPPG_merge_softmax_.h5")
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# Predict every frame in 'test_release/frames'; yields predicted classes and true labels.
result, label = prediction_test('test_release')

In [None]:
# Show the confusion matrix of the predictions (rows: true labels, columns: predicted classes).
res = tf.math.confusion_matrix(label,result)
print(res)

In [None]:
# Extract randomly sampled frames from one video and save them in a separate folder per class.
def extract_frame(video_paht):
  """
  Save a random sample of face-containing frames of one video under `benchmark_type/`.

  The class is derived from the video's base name: 1/2/HR_1 -> 'real' (label '1',
  30 sampled frames); 3/4/HR_2 -> 'print'; 7/8/HR_4 -> 'reply'; 5/6/HR_3 -> 'print_eye'
  (label '0', 10 sampled frames). Any other name keeps an empty class folder, so its
  frames land directly in `benchmark_type/`.

  Frames are written as `{folder}_{name}_{x},{y},{w},{h}_{label}_{frameindex}.jpg`,
  where (x, y, w, h) is the first detected face box. Sampled frames in which no face
  is detected are skipped, so fewer files than sampled frames may be written.

  parameters:
    video_paht : path of the form `<root>/<folder>/<video file>`

  return:
    None; the selected frames are written to disk
  """
  folder = video_paht.split('/')[1]
  name = video_paht.split('.')[0].split('/')[-1]
  label = '0'
  number_frame = 10
  out_folder = ''

  if name in ['1','2','HR_1']:
    label = '1'
    number_frame = 30
    out_folder = 'real'
  elif name in ['3', '4', 'HR_2']:
    out_folder = 'print'
  elif name in ['7', '8', 'HR_4']:
    out_folder = 'reply'
  elif name in ['5', '6', 'HR_3']:
    out_folder = 'print_eye'

  print(f'    Extracting {name}.avi ...')

  # cv2.imwrite silently fails when the target folder is missing, so create it up front.
  os.makedirs(f'benchmark_type/{out_folder}', exist_ok=True)

  cap = cv2.VideoCapture(video_paht)
  try:
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if length <= 0:
      print(f'    no frames reported for {video_paht}, skipping')
      return

    # Never ask for more frames than the video has (sample() would raise ValueError).
    # A set gives O(1) membership tests in the loop below.
    selected_frame = set(sample(range(length), min(length, number_frame)))
    last_frame = max(selected_frame)

    i = 0
    # Nothing after the last selected frame is needed, so stop reading there.
    while i <= last_frame:
      ret, frame = cap.read()
      if not ret: break

      try:
        if i in selected_frame:
          # detect face (only on sampled frames; other frames are never written)
          gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
          faces = faceCascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5)
          if len(faces) != 0:
            x, y, w, h = faces[0]
            cv2.imwrite(f'benchmark_type/{out_folder}/{folder}_{name}_{x},{y},{w},{h}_{label}_{i}.jpg', frame)
      except Exception as e:
        print(f'can not write frame {i}')
        print(e)

      # Advance outside the try so a failed frame cannot desynchronise the index.
      i += 1
  finally:
    cap.release()


In [None]:
# Send every video path found under `path` to the extract_frame function.
def ExtractFrameBasedOnClass(path):
  """
  Walk `path/<folder>/<video>` and call extract_frame on each video path.

  parameters:
    path : main folder of videos (one sub-folder per recording session)
  """
  for folder in os.listdir(path):
    print(f'On folder {folder} ...')
    for video in os.listdir(f'{path}/{folder}'):
      extract_frame(f'{path}/{folder}/{video}')

In [None]:
# Sample frames from every video in 'test_release' into per-class folders under 'benchmark_type'.
ExtractFrameBasedOnClass('test_release')

In [None]:
# Evaluate the model on the frames stored in 'benchmark_type/reply'.
result, label = PredictSepratedClass('reply')

In [None]:
# Confusion matrix for the single-class evaluation above (rows: true labels, columns: predictions).
res = tf.math.confusion_matrix(label,result)
print(res)

In [None]:
# Separation of high-resolution videos from low-resolution ones.

def seprate_HR():
  """
  Sample face frames from every video in `test_release` into `HR_NOHR/HR` or `HR_NOHR/NOHR`.

  Videos named HR_1..HR_4 go to the `HR` folder, all others to `NOHR`. Videos named
  1, 2 or HR_1 get label '1' and up to 30 sampled frames; the rest get label '0'
  and up to 15 sampled frames. Only frames in which a face was detected are
  candidates, and the first such frame is never sampled (indices start at 1).

  Files are written as `{folder}_{videoname}_{x},{y},{w},{h}_n{k}_{label}.jpg`, where
  (x, y, w, h) is the face box of that same frame and k counts the saved frames.

  return:
    None; the selected frames are written to disk
  """
  hr_names = ('HR_1.avi', 'HR_2.avi', 'HR_4.avi', 'HR_3.avi')
  real_names = ('1.avi', '2.avi', 'HR_1.avi')

  for folder in os.listdir('test_release'):
    for file_name in os.listdir(f'test_release/{folder}'):

      sub_folder = 'HR' if file_name in hr_names else 'NOHR'

      if file_name in real_names:
        label = '1'
        number_frame = 31
      else:
        label = '0'
        number_frame = 16

      cap = cv2.VideoCapture(f'test_release/{folder}/{file_name}')
      frames = []
      faces = []
      i = 1

      try:
        while True:
          ret, frame = cap.read()
          if not ret: break

          i += 1
          try:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            face = faceCascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5)
            if len(face) != 0:
              # Keep box and frame in lock-step so they can be indexed together later.
              faces.append(face[0])
              frames.append(frame)
          except Exception as e:
            print(f'can not write frame {i}')
            print(e)
      finally:
        # Release the capture handle for every video, even if reading fails.
        cap.release()

      length = len(faces)
      if length < 2:
        continue

      selected_frame = sorted(sample(range(1, length), min(length, number_frame) - 1))

      # cv2.imwrite silently fails when the target folder is missing.
      os.makedirs(f'HR_NOHR/{sub_folder}', exist_ok=True)
      stem = file_name.split(".")[0]

      for n, idx in enumerate(selected_frame):
        # Use the box of the sampled frame itself (previously the box of the n-th
        # detected frame was written, which did not match the saved image).
        x, y, w, h = faces[idx]
        out_name = f'{folder}_{stem}_{x},{y},{w},{h}_n{n}_{label}.jpg'
        cv2.imwrite(f'HR_NOHR/{sub_folder}/{out_name}', frames[idx])
        print(f'  {out_name}')
      
      

In [None]:
# Split the 'test_release' videos into sampled HR / NOHR frame folders under 'HR_NOHR'.
seprate_HR()

In [None]:
def prediction_HRNOHR_test(path):
  """
  Predict frames separated into the HR/NOHR folders (high-resolution / low-resolution).

  Each file name is expected to look like
  `{folder}_{videoname}_{x},{y},{w},{h}_n{k}_{label}.jpg`; the face box is parsed from
  the name, the face is cropped, and the crop plus its rPPG signal are fed to the model.

  parameter:
    path : folder holding the frames to predict (e.g. 'HR_NOHR/HR')

  return:
    result: predicted classes for all frames
    label: real classes of all frames
  """
  result = []
  label = []
  ma_frames = os.listdir(f'{path}')

  for counter, file_name in enumerate(ma_frames, start=1):
    print(f'{counter}:', file_name)
    im = cv2.imread(f'{path}/{file_name}')

    parts = file_name.split('_')
    # High-resolution video names ('HR_1', ...) contain an underscore themselves,
    # which shifts the face-box field one position to the right.
    box_field = parts[3] if 'HR' in file_name else parts[2]
    x, y, w, h = map(int, box_field.split(','))
    sub_img = im[y:y+h, x:x+w]

    rppg_s = get_rppg_pred(sub_img).T
    pred = make_pred([sub_img, rppg_s])
    result.append(pred)
    # The label is the last '_'-separated field, just before the extension.
    label.append(int(parts[-1].split('.')[0]))

  result = np.array(result).reshape(len(result), -1)
  # NOTE(review): argmin (not argmax) selects the class with the smallest model
  # output -- presumably the model's output order is inverted relative to the labels; confirm.
  result = tf.math.argmin(result, axis=1)
  label = tf.convert_to_tensor(label)

  return result, label

In [None]:
# Evaluate the model on the high-resolution frames in 'HR_NOHR/HR'.
result, label = prediction_HRNOHR_test('HR_NOHR/HR')

In [None]:
# Confusion matrix for the HR evaluation above (rows: true labels, columns: predictions).
res = tf.math.confusion_matrix(label,result)
print(res)