In [1]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Lambda, Flatten
from sklearn.preprocessing import Normalizer, LabelEncoder
from PIL import Image
import os
import numpy as np
from keras.preprocessing.image import ImageDataGenerator
from numpy import asarray, expand_dims
from keras_vggface.utils import preprocess_input
from keras.preprocessing import image
from keras.applications.vgg16 import VGG16
from keras.applications.vgg16 import preprocess_input
import cv2
from collections import defaultdict
from numpy import argsort
from numpy import load
from tensorflow.keras.models import load_model
from numpy import save, load, savez_compressed
from tensorflow.keras.models import load_model
from keras.utils.np_utils import to_categorical
import time

In [2]:
#function which takes old model as input and adds one more output node and returns a new model
def updateModel(model):
    # creating a new model
    model_2 = Sequential()

    # getting all the layers except the output one
    for layer in model.layers[:-1]: # just exclude last layer from copying
        model_2.add(layer)

    # prevent the already trained layers from being trained again 
    # (you can use layers[:-n] to only freeze the model layers until the nth layer)
    # for layer in model_2.layers:
    #     layer.trainable = False

    # adding the new output layer, the name parameter is important 
    # otherwise, you will add a Dense_1 named layer, that normally already exists, leading to an error
    num_cats = model.get_layer(index = -1).get_config()['units']
    model_2.add(Dense(num_cats+1, name = 'new_Dense', input_shape=(512,), kernel_initializer = 'he_uniform', activation = 'softmax'))
    model_2.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model_2

In [3]:
#funtion which takes old model, dataset and new data. adds to the original dataset and updates it
def addNewLabel(modelL, modelR, trainX, trainy, testX, testy, new_data):
    #load new dataset
    trainX2, trainy2, testX2, testy2 = new_data

    trainX, trainy, testX, testy = list(trainX), list(trainy), list(testX), list(testy)
    trainX2, trainy2, testX2, testy2 = list(trainX2), list(trainy2), list(testX2), list(testy2)

    trainy += trainy2
    trainX += trainX2
    testy += testy2
    testX += testX2
    
    modelL = updateModel(modelL)
    modelR = updateModel(modelR)
    return modelL, modelR, trainX, trainy, testX, testy

In [4]:
#Normalize input vectors and numerize the labels.
def encodeLabels(trainX, trainy, testX, testy):
	# normalize input vectors
	in_encoder = Normalizer(norm='l2')
	trainX = in_encoder.transform(trainX)
	testX = in_encoder.transform(testX)
	# label encode targets
	out_encoder = LabelEncoder()
	out_encoder.fit(sorted(trainy, key = lambda x: int(x.split('_')[0])))
    #print(sorted(testy, key = lambda x: int(x.split('_')[0])))
	trainy = out_encoder.transform(sorted(trainy, key = lambda x: int(x.split('_')[0])))
    #print(trainy)
	testy = out_encoder.transform(sorted(testy, key = lambda x: int(x.split('_')[0])))
    #print(testy)
	return trainX, trainy, testX, testy, out_encoder

In [5]:
#function to incerase the number of images of a class by a given number
def extendDataset(image_folder_path, extend_by):
    parent = image_folder_path
    X = []
    y = []
    i = 0

    image_gen = ImageDataGenerator(rotation_range=5,
                                   rescale = False,
                                   shear_range = 0.2,
                                   fill_mode='reflect',
                                   horizontal_flip=False,
                                   vertical_flip=False,
                                   brightness_range=[0.5, 1.5])
    
    for loc in os.listdir(parent):
        i += 1
        print(loc)
        im = Image.open(image_folder_path+loc)   
        im = im.resize((224, 224))
        im_array = np.asarray(im)
        X.append(im_array)
        ID = loc.split("_")
        y.append(ID[0]+"_"+ID[1])
        iter = image_gen.flow(np.expand_dims(im, 0))

        for _ in range(extend_by):
            X.append(np.asarray(next(iter)[0].astype(np.uint8)))
            # ID = loc.split("_")
            y.append(ID[0]+"_"+ID[1])

    X = np.asarray(X)
    y = np.asarray(y)
    return [X, y]


In [6]:
model = VGG16(weights='imagenet', include_top=False)

In [7]:
#extract embeddings of a face using model
def extract_embedding(face, model):
    img_data = face.astype('float32')
    img_data = expand_dims(img_data, axis=0)
    img_data = preprocess_input(img_data)
    vgg16_feature = model.predict(img_data)    
    return vgg16_feature

In [8]:
def getEmbeddings(trainX, testX):
    global model
    i = -1
    newTrainX = []
    for face in trainX:
        i += 1 
        if i%100 == 0: 
            print(i/100, end = ' ')
        embedding = extract_embedding(face, model)
        newTrainX.append(embedding.flatten())
    newTrainX = asarray(newTrainX)
    print('')
    print(newTrainX.shape)

    i = -1
    newTestX = []
    for face in testX:
        i += 1
        if i%100 == 0: 
            print(i/100, end = ' ')
        embedding = extract_embedding(face, model)
        newTestX.append(embedding.flatten())
    newTestX = asarray(newTestX)
    print('')
    print(newTestX.shape)
    
    return newTrainX, newTestX

