In [1]:
import os
import time
import numpy as np
import pandas as pd
import cv2
from deepface import DeepFace
from deepface.commons import functions
import face_recognition as fr
from tensorflow.keras.models import load_model

# dependency configuration
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
os.environ['KMP_DUPLICATE_LIB_OK']='True'

import docdetect
from paddleocr import PaddleOCR,draw_ocr

In [2]:
def get_boundingbox(x1,y1,x2,y2, width, height, scale=1.3, minsize=None):
    """
    Build a square bounding box around a face, clipped to the frame.

    Based on the get_boundingbox method of Andreas Rössler.

    :param x1: left coordinate of the detected face
    :param y1: top coordinate of the detected face
    :param x2: right coordinate of the detected face
    :param y2: bottom coordinate of the detected face
    :param width: frame width
    :param height: frame height
    :param scale: multiplier applied to the longer face side to get a
        bigger face region
    :param minsize: optional minimum side length of the box
    :return: x, y, bounding_box_size in opencv form (top-left corner + side)
    """
    # Side length: the longer face edge, enlarged by `scale`.
    side = int(max(x2 - x1, y2 - y1) * scale)
    if minsize and side < minsize:
        side = minsize

    mid_x = (x1 + x2) // 2
    mid_y = (y1 + y2) // 2
    half = side // 2

    # Top-left corner, kept inside the frame.
    left = max(int(mid_x - half), 0)
    top = max(int(mid_y - half), 0)

    # Shrink the side so the box does not run past the right/bottom edge.
    side = min(width - left, height - top, side)

    return left, top, side

In [3]:
def locate_faces(img,face_left_frames, width, height):
    """
    Detect faces in ``img`` and annotate each accepted face in place.

    Every face reported by ``fr.face_locations`` is converted to a square box
    with ``get_boundingbox``. Boxes wider than 50 px get a thin rectangle and
    the value of ``face_left_frames`` drawn onto ``img``.

    :param img: frame to search; it is modified in place by the drawing calls
    :param face_left_frames: value rendered as text inside each face box
    :param width: frame width passed on to ``get_boundingbox``
    :param height: frame height passed on to ``get_boundingbox``
    :return: list of ``(x, y, w, h)`` tuples; empty when nothing is found or
        when any step raises
    """
    try:
        detected_faces = []
        for top, right, bottom, left in fr.face_locations(img):
            x, y, size_bb = get_boundingbox(left, top, right, bottom, width, height)
            w = h = size_bb
            if not w > 50:
                # discard small detected faces
                continue
            # draw rectangle to main image
            cv2.rectangle(img, (x, y), (x + w, y + h), (67, 67, 67), 1)
            cv2.putText(
                img,
                str(face_left_frames),
                (int(x + w / 4), int(y + h / 1.5)),
                cv2.FONT_HERSHEY_SIMPLEX,
                4,
                (255, 255, 255),
                2,
            )
            detected_faces.append((x, y, w, h))
    except Exception:
        # Best effort: any failure is treated as "no face detected", and
        # faces collected before the failure are discarded as well.
        detected_faces = []
    return detected_faces

In [4]:
# Scratch check: keep only the values strictly above 0.6.
a = [value for value in [0.1, 1, 0.4] if value > 0.6]

In [5]:
#embeddings = [x for x in df['embedding']]

In [6]:
#embedding = embeddings[0]

In [7]:
#dist= fr.face_distance(embeddings,embedding)

In [8]:
#dist

In [9]:
def get_closest_row(embedding, df, field, threshold=0.6):
    """
    Return the row of ``df`` whose stored encoding is closest to ``embedding``.

    Distances come from ``fr.face_distance`` (smaller means more similar).

    :param embedding: encoding of the face to look up
    :param df: DataFrame holding one known encoding per row
    :param field: name of the column in ``df`` that stores the encodings
    :param threshold: a row only counts as a match when its distance is
        strictly below this value (default 0.6, the value previously
        hard-coded here)
    :return: the closest row (as returned by ``df.iloc``), or ``None`` when
        no stored encoding is closer than ``threshold``
    """
    embeddings = list(df[field])
    dist = fr.face_distance(embeddings, embedding)

    print("dist",dist)

    # No candidate within the threshold (this also covers an empty table).
    if not np.any(dist < threshold):
        return None

    # The best match is the row with the minimum distance.
    return df.iloc[np.argmin(dist)]

In [10]:
def recognize(img,detected_face,df):
    """
    Look up the face inside ``detected_face`` in the table of known faces.

    :param img: full frame (array indexed as ``img[row, col]``)
    :param detected_face: ``(x, y, w, h)`` box of the face inside ``img``
    :param df: DataFrame of known faces with an ``"encoding"`` column
    :return: the matching row from ``df``, or ``None`` when the crop yields
        no encoding or ``get_closest_row`` finds no match
    """
    x = detected_face[0]
    y = detected_face[1]
    w = detected_face[2]
    h = detected_face[3]
    custom_face = img[y : y + h, x : x + w]

    encodings = fr.face_encodings(custom_face)
    # face_encodings can come back empty for the crop; indexing [0] on an
    # empty result used to raise IndexError here.
    if len(encodings) == 0:
        return None

    return get_closest_row(encodings[0], df, "encoding")

