# Import Libraries

In [1]:
import os
import cv2
import time
import pickle
import numpy as np
from imutils import paths
import matplotlib.pyplot as plt

from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.model_selection import KFold
from sklearn.compose import ColumnTransformer

import tensorflow
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam

from mtcnn import MTCNN
# insightface folder
from insightface.src.common import face_preprocess
from insightface.deploy import face_model

import dlib

In [2]:
# Directory that holds one sub-folder of aligned face crops per registered user.
# makedirs(exist_ok=True) replaces the racy "check, then mkdir" pair.
train_dir = os.path.join(os.getcwd(), 'train')
os.makedirs(train_dir, exist_ok=True)

# Directory that holds the pickled embeddings and the pickled label encoder
embedding_dir = os.path.join(os.getcwd(), 'faceEmbeddingModels')
os.makedirs(embedding_dir, exist_ok=True)

# MTCNN face detector; used by the registration and the inference code below
detector = MTCNN()

# 1. Collect User Images for Registration

In [None]:
# Size string forwarded to face_preprocess.preprocess through its image_size argument
image_size = '112,112'
# Number of face crops collectImagesFromCamera saves before it stops
max_images = 20
# Name of the person being registered, read from stdin; it becomes the
# sub-folder name under train_dir
user = input()

def collectImagesFromCamera(user, max_images):
    """Capture aligned face crops of one user from the default webcam.

    Frames are read from camera 0.  On every third frame the module-level
    MTCNN ``detector`` is run; if it finds at least one face, the largest one
    is aligned with ``face_preprocess.preprocess`` and written as a JPEG to
    ``train_dir/<user>/``.  Capture stops once ``max_images`` crops were
    saved, when ``q`` is pressed in the preview window, or when the camera
    stops delivering frames.

    Args:
        user: Name of the person; used as the sub-folder name in ``train_dir``.
        max_images: Number of face crops to save before stopping.

    Raises:
        RuntimeError: If camera 0 cannot be opened.
    """
    landmark_order = ("left_eye", "right_eye", "nose", "mouth_left", "mouth_right")
    max_failed_reads = 30  # consecutive empty reads tolerated before giving up

    user_dir = os.path.join(train_dir, user)
    os.makedirs(user_dir, exist_ok=True)

    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError("Could not open camera 0")

    faces = 0
    frames = 0
    failed_reads = 0

    try:
        while faces < max_images:
            ret, frame = cap.read()
            if not ret or frame is None:
                # A failed read must not reach the detector; tolerate a few
                # (camera warm-up), then stop instead of spinning forever.
                failed_reads += 1
                if failed_reads >= max_failed_reads:
                    print("[WARN] Camera stopped delivering frames")
                    break
                continue
            failed_reads = 0
            frames += 1

            # Only every third frame is used, so only run the detector there.
            # NOTE(review): frame is BGR as delivered by OpenCV; confirm
            # whether the detector should be fed an RGB image instead.
            detections = detector.detect_faces(frame) if frames % 3 == 0 else []

            if detections:
                # Keep only the biggest face (box is [x, y, width, height])
                biggest = max(detections, key=lambda d: d["box"][2] * d["box"][3])
                x, y, w, h = biggest["box"]
                max_bbox = np.array([x, y, x + w, y + h])

                # One (x, y) row per landmark, in the order expected by
                # face_preprocess.preprocess
                keypoints = biggest["keypoints"]
                landmarks = np.array([keypoints[key] for key in landmark_order])
                nimg = face_preprocess.preprocess(frame, max_bbox, landmarks, image_size=image_size)

                # int(time.time()) only has one-second resolution, and several
                # crops are captured per second; the running index keeps later
                # crops from overwriting earlier ones.
                file_name = "{}_{:03d}.jpg".format(int(time.time()), faces)
                cv2.imwrite(os.path.join(user_dir, file_name), nimg)

                # Plain ints: OpenCV drawing functions can reject numpy scalars
                x1, y1, x2, y2 = (int(v) for v in max_bbox)
                cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)
                faces += 1
                print("[INFO] {} Image Captured".format(faces))

            cv2.imshow("Face detection", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release the camera and close the preview even if something raised
        cap.release()
        cv2.destroyAllWindows()
        
        
# Run the capture for the user name read from stdin above
collectImagesFromCamera(user, max_images)

'''
# SAMPLE OUTPUT of detector.detect_faces(frame) :-
[
    {
        'box': [277, 90, 48, 63],
        'keypoints':
        {
            'nose': (303, 131),
            'mouth_right': (313, 141),
            'right_eye': (314, 114),
            'left_eye': (291, 117),
            'mouth_left': (296, 143)
        },
        'confidence': 0.99851983785629272
    }
]
'''

# 2. Generate Face Embeddings

In [4]:
# The four values below are passed positionally to face_model.FaceModel in
# genFaceEmbedding; their exact meaning is defined by the insightface deploy
# code, which is not part of this file.
image_size = '112,112'
# presumably "<checkpoint prefix>,<epoch>" — TODO confirm against face_model
model = "./insightface/models/model-y1-test2/model,0"
threshold = 1.24
det = 0
# File name (inside embedding_dir) of the pickled embeddings and names
embeddings_file = "embeddings.pickle"

def genFaceEmbedding():
    """Compute a face embedding for every image under ``train_dir`` and pickle them.

    The person name of each image is taken from the name of its parent
    folder.  Images that OpenCV cannot read are skipped with a warning.  The
    result is written to ``embedding_dir/embeddings_file`` as a pickled dict
    ``{"embeddings": [...], "names": [...]}`` whose two lists are aligned by
    index.
    """
    # Grab the paths to the input images in our dataset
    imagePaths = list(paths.list_images(train_dir))
    # Initialize the faces embedder
    embedding_model = face_model.FaceModel(image_size, model, threshold, det)

    # Extracted facial embeddings and the corresponding people names
    knownEmbeddings = []
    knownNames = []

    for (i, imagePath) in enumerate(imagePaths):
        print("[INFO] processing image {}/{}".format(i + 1, len(imagePaths)))
        # The person name is the folder the image lives in
        name = os.path.basename(os.path.dirname(imagePath))

        image = cv2.imread(imagePath)
        if image is None:
            # cv2.imread signals an unreadable/corrupt file by returning None;
            # skip it rather than crash in cvtColor below.
            print("[WARN] could not read {}, skipping".format(imagePath))
            continue

        # Convert BGR -> RGB and move channels first (C, H, W) before
        # handing the image to the embedding model
        nimg = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        nimg = np.transpose(nimg, (2, 0, 1))
        face_embedding = embedding_model.get_feature(nimg)

        knownNames.append(name)
        knownEmbeddings.append(face_embedding)

    print(len(knownEmbeddings), " faces embedded")

    # save to output
    data = {"embeddings": knownEmbeddings, "names": knownNames}
    with open(os.path.join(embedding_dir, embeddings_file), "wb") as f:
        pickle.dump(data, f)

# Embed every image under train_dir and write the pickle file
genFaceEmbedding()

# 3. Data Preprocessing

In [5]:
# Load the face embeddings written by genFaceEmbedding.
# NOTE: unpickling executes arbitrary code; only load files created locally.
with open(os.path.join(embedding_dir, embeddings_file), "rb") as f:
    data = pickle.load(f)
embeddings = np.array(data["embeddings"])
input_shape = embeddings.shape[1]  # required for creating model

# Encode the person names as integer class ids
le = LabelEncoder()
labels = le.fit_transform(data["names"])
num_classes = len(le.classes_)  # required for creating model

# One-hot encode the class ids for categorical cross-entropy.
# sparse_threshold=0 forces a dense ndarray: with the default (0.3) the
# transformer returns a SciPy sparse matrix as soon as there are more than
# three classes, which the training code below does not expect.
labels = labels.reshape(-1, 1)
ct = ColumnTransformer([("names", OneHotEncoder(), [0])],
                       remainder='passthrough',
                       sparse_threshold=0)
labels = ct.fit_transform(labels)

# 4. Create the Model

In [7]:
def createModel(input_shape, num_classes):
    """Build the (uncompiled) softmax classifier that maps an embedding to a class.

    Args:
        input_shape: Length of one embedding vector.
        num_classes: Number of output units of the final softmax layer.

    Returns:
        A ``Sequential`` model: two 1024-unit ReLU layers, dropout of 0.5 and
        a softmax output layer.
    """
    stack = [
        Dense(1024, activation='relu', input_shape=(input_shape,)),
        Dense(1024, activation='relu'),
        Dropout(0.5),
        Dense(num_classes, activation='softmax'),
    ]
    return Sequential(stack)

my_model = createModel(input_shape, num_classes)

# `learning_rate` replaces the deprecated `lr` alias.  `decay=0.0` and
# `epsilon=None` are dropped: both only restated the defaults, and newer
# Keras optimizers reject `decay` outright.
optimizer = Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, amsgrad=False)
# Labels are one-hot encoded, hence categorical cross-entropy
my_model.compile(loss=tensorflow.keras.losses.categorical_crossentropy,
                 optimizer=optimizer,
                 metrics=['accuracy'])