In [9]:
#function to get left/right eye regions points
def getRegion(eye, y1, y2, x1, x2):
    w, h = x2-x1, y2-y1
    ex1, ey1, w1, h1 = eye

    if (x1 < ex1 < x1+w//2) and (x1 < ex1 + w1 < x1+w//2): 
        return 1, w1*h1
    else:
        return 2, w1*h1

In [40]:
#function which takes person ID and path where image has to be saved and number to images to be taken and captures images through webcam 
def getPersonImages(ID, path, imcount):
    eye_cascade = cv2.CascadeClassifier('./haarcascade_eye.xml')
    cap = cv2.VideoCapture(0)
    i = 0
    while True:
        ret, img = cap.read()
        #gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        #cv2.imshow('img',img)
        y1, y2 = 150, 300
        x1, x2 = 150, 450
        new_img = img[y1:y2, x1:x2]
        cv2.rectangle(img,(x1,y1),(x2, y2),(255,0,0),1)
        cv2.line(img, ((x1+x2)//2, y1), ((x1+x2)//2, y2), (0, 0, 255), 1)

        eyes = eye_cascade.detectMultiScale(new_img, 1.3, 1)


        if (len(eyes) == 2):
            r1, a1 = getRegion(eyes[0], y1, y2, x1, x2)
            r2, a2 = getRegion(eyes[1], y1, y2, x1, x2)

            # for (ex,ey,ew,eh) in eyes:
            #     ex += x1
            #     ey += y1
            #     cv2.rectangle(img,(ex,ey),(ex+ew,ey+eh),(255,255,255),1)

            if (r1 != r2) and (a1 >= 6500 and a2 >= 6500):
                cv2.rectangle(img,(x1,y1),(x2, y2),(0,255,0),1)
                if i < imcount:
                    i += 1
                    cv2.imwrite(path + ID + '_L_' +str(i)+'.jpg',new_img[1:,1:(x2-x1)//2])
                    cv2.imwrite(path + ID + '_R_' +str(i)+'.jpg',new_img[1:,(x2-x1)//2 +1:])
                else:
                    cap.release()
                    cv2.destroyAllWindows()
                    break
        else:
            cv2.rectangle(img,(x1,y1),(x2, y2),(0,0,255),1)

        cv2.imshow('img',img)
        k = cv2.waitKey(30) & 0xff
        if k == 27:        
            break
getPersonImages("1234","./temp/",2)

In [11]:
#funtion taking l and r images of eyes and giving label output and confidence
def getPredictionCat(Lsample, Rsample, modelL, modelR):
    Lp = modelL.predict(Lsample.reshape(1,-1))[0]
    Ltop_values_index = sorted(range(len(Lp)), key=lambda i: Lp[i])[-5:]
    Ltop_values = [Lp[i] for i in argsort(Lp)[-5:]]

    probs = defaultdict(int)
    for clas, prob in zip(Ltop_values_index, Ltop_values):
        probs[clas] += prob


    Rp = modelR.predict(Rsample.reshape(1,-1))[0]
    Rtop_values_index = sorted(range(len(Rp)), key=lambda i: Rp[i])[-5:]
    Rtop_values= [Rp[i] for i in argsort(Rp)[-5:]]

    for clas, prob in zip(Rtop_values_index, Rtop_values):
        probs[clas] += prob

    mx_conf = 0
    plabel = -1
    for clas, prob in probs.items():
        if prob > mx_conf:
            mx_conf = prob
            plabel = clas

    confidence = mx_conf*0.5
    
    return (plabel, confidence)

In [12]:
#Sort the parts for easy reduction of samples per label.
from numpy import asarray
def makeParts(trainX, trainy, testX, testy):
    zeepTest = sorted(list(zip(testy, testX)), key = lambda x: int(x[0].split('_')[0]))
    zeepTrain = sorted(list(zip(trainy, trainX)), key = lambda x: int(x[0].split('_')[0]))
    trainx = [x for y, x in zeepTrain[:]]
    trainy = [y for y, x in zeepTrain[:]]
    testx = [x for y, x in zeepTest[:]]
    testy = [y for y, x in zeepTest[:]]

    return asarray(trainx), asarray(trainy), asarray(testx), asarray(testy)

In [46]:
#final function to recognise a person
def recognisePerson():
    modelR = load_model('Trained models/modelR.h5')
    modelL = load_model('Trained models/modelL.h5')

    # dummy ID
    getPersonImages("1234","./temp/new_img/current/",1)


    # Only do the preprocess
    ntestX, ntesty = extendDataset('temp/new_img/current/', 0)
    ntestX, _ = getEmbeddings(ntestX, [])

    encoderL = LabelEncoder()
    encoderL.classes_ = load('Latest models/classesL.npy')
    encoderR = LabelEncoder()
    encoderR.classes_ = load('Latest models/classesR.npy')
    
    label, prob = getPredictionCat(ntestX[0], ntestX[1], modelL, modelR) 
    if prob >= 0.75:
        l, r, p = [encoderL.inverse_transform([label]), encoderR.inverse_transform([label]), prob]
        if l[0][:-2] != r[0][:-2]:
            return("Model Corrupted!")
        else:
            return(l[0][:-2]+ " Confidence: " + str(p))
    else:
        return("Not recognised:", prob)

In [52]:
#final function to register a new person using webcam
def registerNewPerson():
    # Load the old model
    modelR = load_model('Trained models/modelR.h5')
    modelL = load_model('Trained models/modelL.h5')
    # load old dataset
    data = load('temp/embeddings.npz')
    trainX, trainy, testX, testy = data['arr_0'], data['arr_1'], data['arr_2'], data['arr_3']
    trainX, trainy, testX, testy = list(trainX), list(trainy), list(testX), list(testy)
    

    # Add new data into the model
    getPersonImages('1234', 'temp/img/test/', 1)
    getPersonImages('1234', 'temp/img/train/', 1)

    ntestX, ntesty = extendDataset('temp/img/test/', 1)
    ntrainX, ntrainy = extendDataset('temp/img/train/', 5)
    ntestX, ntrainX = getEmbeddings(ntestX, ntrainX)

    savetrainX, savetrainy, savetestX, savetesty =  trainX + list(ntrainX), trainy + list(ntrainy), testX + list(ntestX), testy + list(ntesty)
    savez_compressed('temp/latest_embeddings.npz', savetrainX, savetrainy, savetestX, savetesty)

    new_data = [ntrainX, ntrainy, ntestX, ntesty]
    modelL, modelR, trainX, trainy, testX, testy = addNewLabel(modelL, modelR, trainX, trainy, testX, testy, new_data)

    RtrainX, Rtrainy, RtestX, Rtesty, LtrainX, Ltrainy, LtestX, Ltesty = seperate(trainX, trainy, testX, testy)

    RtrainX, Rtrainy, RtestX, Rtesty = makeParts(RtrainX, Rtrainy, RtestX, Rtesty)
    LtrainX, Ltrainy, LtestX, Ltesty = makeParts(LtrainX, Ltrainy, LtestX, Ltesty)

    RtrainX, Rtrainy, RtestX, Rtesty, encoderR = encodeLabels(RtrainX, Rtrainy, RtestX, Rtesty)
    LtrainX, Ltrainy, LtestX, Ltesty, encoderL = encodeLabels(LtrainX, Ltrainy, LtestX, Ltesty)

    modelL = trainUpdatedModel(modelL, LtrainX, Ltrainy, LtestX, Ltesty)
    modelR = trainUpdatedModel(modelR, RtrainX, Rtrainy, RtestX, Rtesty)

    save('Latest models/classesL.npy', encoderL.classes_)
    save('Latest models/classesR.npy', encoderR.classes_)
    modelR.save('Latest models/modelR.h5') 
    modelL.save('Latest models/modelL.h5')

In [48]:
#seperate tean and test arrays of L and R images
def seperate(trainX, trainy, testX, testy):
	LtrainX = []
	Ltrainy = []

	RtrainX = []
	Rtrainy = []

	RtestX = []
	Rtesty = []

	LtestX = []
	Ltesty = []

	for x, y in zip(trainX, trainy):
	    if y.split('_')[-1] == 'L':
	        LtrainX.append(x)
	        Ltrainy.append(y)
	    else:
	        RtrainX.append(x)
	        Rtrainy.append(y)

	for x, y in zip(testX, testy):
	    if y.split('_')[-1] == 'L':
	        LtestX.append(x)
	        Ltesty.append(y)
	    else:
	        RtestX.append(x)
	        Rtesty.append(y)

	return RtrainX, Rtrainy, RtestX, Rtesty, LtrainX, Ltrainy, LtestX, Ltesty

In [49]:
#Fit for new label classification
def trainUpdatedModel(model, trainX, trainy, testX, testy):
    Y_train = to_categorical(trainy)
    Y_test = to_categorical(testy)
    
    t1 = time.time()
    _history = model.fit(asarray(trainX), Y_train,validation_data = (asarray(testX),Y_test), epochs=50, batch_size=64)
    t2 = time.time()

    print("Time taken:", t2-t1)
    return model

In [54]:
choice = int(input("Enter Choice: \n1. Register new person. \n2. Recognise person.\n"))
if choice == 1:
	registerNewPerson()
elif choice == 2:
	print(recognisePerson())

Enter Choice: 
1. Register new person. 
2. Recognise person.
2
1234_L_1.jpg
1234_R_1.jpg
0.0 
(2, 25088)

(0,)
('Not recognised:', 0.5)