In [59]:
def draw_summary_box(freeze_img, content, x,y,w,h,resolution_x,resolution_y):
    """
    Draw a semi-transparent info box next to a face and write ``content`` in it.

    The box is placed to the left of the face when it fits there, otherwise
    to the right; when neither side fits it falls back to the left, clamped
    to the frame edge. ``freeze_img`` is modified in place.

    :param freeze_img: frame to draw on
    :param content: mapping whose items are rendered one per line as
        ``"key: value"``
    :param x: left coordinate of the face box
    :param y: top coordinate of the face box
    :param w: face box width
    :param h: face box height
    :param resolution_x: frame width, used to check whether the box fits
        on the right
    :param resolution_y: frame height (currently unused)
    :return: None
    """
    opacity = 0.2
    text_box_width = 200
    line_height = 20
    # Tall enough for every line of text, and never shorter than the face.
    text_box_height = max(h, len(content) * line_height + line_height)

    if x - text_box_width > 0:
        # left of the face
        cx1, cx2 = x - text_box_width, x
    elif x + w + text_box_width < resolution_x:
        # right of the face
        cx1, cx2 = x + w, x + w + text_box_width
    else:
        # neither side fits: stay on the left but do not start off-frame
        cx1, cx2 = max(x - text_box_width, 0), x

    # Keep the untouched frame so the filled box can be blended with it.
    overlay = freeze_img.copy()
    # The background uses text_box_height (it used to use h, so text could
    # run below the box when there were more lines than the face is tall).
    cv2.rectangle(freeze_img, (cx1, y), (cx2, y + text_box_height), (64, 64, 64), cv2.FILLED)
    # Result is 20% original frame + 80% frame-with-box, written in place.
    cv2.addWeighted(overlay, opacity, freeze_img, 1 - opacity, 0, freeze_img)

    for index, (key, val) in enumerate(content.items()):
        text_location_y = y + line_height + index * line_height
        text = f"{key}: {val}"
        cv2.putText(freeze_img, text, (cx1, text_location_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

  
                       
                                

In [12]:
def do_pdocr(freeze_img):
    """
    Run OCR on a frame and split the result into texts and scores.

    Uses the module-level ``ocr`` object, which must exist before this
    function is called (the notebook creates it in a later cell).

    :param freeze_img: image passed to ``ocr.ocr``
    :return: ``(txts, scores)``, two parallel lists read from ``line[1][0]``
        and ``line[1][1]`` of each line in the first result page, or
        ``(None, None)`` when there is no result
    """
    result = ocr.ocr(freeze_img, cls=False)

    # "Nothing recognised" may arrive as an empty/None result or as a page
    # that is None; the latter used to crash when iterating result[0].
    # NOTE(review): the [None] shape is assumed for newer PaddleOCR
    # releases; confirm against the installed version.
    if not result or result[0] is None:
        txts, scores = None, None
    else:
        lines = result[0]
        txts = [line[1][0] for line in lines]
        scores = [line[1][1] for line in lines]

    print(txts,scores)
    return txts, scores

In [13]:
def prepare_models_for_analysis(wiki=True):
    """
    Load the age, gender and emotion networks with ``cv2.dnn.readNet``.

    :param wiki: when truthy, load the age/gender models from ``models/wiki``;
        otherwise load the ones from ``models/wild``. The emotion network is
        always loaded from ``models/wild``.
    :return: ``(ageNet, genderNet, emoNet)``
    """
    # (weights, definition) pairs for each age/gender variant
    model_files = {
        "wiki": {
            "age": ("models/wiki/age.caffemodel", "models/wiki/age.prototxt"),
            "gender": ("models/wiki/gender.caffemodel", "models/wiki/gender.prototxt"),
        },
        "wild": {
            "age": ("models/wild/age_net.caffemodel", "models/wild/age_deploy.prototxt"),
            "gender": ("models/wild/gender_net.caffemodel", "models/wild/gender_deploy.prototxt"),
        },
    }
    emotion_files = ("models/wild/EmotiW_VGG_S.caffemodel", "models/wild/deploy.prototxt")

    if wiki:
        variant = "wiki"
    else:
        print('wild')
        variant = "wild"

    ageNet = cv2.dnn.readNet(*model_files[variant]["age"])
    genderNet = cv2.dnn.readNet(*model_files[variant]["gender"])
    emoNet = cv2.dnn.readNet(*emotion_files)
    return ageNet, genderNet, emoNet

In [14]:
def detectAgeGender_wild(img,detected_face, ageNet, genderNet):
    """
    Predict an age bracket and gender for one face with the "wild" models.

    :param img: full frame (array indexed as ``img[row, col]``)
    :param detected_face: ``(x, y, w, h)`` box of the face inside ``img``
    :param ageNet: network whose first output row scores the 8 age brackets
    :param genderNet: network whose first output row scores ``['M', 'W']``
    :return: ``{'age': <bracket string>, 'gender': 'M' | 'W'}``
    """
    MODEL_MEAN_VALUES=(78.4263377603, 87.7689143744, 114.895847746)

    ageList=['(0-2)', '(4-6)', '(8-12)', '(15-20)', '(25-32)', '(38-43)', '(48-53)', '(60-100)']
    genderList=['M','W']

    x = detected_face[0]
    y = detected_face[1]
    w = detected_face[2]
    h = detected_face[3]
    custom_face = img[y : y + h, x : x + w]

    # Both networks take the same 227x227 mean-subtracted blob, so it is
    # built once. The two original calls differed only by an explicit
    # crop=False on the second one, and the first call relied on the same
    # behaviour by default.
    blob=cv2.dnn.blobFromImage(custom_face, 1.0, (227,227), MODEL_MEAN_VALUES, swapRB=False, crop=False)

    genderNet.setInput(blob)
    genderPreds=genderNet.forward()
    gender=genderList[genderPreds[0].argmax()]

    ageNet.setInput(blob)
    agePreds=ageNet.forward()
    age=ageList[agePreds[0].argmax()]

    return {'age': age, 'gender':gender}
        
    

In [15]:
# Exploratory check: run DeepFace's age / gender / race / emotion analysis on
# one reference image using the dlib detector backend. The result is
# displayed in the next cell.
objs = DeepFace.analyze(img_path = "data/db/leia_organa.jpeg", detector_backend='dlib',
        actions = ['age', 'gender', 'race', 'emotion']
)

Action: emotion: 100%|███████████████████████████████████████████████████████████████████| 4/4 [00:04<00:00,  1.01s/it]


In [16]:
objs

[{'age': 36,
  'region': {'x': 545, 'y': 133, 'w': 155, 'h': 155},
  'gender': {'Woman': 99.67048764228821, 'Man': 0.32951917964965105},
  'dominant_gender': 'Woman',
  'race': {'asian': 1.9868701696395874,
   'indian': 3.2996483147144318,
   'black': 0.405914057046175,
   'white': 47.79939353466034,
   'middle eastern': 21.546782553195953,
   'latino hispanic': 24.96139258146286},
  'dominant_race': 'white',
  'emotion': {'angry': 2.029234729707241,
   'disgust': 0.0002847034238584456,
   'fear': 0.0029691034796996973,
   'happy': 84.71994400024414,
   'sad': 0.10489042615517974,
   'surprise': 0.010533053864492103,
   'neutral': 13.132140040397644},
  'dominant_emotion': 'happy'}]

In [17]:
def detectAgeGender_wiki(img,detected_face, ageNet, genderNet):
    """
    Predict an integer age and a gender for one face with the "wiki" models.

    :param img: full frame (array indexed as ``img[row, col]``)
    :param detected_face: ``(x, y, w, h)`` box of the face inside ``img``
    :param ageNet: network whose output is weighted by the indexes 0..100
        (presumably a probability per age; confirm against the model)
    :param genderNet: network whose output index 0 is mapped to 'W' and any
        other index to 'M'
    :return: ``{'age': int, 'gender': 'M' | 'W'}``
    """
    MODEL_MEAN_VALUES=(78.4263377603, 87.7689143744, 114.895847746)

    x = detected_face[0]
    y = detected_face[1]
    w = detected_face[2]
    h = detected_face[3]
    custom_face = img[y : y + h, x : x + w]

    # The gender and age networks consume the exact same 224x224 blob, so
    # build it once instead of twice.
    blob=cv2.dnn.blobFromImage(custom_face, 1.0, (224,224), MODEL_MEAN_VALUES, swapRB=False)

    genderNet.setInput(blob)
    genderPreds=genderNet.forward()
    gender='W' if np.argmax(genderPreds) == 0 else 'M'

    ageNet.setInput(blob)
    agePreds=ageNet.forward()
    # Weighted sum of the output over the indexes 0..100, truncated to int.
    output_indexes = np.arange(101)
    age = int(np.sum(agePreds * output_indexes))

    return {'age': age, 'gender':gender}
        

In [18]:
def detectEmotion(img,detected_face, emoNet):
    """
    Predict the dominant emotion for one face.

    :param img: full frame (array indexed as ``img[row, col]``)
    :param detected_face: ``(x, y, w, h)`` box of the face inside ``img``
    :param emoNet: network whose first output row scores the 7 emotions in
        the order of ``emoList`` below
    :return: ``{'emotion': <label>}``
    """
    MODEL_MEAN_VALUES=(78.4263377603, 87.7689143744, 114.895847746)
    emoList = [ 'Angry' , 'Disgust' , 'Fear' , 'Happy'  , 'Neutral' ,  'Sad' , 'Surprise']

    left, top = detected_face[0], detected_face[1]
    box_w, box_h = detected_face[2], detected_face[3]
    custom_face = img[top : top + box_h, left : left + box_w]

    # 224x224 mean-subtracted blob of the face crop
    blob = cv2.dnn.blobFromImage(custom_face, 1.0, (224,224), MODEL_MEAN_VALUES, swapRB=False)

    emoNet.setInput(blob)
    scores = emoNet.forward()
    return {'emotion': emoList[scores[0].argmax()]}
        

### Xception

In [19]:
# Local path of the Xception weight file that return_pytorch04_xception reads
# with torch.load. The path is relative to the notebook's working directory.
XCEPTION_MODEL = './xception-b5690688.pth'

In [20]:
## xception.py
"""
Ported to pytorch thanks to [tstandley](https://github.com/tstandley/Xception-PyTorch)
@author: tstandley
Adapted by cadene
Creates an Xception Model as defined in:
Francois Chollet
Xception: Deep Learning with Depthwise Separable Convolutions
https://arxiv.org/pdf/1610.02357.pdf
This weights ported from the Keras implementation. Achieves the following performance on the validation set:
Loss:0.9173 Prec@1:78.892 Prec@5:94.292
REMEMBER to set your image size to 3x299x299 for both test and validation
normalize = transforms.Normalize(mean=[0.5, 0.5, 0.5],
                                  std=[0.5, 0.5, 0.5])
The resize parameter of the validation transform should be 333, and make sure to center crop at 299x299
"""
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.model_zoo as model_zoo
from torch.nn import init

# Download URL and input metadata for each pretrained weight set, keyed by
# architecture name and then by weight-set name. The `xception` factory reads
# 'url', 'num_classes', 'input_space', 'input_size', 'input_range', 'mean'
# and 'std' from here.
# NOTE(review): the weights URL is plain http, so the download is not
# integrity-protected in transit.
pretrained_settings = {
    'xception': {
        'imagenet': {
            'url': 'http://data.lip6.fr/cadene/pretrainedmodels/xception-b5690688.pth',
            'input_space': 'RGB',
            'input_size': [3, 299, 299],
            'input_range': [0, 1],
            'mean': [0.5, 0.5, 0.5],
            'std': [0.5, 0.5, 0.5],
            'num_classes': 1000,
            'scale': 0.8975 # The resize parameter of the validation transform should be 333, and make sure to center crop at 299x299
        }
    }
}


class SeparableConv2d(nn.Module):
    """
    Depthwise convolution followed by a 1x1 pointwise convolution.

    ``conv1`` convolves every input channel separately
    (``groups=in_channels``); ``pointwise`` then maps ``in_channels`` to
    ``out_channels`` with a 1x1 kernel. ``bias`` applies to both layers.
    """
    def __init__(self,in_channels,out_channels,kernel_size=1,stride=1,padding=0,dilation=1,bias=False):
        super().__init__()

        self.conv1 = nn.Conv2d(
            in_channels,
            in_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            groups=in_channels,
            bias=bias,
        )
        self.pointwise = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=1,
            stride=1,
            padding=0,
            dilation=1,
            groups=1,
            bias=bias,
        )

    def forward(self,x):
        """Apply the depthwise and then the pointwise convolution."""
        return self.pointwise(self.conv1(x))


class Block(nn.Module):
    """
    Residual Xception block: ``reps`` x (ReLU -> SeparableConv2d -> BatchNorm),
    an optional strided max-pool, and a skip connection added to the result.

    :param in_filters: channels of the block input
    :param out_filters: channels of the block output
    :param reps: number of separable-convolution units
    :param strides: stride of the trailing max-pool and of the skip projection
    :param start_with_relu: when False, the leading ReLU is dropped
    :param grow_first: when True, the first unit changes the channel count;
        otherwise the last unit does
    """
    def __init__(self,in_filters,out_filters,reps,strides=1,start_with_relu=True,grow_first=True):
        super().__init__()

        # A 1x1 projection on the skip path is only needed when the main
        # path changes the channel count or the spatial size.
        if out_filters == in_filters and strides == 1:
            self.skip = None
        else:
            self.skip = nn.Conv2d(in_filters,out_filters,1,stride=strides, bias=False)
            self.skipbn = nn.BatchNorm2d(out_filters)

        self.relu = nn.ReLU(inplace=True)

        # (in, out) channel pair of every separable-conv unit, in order.
        if grow_first:
            plan = [(in_filters, out_filters)] + [(out_filters, out_filters)] * (reps - 1)
        else:
            plan = [(in_filters, in_filters)] * (reps - 1) + [(in_filters, out_filters)]

        layers = []
        for c_in, c_out in plan:
            layers.append(self.relu)
            layers.append(SeparableConv2d(c_in, c_out, 3, stride=1, padding=1, bias=False))
            layers.append(nn.BatchNorm2d(c_out))

        if start_with_relu:
            # The leading ReLU is not in-place, presumably so the block
            # input, which forward() also feeds to the skip path, is not
            # overwritten.
            layers[0] = nn.ReLU(inplace=False)
        else:
            layers = layers[1:]

        if strides != 1:
            layers.append(nn.MaxPool2d(3,strides,1))
        self.rep = nn.Sequential(*layers)

    def forward(self,inp):
        """Run the main path, then add the (optionally projected) input."""
        out = self.rep(inp)

        if self.skip is None:
            shortcut = inp
        else:
            shortcut = self.skipbn(self.skip(inp))

        out += shortcut
        return out


class Xception(nn.Module):
    """
    Xception optimized for the ImageNet dataset, as specified in
    https://arxiv.org/pdf/1610.02357.pdf

    The classification head is created as ``fc``. The ``xception`` factory
    later renames it to ``last_linear``; ``logits`` accepts either name so a
    directly constructed instance is usable too.
    """
    def __init__(self, num_classes=1000):
        """ Constructor
        Args:
            num_classes: number of classes
        """
        super(Xception, self).__init__()
        self.num_classes = num_classes

        # Entry flow: two plain convolutions, each followed by BN + ReLU
        # (the ReLUs are applied in `features`).
        self.conv1 = nn.Conv2d(3, 32, 3,2, 0, bias=False)
        self.bn1 = nn.BatchNorm2d(32)
        self.relu = nn.ReLU(inplace=True)

        self.conv2 = nn.Conv2d(32,64,3,bias=False)
        self.bn2 = nn.BatchNorm2d(64)

        # Three down-sampling residual blocks (stride 2).
        self.block1=Block(64,128,2,2,start_with_relu=False,grow_first=True)
        self.block2=Block(128,256,2,2,start_with_relu=True,grow_first=True)
        self.block3=Block(256,728,2,2,start_with_relu=True,grow_first=True)

        # Eight identical 728-channel residual blocks.
        self.block4=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block5=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block6=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block7=Block(728,728,3,1,start_with_relu=True,grow_first=True)

        self.block8=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block9=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block10=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block11=Block(728,728,3,1,start_with_relu=True,grow_first=True)

        # Exit flow.
        self.block12=Block(728,1024,2,2,start_with_relu=True,grow_first=False)

        self.conv3 = SeparableConv2d(1024,1536,3,1,1)
        self.bn3 = nn.BatchNorm2d(1536)

        self.conv4 = SeparableConv2d(1536,2048,3,1,1)
        self.bn4 = nn.BatchNorm2d(2048)

        self.fc = nn.Linear(2048, num_classes)

        # No custom weight initialisation is applied here: the layers keep
        # the defaults set by their constructors.

    def features(self, input):
        """Return the feature map produced by ``bn4`` (before the last ReLU)."""
        x = self.conv1(input)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)

        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.block5(x)
        x = self.block6(x)
        x = self.block7(x)
        x = self.block8(x)
        x = self.block9(x)
        x = self.block10(x)
        x = self.block11(x)
        x = self.block12(x)

        x = self.conv3(x)
        x = self.bn3(x)
        x = self.relu(x)

        x = self.conv4(x)
        x = self.bn4(x)
        return x

    def logits(self, features):
        """
        ReLU, global average pool, flatten, then the linear head.

        Note: ``self.relu`` is in-place, so ``features`` is modified.
        """
        x = self.relu(features)

        x = F.adaptive_avg_pool2d(x, (1, 1))
        x = x.view(x.size(0), -1)
        # The head is called `last_linear` after the factory renames it, and
        # `fc` on a freshly constructed instance; only the former used to
        # work.
        head = self.last_linear if hasattr(self, 'last_linear') else self.fc
        x = head(x)
        return x

    def forward(self, input):
        """Compute class scores for a batch of images."""
        x = self.features(input)
        x = self.logits(x)
        return x