# 5. Train the Model

In [8]:
# Mini-batch size and number of epochs used for every fold in trainModel
BATCH_SIZE = 8
EPOCHS = 5
# Per-epoch metrics, concatenated across all folds by trainModel
history = {'accuracy': [], 'val_accuracy': [], 'loss': [], 'val_loss': []}
# Create KFold: 5 shuffled folds; the fixed random_state makes the split reproducible
cv = KFold(n_splits = 5, random_state = 42, shuffle=True)

def trainModel():
    """Fit ``my_model`` once per K-fold split and collect the metric curves.

    For every (train, validation) index pair produced by ``cv`` the model is
    fitted for ``EPOCHS`` epochs, and the per-epoch values of each metric are
    appended to the matching list in the module-level ``history`` dict.

    The same ``my_model`` instance is fitted in every fold (it is not
    re-created), so from the second fold on the validation rows include
    samples the model has already been trained on.
    """
    tracked_metrics = ('accuracy', 'val_accuracy', 'loss', 'val_loss')

    for fit_rows, val_rows in cv.split(embeddings):
        fold_result = my_model.fit(
            embeddings[fit_rows],
            labels[fit_rows],
            batch_size=BATCH_SIZE,
            epochs=EPOCHS,
            verbose=1,
            validation_data=(embeddings[val_rows], labels[val_rows]),
        )

        # Extend the running curves with this fold's epochs
        for metric in tracked_metrics:
            history[metric] += fold_result.history[metric]
        
# Run the K-fold training loop; metric curves end up in `history`
trainModel()

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


# 6. Save the Model

In [7]:
# Write the trained classifier to my_model.h5 in the working directory
my_model.save('my_model.h5')

# Also save the label encoder, so class indices can be mapped back to names
labels_file = "le.pickle"
with open(os.path.join(embedding_dir, labels_file), "wb") as f:
    pickle.dump(le, f)

# 7. Inference

In [15]:
# Arguments passed positionally to face_model.FaceModel below
image_size = '112,112'
model = "./insightface/models/model-y1-test2/model,0"
threshold = 1.24
det = 0

# Initialize faces embedding model
embedding_model = face_model.FaceModel(image_size, model, threshold, det)

# Load embeddings and the fitted label encoder.
# NOTE: unpickling executes arbitrary code; only load files created locally.
with open(os.path.join(embedding_dir, embeddings_file), "rb") as f:
    data = pickle.load(f)
with open(os.path.join(embedding_dir, labels_file), "rb") as f:
    le = pickle.load(f)

embeddings = np.array(data['embeddings'])
# Use transform, not fit_transform: re-fitting the loaded encoder could
# silently change the name -> index mapping the classifier was trained with.
# transform raises if the embeddings contain a name the encoder never saw.
labels = le.transform(data['names'])

# Load the classifier model
my_model = tensorflow.keras.models.load_model('./my_model.h5')

loading ./insightface/models/model-y1-test2/model 0


In [14]:
def findCosineDistance(vector1, vector2):
    """Return the cosine distance (1 - cosine similarity) of two vectors.

    Both arguments are flattened first, so arrays of any shape are accepted
    as long as they hold the same number of elements.
    """
    u = vector1.flatten()
    v = vector2.flatten()

    # Product of the two Euclidean norms
    norm_product = np.sqrt(np.dot(u, u)) * np.sqrt(np.dot(v, v))
    cosine = np.dot(u, v) / norm_product
    return 1 - cosine

def CosineSimilarity(test_vec, source_vecs):
    """Return the mean cosine distance from ``test_vec`` to ``source_vecs``.

    Despite the name, the result is a distance: smaller values mean the test
    vector is closer to the reference vectors of the class.
    """
    distances = (findCosineDistance(test_vec, ref_vec) for ref_vec in source_vecs)
    return sum(distances) / len(source_vecs)