def xception(num_classes=1000, pretrained='imagenet'):
    """
    Build an Xception network, optionally loading pretrained weights.

    :param num_classes: size of the classification head
    :param pretrained: key into ``pretrained_settings['xception']`` naming
        the weight set to download and load, or a falsy value for a randomly
        initialised network
    :return: the model, with its head exposed as ``last_linear`` (the ``fc``
        attribute is removed)
    :raises AssertionError: when ``num_classes`` does not match the chosen
        pretrained weight set
    """
    model = Xception(num_classes=num_classes)
    if pretrained:
        settings = pretrained_settings['xception'][pretrained]
        assert num_classes == settings['num_classes'], \
            "num_classes should be {}, but is {}".format(settings['num_classes'], num_classes)

        # The network built above is reused; it used to be constructed a
        # second time here, which only cost time and memory.
        model.load_state_dict(model_zoo.load_url(settings['url']))

        model.input_space = settings['input_space']
        model.input_size = settings['input_size']
        model.input_range = settings['input_range']
        model.mean = settings['mean']
        model.std = settings['std']

    # TODO: ugly
    # Rename the head from `fc` to `last_linear`.
    model.last_linear = model.fc
    del model.fc
    return model


In [21]:
## models.py
"""
Author: Andreas Rössler
"""
import os
import argparse


import torch
# import pretrainedmodels
import torch.nn as nn
import torch.nn.functional as F
# from network.xception import xception
import math
import torchvision


def return_pytorch04_xception(pretrained=True):
    """
    Build an Xception network and optionally load the local weight file.

    :param pretrained: when truthy, load the state dict stored at
        ``XCEPTION_MODEL``; otherwise return the freshly built network
    :return: the model with its head exposed as ``last_linear``
    """
    # Raises warning "src not broadcastable to dst" but thats fine
    model = xception(pretrained=False)
    if not pretrained:
        return model

    # Load model in torch 0.4+
    # The head is temporarily renamed to `fc` for loading.
    model.fc = model.last_linear
    del model.last_linear

    # NOTE(review): torch.load unpickles the file; only point
    # XCEPTION_MODEL at a trusted checkpoint.
    state_dict = torch.load(XCEPTION_MODEL)

    # Stored pointwise weights get two trailing singleton dimensions added.
    pointwise_names = [name for name in state_dict if 'pointwise' in name]
    for name in pointwise_names:
        state_dict[name] = state_dict[name].unsqueeze(-1).unsqueeze(-1)

    model.load_state_dict(state_dict)

    # Restore the `last_linear` name expected by the rest of the code.
    model.last_linear = model.fc
    del model.fc
    return model