def detectface():
    """Recognise faces in the webcam stream and show an annotated preview.

    Every third frame is a detection frame: all faces are detected with the
    module-level MTCNN ``detector``, aligned, embedded with
    ``embedding_model`` and classified with ``my_model``.  A prediction is
    accepted only if the class probability exceeds ``proba_threshold`` and
    the mean cosine distance to ``comparing_num`` randomly drawn reference
    embeddings of that class is below ``cosine_threshold``; otherwise the
    face is labelled "Unknown".  A dlib correlation tracker is started for
    every face, and on the frames in between the trackers are updated and
    their boxes drawn.

    The loop ends when ``q`` is pressed in the preview window or when the
    camera stops delivering frames.

    Raises:
        RuntimeError: If camera 0 cannot be opened or reports no frame size.
    """
    # Initialize some useful arguments
    cosine_threshold = 0.8    # upper bound on the mean cosine distance
    proba_threshold = 0.85    # lower bound on the classifier probability
    comparing_num = 5         # reference embeddings drawn per verification
    max_failed_reads = 30     # consecutive empty reads tolerated
    landmark_order = ("left_eye", "right_eye", "nose", "mouth_left", "mouth_right")

    # Tracker params
    trackers = []
    texts = []
    frames = 0
    failed_reads = 0

    # Start streaming
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError("Could not open camera 0")

    try:
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if frame_width <= 0 or frame_height <= 0:
            # Guards the division below
            raise RuntimeError("Camera 0 reported an invalid frame size")
        print(str(frame_width) + " : " + str(frame_height))
        # Preview is scaled to a width of 800 px, keeping the aspect ratio
        save_width = 800
        save_height = int(800 / frame_width * frame_height)
        print(str(save_width) + " : " + str(save_height))

        while True:
            ret, frame = cap.read()
            frames += 1

            if not ret or frame is None:
                # Tolerate a few empty reads, then stop instead of spinning
                # forever on a dead camera.
                failed_reads += 1
                if failed_reads >= max_failed_reads:
                    print("[WARN] Camera stopped delivering frames")
                    break
                continue
            failed_reads = 0

            frame = cv2.resize(frame, (save_width, save_height))
            # RGB copy taken before any overlay is drawn; used by the trackers
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            if frames % 3 == 0:
                trackers = []
                texts = []
                overlays = []  # (x1, y1, x2, y2, text), drawn after all faces are processed

                for detection in detector.detect_faces(frame):
                    # box is [x, y, width, height] -> [x1, y1, x2, y2]
                    x, y, w, h = detection['box']
                    bbox = np.array([x, y, x + w, y + h])
                    # One (x, y) row per landmark for face_preprocess.preprocess
                    keypoints = detection['keypoints']
                    landmarks = np.array([keypoints[key] for key in landmark_order])

                    nimg = face_preprocess.preprocess(frame, bbox, landmarks, image_size='112,112')
                    nimg = cv2.cvtColor(nimg, cv2.COLOR_BGR2RGB)
                    nimg = np.transpose(nimg, (2, 0, 1))
                    embedding = embedding_model.get_feature(nimg).reshape(1, -1)

                    text = "Unknown"

                    # Predict class and take the most probable one
                    preds = my_model.predict(embedding).flatten()
                    j = np.argmax(preds)
                    proba = preds[j]

                    # Verify against stored embeddings of the predicted class
                    match_class_idx = np.where(labels == j)[0]
                    if match_class_idx.size > 0:
                        # np.random.choice raises on an empty array, hence the guard
                        selected_idx = np.random.choice(match_class_idx, comparing_num)
                        compare_embeddings = embeddings[selected_idx]

                        # CosineSimilarity returns a mean cosine *distance*
                        cos_distance = CosineSimilarity(embedding, compare_embeddings)

                        if cos_distance < cosine_threshold and proba > proba_threshold:
                            name = le.classes_[j]
                            text = "{}".format(name)
                            print("Recognized: {} <{:.2f}>".format(name, proba * 100))

                    # Plain ints for dlib and the OpenCV drawing calls
                    x1, y1, x2, y2 = (int(v) for v in bbox)

                    # Start tracking
                    tracker = dlib.correlation_tracker()
                    tracker.start_track(rgb, dlib.rectangle(x1, y1, x2, y2))
                    trackers.append(tracker)
                    texts.append(text)
                    overlays.append((x1, y1, x2, y2, text))

                # Draw only after every face was processed, so one face's
                # overlay never ends up in another face's crop.
                for x1, y1, x2, y2, text in overlays:
                    text_y = y1 - 10 if y1 - 10 > 10 else y1 + 10
                    cv2.putText(frame, text, (x1, text_y), cv2.FONT_HERSHEY_SIMPLEX, 0.95, (255, 255, 255), 1)
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (179, 0, 149), 4)

            else:
                for tracker, text in zip(trackers, texts):
                    # Without update() the tracker keeps reporting the box it
                    # was started with, so the overlay would not follow the face.
                    tracker.update(rgb)
                    pos = tracker.get_position()

                    # unpack the position object
                    startX = int(pos.left())
                    startY = int(pos.top())
                    endX = int(pos.right())
                    endY = int(pos.bottom())

                    cv2.putText(frame, text, (startX, startY - 15), cv2.FONT_HERSHEY_SIMPLEX, 0.95, (255, 255, 255), 1)
                    cv2.rectangle(frame, (startX, startY), (endX, endY), (179, 0, 149), 4)

            cv2.imshow("Frame", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release the camera and close the preview even if something raised
        cap.release()
        cv2.destroyAllWindows()
             
# Start live recognition; press "q" in the preview window to stop
detectface()

640 : 480
800 : 600
Recognized: dheeraj <100.00>
Recognized: dheeraj <100.00>