class TransferModel(nn.Module):
    """
    Simple transfer learning model that takes an imagenet pretrained model with
    a fc layer as base model and retrains a new fc layer for num_out_classes
    """
    def __init__(self, modelchoice, num_out_classes=2, dropout=0.0):
        """
        :param modelchoice: 'xception', 'resnet50' or 'resnet18'
        :param num_out_classes: output size of the replacement head
        :param dropout: when truthy, a Dropout layer with this probability is
            placed in front of the new linear head
        :raises Exception: for any other ``modelchoice``
        """
        super(TransferModel, self).__init__()
        self.modelchoice = modelchoice
        if modelchoice == 'xception':
            self.model = return_pytorch04_xception()
            # Replace fc
            num_ftrs = self.model.last_linear.in_features
            if not dropout:
                self.model.last_linear = nn.Linear(num_ftrs, num_out_classes)
            else:
                print('Using dropout', dropout)
                self.model.last_linear = nn.Sequential(
                    nn.Dropout(p=dropout),
                    nn.Linear(num_ftrs, num_out_classes)
                )
        elif modelchoice == 'resnet50' or modelchoice == 'resnet18':
            if modelchoice == 'resnet50':
                self.model = torchvision.models.resnet50(pretrained=True)
            if modelchoice == 'resnet18':
                self.model = torchvision.models.resnet18(pretrained=True)
            # Replace fc
            num_ftrs = self.model.fc.in_features
            if not dropout:
                self.model.fc = nn.Linear(num_ftrs, num_out_classes)
            else:
                self.model.fc = nn.Sequential(
                    nn.Dropout(p=dropout),
                    nn.Linear(num_ftrs, num_out_classes)
                )
        else:
            raise Exception('Choose valid model, e.g. resnet50')

    def set_trainable_up_to(self, boolean, layername="Conv2d_4a_3x3"):
        """
        Freezes all layers below a specific layer and sets the following layers
        to true if boolean else only the fully connected final layer
        :param boolean: when True, unfreeze every child module that comes
            after ``layername``; when False, unfreeze only the final head
        :param layername: depends on network, for inception e.g. Conv2d_4a_3x3;
            ``None`` makes every parameter trainable
        :return: None
        :raises Exception: when ``boolean`` is True and no child module
            follows one named ``layername`` (this includes the case where
            ``layername`` is the last child)
        """
        if layername is None:
            # Unfreeze everything. The early return used to sit inside the
            # loop, so only the first parameter was ever unfrozen.
            for _, param in self.model.named_parameters():
                param.requires_grad = True
            return

        # Stage-1: freeze all the layers
        for _, param in self.model.named_parameters():
            param.requires_grad = False

        if boolean:
            # Make all layers following the layername layer trainable
            seen = []
            found = False
            for name, child in self.model.named_children():
                if layername in seen:
                    found = True
                    for params in child.parameters():
                        params.requires_grad = True
                seen.append(name)
            if not found:
                # The message now actually includes the layer name.
                raise Exception('Layer {} not found, cant finetune!'.format(
                    layername))
        else:
            if self.modelchoice == 'xception':
                # Make fc trainable
                for param in self.model.last_linear.parameters():
                    param.requires_grad = True

            else:
                # Make fc trainable
                for param in self.model.fc.parameters():
                    param.requires_grad = True

    def forward(self, x):
        """Delegate to the wrapped base model."""
        x = self.model(x)
        return x


def model_selection(modelname, num_out_classes,
                    dropout=None):
    """
    Build a TransferModel by name, together with its input metadata.

    :param modelname: 'xception' or 'resnet18'
    :param num_out_classes: output size of the classification head
    :param dropout: forwarded to TransferModel for 'resnet18' only; the
        'xception' branch does not pass it on
    :return: 5-tuple ``(model, image size, pretraining<yes/no>, input_list,
        None)``
    :raises NotImplementedError: for any other ``modelname``
    """
    if modelname == 'xception':
        net = TransferModel(modelchoice='xception',
                            num_out_classes=num_out_classes)
        return net, 299, True, ['image'], None

    if modelname == 'resnet18':
        net = TransferModel(modelchoice='resnet18', dropout=dropout,
                            num_out_classes=num_out_classes)
        return net, 224, True, ['image'], None

    raise NotImplementedError(modelname)

# if __name__ == '__main__':
#     model, image_size, *_ = model_selection('resnet18', num_out_classes=2)
#     print(model)
#     model = model.cuda()
#     from torchsummary import summary
#     input_s = (3, image_size, image_size)
#     print(summary(model, input_s))


In [22]:
## transform.py
"""
Author: Andreas Rössler


"""
from torchvision import transforms

# The same pipeline is used for every split: resize to 299x299, convert to a
# tensor, then normalise each channel with mean 0.5 and std 0.5. Each split
# gets its own Compose instance.
xception_default_data_transforms = {
    split: transforms.Compose([
        transforms.Resize((299, 299)),
        transforms.ToTensor(),
        transforms.Normalize([0.5] * 3, [0.5] * 3),
    ])
    for split in ('train', 'val', 'test')
}

In [23]:
## detect_from_video.py
"""
Evaluates a folder of video files or a single file with a xception binary
classification network.
Usage:
python detect_from_video.py
    -i <folder with video files or path to video file>
    -m <path to model file>
    -o <path to output folder, will write one or multiple output videos there>
Author: Andreas Rössler
"""
import os
import argparse
from os.path import join
import cv2
import dlib
import torch
import torch.nn as nn
from PIL import Image as pil_image
from tqdm.notebook import tqdm


def preprocess_image(image, cuda=True):
    """
    Turn an OpenCV image into a batched tensor for the network.

    The image is converted from BGR to RGB, wrapped in a PIL image, run
    through the 'test' entry of ``xception_default_data_transforms`` and
    given a leading batch dimension.

    :param image: numpy image in opencv form (i.e., BGR channel order)
    :param cuda: when truthy, move the tensor to the GPU with ``.cuda()``
    :return: pytorch tensor of shape [1, 3, image_size, image_size]
    """
    # Revert from BGR
    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Same preprocessing as used during training, applied to a PIL image;
    # unsqueeze adds the batch dimension the network expects.
    to_tensor = xception_default_data_transforms['test']
    batch = to_tensor(pil_image.fromarray(rgb)).unsqueeze(0)

    return batch.cuda() if cuda else batch


def predict_with_model(image, model, post_function=nn.Softmax(dim=1),
                       cuda=True):
    """
    Predicts the label of an input image. Preprocesses the input image and
    casts it to cuda if required
    :param image: numpy image
    :param model: torch model with linear layer at the end
    :param post_function: e.g., softmax
    :param cuda: enables cuda, must be the same parameter as the model
    :return: ``(prediction, output)``: the predicted class index as an int
        (1 = fake, 0 = real) and the post-processed model output tensor
    """
    # Preprocess
    preprocessed_image = preprocess_image(image, cuda)

    # Model prediction
    output = post_function(model(preprocessed_image))

    # argmax over the class dimension
    _, prediction = torch.max(output, 1)

    # .item() reads the single element directly. The previous
    # float(prediction.cpu().numpy()) converted a 1-element array to a
    # scalar, which recent NumPy releases deprecate.
    return int(prediction.item()), output



In [24]:
def image_frame_pred(img, detected_face,model,cuda=True):
    """
    Crop one face out of ``img`` and classify it with ``predict_with_model``.

    :param img: full frame (array indexed as ``img[row, col]``)
    :param detected_face: ``(x, y, w, h)`` box of the face inside ``img``
    :param model: model passed on to ``predict_with_model``
    :param cuda: passed on to ``predict_with_model``
    :return: ``(prediction, output)`` exactly as returned by
        ``predict_with_model``
    """
    left, top = detected_face[0], detected_face[1]
    box_w, box_h = detected_face[2], detected_face[3]
    face_crop = img[top : top + box_h, left : left + box_w]

    # Actual prediction using our model
    prediction, output = predict_with_model(face_crop, model, cuda=cuda)
    return prediction, output

In [25]:
# Serialized deepfake-detection model (the "full_c23" Xception file under the
# faceforensics++ models folder), loaded onto the CPU.
# NOTE(review): torch.load unpickles the file, so only load checkpoints from
# a trusted source. The relative path assumes the notebook runs from its own
# directory.
model_path_full_c23 = '../deepfake/faceforensics_models/faceforensics++_models_subset/full/xception/full_c23.p'
model_full_c23 = torch.load(model_path_full_c23, map_location=torch.device('cpu'))

In [26]:
# Same as the c23 model but from the "full_c40" file, also loaded onto the CPU.
# NOTE(review): torch.load unpickles the file, so only load checkpoints from
# a trusted source.
model_path_full_c40 = '../deepfake/faceforensics_models/faceforensics++_models_subset/full/xception/full_c40.p'
model_full_c40 = torch.load(model_path_full_c40, map_location=torch.device('cpu'))

In [27]:
def is_smile(landmarks):
    """
    Decide whether a face is smiling from the lip-width / face-width ratio.

    :param landmarks: mapping with 'top_lip' and 'chin' lists of (x, y)
        points; 'top_lip'[0] and [6] are used as the lip corners and
        'chin'[2] and [14] as the face-width reference points
    :return: True when the lips span at least 45% of the face width
    """
    lip_points = landmarks['top_lip']
    chin_points = landmarks['chin']

    # Horizontal distance between the two lip corners
    lip_left = np.array(lip_points[0])
    lip_right = np.array(lip_points[6])
    lips_width = np.abs(lip_left[0] - lip_right[0])

    # Horizontal distance between the two chin reference points
    face_width = np.abs(chin_points[2][0] - chin_points[14][0])

    ratio = lips_width / face_width
    print(lips_width, face_width, ratio)
    return ratio >= 0.45

In [28]:
def fr_smile_detect(img,detected_face,landmarks):
    """
    Report whether the face described by ``landmarks`` is smiling.

    The decision depends on ``landmarks`` only. ``img`` and
    ``detected_face`` are accepted for signature parity with the other
    detectors but are not used, so the face crop that used to be computed
    and discarded here has been removed.

    :param img: full frame (unused)
    :param detected_face: ``(x, y, w, h)`` face box (unused)
    :param landmarks: landmark mapping passed to ``is_smile``
    :return: the result of ``is_smile(landmarks)``
    """
    return is_smile(landmarks)

In [29]:
# Keras models for the left and the right eye; fr_dnn_eye_roll_detect calls
# .predict on each of them with the corresponding eye crop.
lmodel = load_model('../liveness/data/left_eye.h5')
rmodel = load_model('../liveness/data/right_eye.h5')

In [30]:
def get_eyes(frame, landmarks, margin=5):
    """
    Crop the left and right eye regions out of ``frame``.

    For each eye the box runs horizontally from point 0 to point 3 and
    vertically from point 1 to point 5 of its landmark list, padded by
    ``margin`` pixels on every side.

    :param frame: image array indexed as ``frame[row, col, channel]``
    :param landmarks: mapping with 'left_eye' and 'right_eye' lists of
        (x, y) points
    :param margin: padding in pixels around each eye box (default 5)
    :return: ``(left_eye, right_eye)`` crops of ``frame``
    """
    def crop(points):
        x_start = points[0][0] - margin
        y_start = points[1][1] - margin
        x_end = points[3][0] + margin
        y_end = points[5][1] + margin
        # A negative start would be read as "count from the end" by the
        # slice and give an empty or wrong crop when the eye is near the
        # top/left frame border, so clamp it to 0. Ends past the frame are
        # clipped by slicing itself.
        x_start = max(x_start, 0)
        y_start = max(y_start, 0)
        return frame[y_start:y_end, x_start:x_end, :]

    left_eye = crop(landmarks['left_eye'])
    right_eye = crop(landmarks['right_eye'])
    return (left_eye, right_eye)

In [31]:
def fr_dnn_eye_roll_detect(img,detected_face,landmarks):
    """
    Detect an eye roll by classifying both eye crops.

    The eyes are cut out of ``img`` with ``get_eyes``, resized to 30x20 and
    fed to the module-level ``lmodel`` / ``rmodel``.

    :param img: full frame
    :param detected_face: ``(x, y, w, h)`` face box (unused; kept for
        signature parity with the other detectors)
    :param landmarks: landmark mapping passed to ``get_eyes``
    :return: truthy when both models' first prediction is >= 0.5
    """
    def predict_eye(eye_crop, eye_model):
        # Resize to 30 wide x 20 high and add the batch dimension.
        resized = cv2.resize(eye_crop, (30,20), interpolation = cv2.INTER_AREA)
        batch = np.expand_dims(resized, axis=0)
        return eye_model.predict(batch, batch_size=10)

    left_eye, right_eye = get_eyes(img,landmarks)
    l_prediction = predict_eye(left_eye, lmodel)
    r_prediction = predict_eye(right_eye, rmodel)

    eyes_rolled_flag =  l_prediction[0] >=0.5 and r_prediction[0] >=0.5
    return eyes_rolled_flag
        

In [32]:
# Reference table of known faces: one row per image on disk.
db1 = {
    "id": [1, 2, 3],
    "name": ["Elina Maliarsky", "Leia Organa", 'Elina Maliarsky'],
    "path": ["data/db/me_fear.png", "data/db/leia_organa.jpeg", "data/db/me_happy.png"],
}
df1 = pd.DataFrame(db1)

# One Dlib embedding per image, stored next to its path.
df1["encoding"] = [
    DeepFace.represent(path, model_name='Dlib')[0]['embedding']
    for path in df1["path"]
]

# Smaller table holding a single known face.
db2 = {
    "id": [1],
    "name": ["Leia Organa"],
    "path": ["data/db/leia_organa.jpeg"],
}
df2 = pd.DataFrame(db2)

df2["encoding"] = [
    DeepFace.represent(path, model_name='Dlib')[0]['embedding']
    for path in df2["path"]
]

In [33]:
df1

Unnamed: 0,id,name,path,encoding
0,1,Elina Maliarsky,data/db/me_fear.png,"[-0.087116539478302, 0.14015552401542664, 0.06..."
1,2,Leia Organa,data/db/leia_organa.jpeg,"[-0.03101312555372715, -0.0030283723026514053,..."
2,3,Elina Maliarsky,data/db/me_happy.png,"[-0.10627612471580505, 0.1906319409608841, 0.0..."


In [34]:
#print(embeddings)

In [35]:
#embeddings[0][0]['embedding']

In [36]:
# Table of identities to verify against: ID (kept as a string) plus first
# and last name. verify_identity compares these columns case-insensitively.
db_verification = {"id": ["12345"],
      "first_name": ["ELINA"],
      "last_name" : ["MALIARSKY"]            
     }

df_verification=pd.DataFrame(db_verification)

In [37]:
# Scratch check of case-insensitive matching: select the rows whose first
# name differs from 'elina' after casefolding.
df_verification[df_verification['first_name'].str.casefold()!='elina'.casefold()]

Unnamed: 0,id,first_name,last_name


In [38]:
def verify_identity(df,ID,first_name, last_name):
    """
    Check whether the given identity is present in a reference table.

    The comparison is case-insensitive. A record counts as a hit when its
    'id' equals ``ID`` and at least one of 'first_name' / 'last_name' equals
    the corresponding argument.

    :param df: DataFrame with string columns 'id', 'first_name', 'last_name'
    :param ID: identifier to look up
    :param first_name: first name to compare
    :param last_name: last name to compare
    :return: True if at least one record matches, otherwise False
    """
    wanted_id = ID.lower()
    wanted_first = first_name.lower()
    wanted_last = last_name.lower()

    id_hit = df['id'].str.lower().eq(wanted_id)
    first_hit = df['first_name'].str.lower().eq(wanted_first)
    last_hit = df['last_name'].str.lower().eq(wanted_last)

    # The id must match; a match on either name is enough.
    matches = df[id_hit & (first_hit | last_hit)]
    return len(matches) > 0
    

In [61]:

# Interactive KYC demo: grab webcam frames, wait for a key press that selects an
# operation, freeze on a frame once a face has been located in `frame_threshold`
# frames, annotate that frame for `time_threshold` seconds, and record everything
# that is shown to an AVI file.
#
# Keys:  d - detection + age/gender analysis + deepfake check
#        r - detection + recognition against `df_rec` + deepfake check
#        m - match the face against `df_detect` and verify the text returned by
#            `do_pdocr` against `df_verification`
#        l - liveness checks (smile, eye roll, emotion)
#        q - quit
tic = time.time()
freeze = False
face_detected = False
face_included_frames = 0  # number of frames so far in which at least one face was located
freezed_frame = 0
time_threshold=5  # seconds the annotated frame stays on screen
frame_threshold = 5  # face-bearing frames required before the screen freezes
flag_detect= False
flag_recognize = False
flag_match=False
flag_liveness = False
flag_deepfake = False
flag_analyze = False
df_rec = df1.copy()
pivot_img_size = 112  # face recognition result image

ageNet,genderNet,emoNet = prepare_models_for_analysis(True)


# need to run only once to download and load model into memory
ocr = PaddleOCR(lang='en')
frame_id=0
df_detect=pd.DataFrame(columns = ["frame_id", "age","gender","deepfake","encoding", "name"])

# The camera and the recorder are opened last and released in `finally`, so an
# exception (or a kernel interrupt) inside the loop cannot leave them locked.
cap = cv2.VideoCapture(2)
fourcc = cv2.VideoWriter_fourcc(*'XVID')
#cv2.namedWindow('img',cv2.WINDOW_NORMAL)
#cv2.resizeWindow('img', 775,600)
# NOTE(review): the writer is opened at 640x480 - confirm the camera delivers that size.
out = cv2.VideoWriter('demo_kyc4.avi', fourcc, 20.0, (640,  480))
try:
    while True:
        _, img = cap.read()
        if img is None:
            break
        raw_img = img.copy()
        resolution_x = img.shape[1]
        resolution_y = img.shape[0]

        key=cv2.waitKey(1)

        #detecting unknown face
        if key==ord("d"):
            flag_detect=True
            flag_analyze=True
            flag_deepfake=True
        #detecting known_face
        if key==ord("r"):
            flag_detect=True
            flag_recognize=True
            flag_deepfake=True
        if key==ord("m"):
            flag_match=True
        if key==ord("l"):
            flag_liveness=True

        #if flags for detection/recognition/liveness are on
        if flag_detect or flag_match or flag_liveness:
            detected_faces = []
            if freeze == False:
                frames_left = frame_threshold - face_included_frames-1
                detected_faces =  locate_faces(img,frames_left, resolution_x, resolution_y)
                if len(detected_faces)>0:
                    face_detected=True
                    face_included_frames+=1
            else:
                detected_faces = []

            if face_detected  and not freeze and face_included_frames == frame_threshold:
                print("tofreeze")
                freeze = True
                # Freeze on the copy taken before locate_faces saw the frame
                # (presumably locate_faces draws on `img` - confirm).
                base_img = raw_img.copy()
                detected_faces_final = detected_faces.copy()
                tic = time.time()

            if freeze == True:
                toc = time.time()
                if (toc - tic) < time_threshold:

                    # All analysis runs once, on the first frozen frame only;
                    # later iterations just refresh the countdown.
                    if freezed_frame == 0:
                        #detection: always done
                        freeze_img = base_img.copy()

                        for detected_face in detected_faces_final:
                            attributes={}
                            x = detected_face[0]
                            y = detected_face[1]
                            w = detected_face[2]
                            h = detected_face[3]

                            # -------------------------------
                            # extract detected face
                            custom_face = base_img[y : y + h, x : x + w]

                            # draw rectangle to main image
                            cv2.rectangle(
                                freeze_img, (x, y), (x + w, y + h), (67, 67, 67), 1
                            )

                            #recognition
                            if(flag_recognize):
                                print('recognize')
                                row=recognize(freeze_img,detected_face,df_rec)
                                attributes['recognized']= "Yes" if row is not None else "No"
                                if row is None:
                                    # unknown face: fall back to attribute analysis
                                    flag_analyze=True
                                else:
                                    attributes['name']=row['name']

                            # facial attribute analysis
                            if(flag_analyze):
                                #custom age_gender
                                age_gender = detectAgeGender_wiki(freeze_img,detected_face,ageNet,genderNet)
                                # A dedicated loop name is used so the pressed `key`
                                # is still intact for the quit check further down.
                                for attr_name in age_gender.keys():
                                    attributes[attr_name] =age_gender[attr_name]

                            #face forgery detection
                            if(flag_deepfake):
                                prediction, output = image_frame_pred(freeze_img,detected_face, model_full_c23,
                                                        cuda=False)

                                output_list = ['{0:.2f}'.format(float(score)) for score in
                                               output.detach().cpu().numpy()[0]]
                                print(output_list, prediction)
                                attributes["deepfake"]="detected" if prediction==1 else "not detected"

                            #liveness detection
                            if(flag_liveness):
                                landmarks_collection = fr.face_landmarks(freeze_img)
                                # NOTE(review): the first landmark set is used for every
                                # detected face, even when several faces are present.
                                if landmarks_collection:
                                    landmarks = landmarks_collection[0]
                                    smile=fr_smile_detect(freeze_img, detected_face, landmarks)
                                    attributes["smiled"] = "Yes" if smile else "No"

                                    #eye roll detection
                                    er=fr_dnn_eye_roll_detect(freeze_img, detected_face,landmarks)
                                    attributes["rolled eyes"] = "Yes" if er else "No"

                                emo = detectEmotion(freeze_img,detected_face,emoNet)
                                attributes["emotion"] = emo['emotion']

                            # remember this detection so that a later "match" can compare against it
                            if(flag_detect or flag_analyze or flag_deepfake):
                                encodings = fr.face_encodings(custom_face)
                                if encodings:
                                    df_row= {"frame_id":frame_id,
                                             "age":attributes.get("age", ''),
                                             "gender":attributes.get("gender", ''),
                                             "name" : attributes.get("name", ''),
                                             "deepfake":attributes.get("deepfake", ''),
                                             "encoding": encodings[0]}
                                    # DataFrame.append no longer exists in pandas 2.x;
                                    # concat keeps the column order of df_detect.
                                    df_detect = pd.concat(
                                        [df_detect, pd.DataFrame([df_row], columns=df_detect.columns)],
                                        ignore_index=True,
                                    )
                                    print(frame_id)
                                    print(df_detect)
                                else:
                                    # no encoding could be computed for the crop
                                    print("no face encoding found; detection not stored")

                            # --------------------------------
                            # face recognition
                            # call find function for custom_face
                            if(flag_match):
                                row=recognize(freeze_img,detected_face,df_detect)
                                attributes['matched']= "Yes" if row is not None else "No"
                                texts,_=do_pdocr(freeze_img)
                                # at least three text items (ID, first name, last name) are required
                                verified = False
                                if len(texts)>=3:
                                    print(df_verification)
                                    verified = verify_identity(df_verification,texts[0],texts[1],texts[2])
                                    attributes['ID']= texts[0]
                                    attributes['Name']= f'{texts[1]} {texts[2]}'

                                attributes['Verified']="Yes" if verified else "No"
                                print(attributes['Verified'])

                            draw_summary_box(freeze_img,attributes,x,y,w,h,resolution_x,resolution_y)

                            tic = time.time()  # in this way, freezed image can show 5 seconds

                            # -------------------------------

                    time_left = int(time_threshold - (toc - tic) + 1)

                    # countdown badge in the top-left corner (negative thickness = filled)
                    cv2.rectangle(freeze_img, (10, 10), (90, 50), (67, 67, 67), -10)
                    cv2.putText(
                        freeze_img,
                        str(time_left),
                        (40, 40),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1,
                        (255, 255, 255),
                        1,
                    )
                    out.write(freeze_img)
                    cv2.imshow("img", freeze_img)

                    freezed_frame +=1
                else:
                    # freeze period is over: go back to live view and wait for a new key
                    face_detected = False
                    face_included_frames = 0
                    freezed_frame=0
                    freeze = False
                    flag_detect= False
                    flag_recognize = False
                    flag_match = False
                    flag_liveness = False
                    flag_deepfake = False
                    flag_analyze = False
                    flag_OCR=False

            else:
                out.write(img)
                cv2.imshow("img", img)
        else:
            out.write(img)
            cv2.imshow("img", img)

        frame_id+=1
        if key == ord("q"):  # press q to quit
            break
finally:
    # kill open cv things
    cap.release()
    out.release()
    cv2.destroyAllWindows()

[2023/05/03 10:51:56] ppocr DEBUG: Namespace(alpha=1.0, benchmark=False, beta=1.0, cls_batch_num=6, cls_image_shape='3, 48, 192', cls_model_dir='C:\\Users\\elinam/.paddleocr/whl\\cls\\ch_ppocr_mobile_v2.0_cls_infer', cls_thresh=0.9, cpu_threads=10, crop_res_save_dir='./output', det=True, det_algorithm='DB', det_box_type='quad', det_db_box_thresh=0.6, det_db_score_mode='fast', det_db_thresh=0.3, det_db_unclip_ratio=1.5, det_east_cover_thresh=0.1, det_east_nms_thresh=0.2, det_east_score_thresh=0.8, det_limit_side_len=960, det_limit_type='max', det_model_dir='C:\\Users\\elinam/.paddleocr/whl\\det\\en\\en_PP-OCRv3_det_infer', det_pse_box_thresh=0.85, det_pse_min_area=16, det_pse_scale=1, det_pse_thresh=0, det_sast_nms_thresh=0.2, det_sast_score_thresh=0.5, draw_img_save_dir='./inference_results', drop_score=0.5, e2e_algorithm='PGNet', e2e_char_dict_path='./ppocr/utils/ic15_dict.txt', e2e_limit_side_len=768, e2e_limit_type='max', e2e_model_dir=None, e2e_pgnet_mode='fast', e2e_pgnet_score_th

In [51]:
    # kill open cv things
# Manual cleanup cell (presumably run by hand when the capture cell was interrupted).
cap.release()
cv2.destroyAllWindows()
# Also close the recorder so the AVI file is finalized; kept last so the camera
# and the windows are released even if `out` was never created.
out.release()