# Requirements

In [None]:
!pip install -r requirement.txt
print('Successfully loaded requirements')

# Imports

In [None]:
import tensorflow as tf
import numpy as np
import os
import glob
import pickle
import cv2
import time
import pyaudio
import wave
import warnings
import python_speech_features as mfcc
import io
import ffmpeg
import gtts
import os.path
import shutil
import serial.tools.list_ports
import platform

from IPython.display import display, Javascript, Audio, clear_output
from base64 import b64decode
from numpy import genfromtxt
from scipy.io.wavfile import read
from sklearn import mixture
from sklearn.mixture import GMM 
from sklearn import preprocessing
from IPython.display import HTML, Audio
from scipy.io.wavfile import read as wav_read
from os import path 
from io import BytesIO
from pydub import AudioSegment
from keras import backend as K
from keras.models import load_model
from js2py import eval_js

# Keras is switched to channels-first tensors; img_to_encoding() transposes
# each image to (channels, height, width) to match.
K.set_image_data_format('channels_first')
# threshold=np.inf makes NumPy print arrays in full instead of summarising them.
# NOTE(review): linewidth=np.nan is unusual - confirm the installed NumPy accepts it.
np.set_printoptions(threshold=np.inf, linewidth=np.nan)
# Suppress every Python warning for the rest of the session.
warnings.filterwarnings("ignore")

print("Imports successfully loaded...")

# Connecting Serial IO
* Connects to the Arduino board on the configured serial port

In [None]:
# Create the serial object first, set baud rate and port name, then open it
# explicitly. `ser` is reused later by the PPBAS sequence cell.
ser = serial.Serial()
ser.baudrate = '9600'
# NOTE(review): 'COM3' is a hard-coded, Windows-style port name - confirm it
# matches the port the board is attached to on this machine.
ser.port = 'COM3'
ser.open()

print("Access granted..")

# Text to Audio Function 
* Converts text strings to voice outputs

In [None]:
def Text2Speech(text):
    """Synthesise ``text`` with gTTS and return an autoplaying Audio object.

    The argument is passed through ``str()`` first, the generated speech is
    saved to ``./Text2Speech/new_sound.mp3`` (the same path on every call),
    and an ``Audio`` object for that file is returned with ``autoplay=True``.
    """
    mp3_path = str('./Text2Speech/new_sound.mp3')
    gtts.gTTS(str(text)).save(mp3_path)
    return Audio(mp3_path, autoplay=True)
  
print('Loaded...')

# Audio Processing Functions
* Extracts 40 dimensional MFCC and delta MFCC features as a vector

In [None]:
# Calculate and return the delta of a given feature-vector matrix
def calculate_delta(array):
    """Return per-row delta features for a 2-D feature matrix.

    For every row ``i`` the delta is::

        (a[i+1] - a[i-1] + 2 * (a[i+2] - a[i-2])) / 10

    where any row index that falls outside the matrix is clamped to the
    first / last row.

    Args:
        array: 2-D NumPy array of shape ``(rows, cols)``. Any number of
            columns is accepted (the output is no longer fixed at 20).

    Returns:
        A ``float64`` array with the same shape as ``array``.
    """
    # Unpacking also rejects inputs that are not 2-D, as before.
    rows, _cols = array.shape
    row_index = np.arange(rows)
    last_row = rows - 1

    def clamped_diff(offset):
        # Difference between the rows `offset` ahead and `offset` behind,
        # with out-of-range indices pinned to the matrix edges.
        ahead = np.minimum(row_index + offset, last_row)
        behind = np.maximum(row_index - offset, 0)
        return array[ahead] - array[behind]

    deltas = (clamped_diff(1) + (2 * clamped_diff(2))) / 10
    return deltas.astype(np.float64)

# Turn raw audio samples into scaled MFCC rows with their deltas appended
def extract_features(audio,rate):
    """Build the combined MFCC + delta feature matrix for one recording.

    The MFCCs are computed with ``python_speech_features`` (arguments
    ``0.025, 0.01, 20``, ``appendEnergy=True``, ``nfft=1103``), standardised
    with ``sklearn.preprocessing.scale`` and then stacked side by side with
    the output of ``calculate_delta``.

    Args:
        audio: Sample array of the recording.
        rate: Sample rate of ``audio``.

    Returns:
        Array whose columns are the scaled MFCCs followed by their deltas.
    """
    scaled_mfcc = preprocessing.scale(
        mfcc.mfcc(audio, rate, 0.025, 0.01, 20, appendEnergy=True, nfft=1103)
    )

    # MFCC columns first, delta columns second
    return np.hstack((scaled_mfcc, calculate_delta(scaled_mfcc)))

print('Loaded...')

# Facial Encoding
The model outputs a 128-dimensional encoding vector for the input image.

In [None]:
# Produce the face embedding for a single image
def img_to_encoding(img):
    """Run one image through the module-level ``model`` and return its output.

    The image is converted from BGR to RGB, transposed to channels-first
    order, divided by 255 and rounded to 12 decimals, then wrapped into a
    batch of one for ``model.predict_on_batch``.

    Args:
        img: Image array as produced by OpenCV (BGR channel order).

    Returns:
        Whatever ``model.predict_on_batch`` returns for the single-image batch.
    """
    rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    # Channels-first layout, scaled to [0, 1]
    channels_first = np.transpose(rgb, (2, 0, 1)) / 255.0
    batch = np.array([np.around(channels_first, decimals=12)])

    # Forward pass through the trained model
    return model.predict_on_batch(batch)

print("Loaded...")

# Triplet Loss
Two encodings are compared: if they are similar, the two images show the same person; otherwise they show different people.

In [None]:
def triplet_loss(y_true, y_pred, alpha = 0.2):
    """Triplet loss over an (anchor, positive, negative) prediction triple.

    Args:
        y_true: Unused; present to satisfy the Keras loss signature.
        y_pred: Indexable of three encodings: anchor, positive, negative.
        alpha: Margin added to the distance difference.

    Returns:
        ``max(||a - p||^2 - ||a - n||^2 + alpha, 0)`` as a tensor.
    """
    anchor = y_pred[0]
    positive = y_pred[1]
    negative = y_pred[2]

    # Squared distances of the anchor to the positive / negative sample
    pos_dist = tf.reduce_sum(tf.square(tf.subtract(anchor, positive)))
    neg_dist = tf.reduce_sum(tf.square(tf.subtract(anchor, negative)))

    # Hinge at zero
    return tf.maximum(pos_dist - neg_dist + alpha, 0.0)

# Load the pre-trained face-embedding model used by img_to_encoding().
# On Colab, upload facenet_model/model.h5 to Google Drive, mount the drive
# (https://colab.research.google.com/notebooks/io.ipynb) and load it from there instead:
# model = load_model('/content/gdrive/MyDrive/ColabNotebooks/facenet_model/model.h5', custom_objects={'triplet_loss': triplet_loss})

# compile=False: the visible code only calls model.predict_on_batch(), never fit().
model = load_model('./facenet_model/model.h5', custom_objects={'triplet_loss': triplet_loss}, compile = False)

print("Loaded..")

# Delete User Data 

In [None]:
def delete_user(name):
    """Remove a user's face embedding, voice recordings and GMM model.

    Args:
        name: Key of the user in ``./face_database/embeddings.pickle``; also
            the name of the user's folder under ``./voice_database/`` and of
            the ``.gmm`` file under ``./gmm_models/``.

    Returns:
        A status message string: a confirmation when the user was removed,
        or a "not registered" message when ``name`` is not in the database.
    """
    db_path = './face_database/embeddings.pickle'

    # NOTE: pickle must only be used on trusted, locally produced files.
    with open(db_path, 'rb') as database:
        db = pickle.load(database)

    if db.pop(name, None) is None:
        print('No such user !!')
        return 'User is not registered, please enter an existing user'

    print('User ' + name + ' deleted successfully')

    # Save the database only after the read handle has been closed.
    with open(db_path, 'wb') as database:
        pickle.dump(db, database, protocol=pickle.HIGHEST_PROTOCOL)

    # Remove only this speaker's folder. os.removedirs() would also delete
    # ./voice_database itself once it became empty, breaking the next
    # registration.
    voice_dir = './voice_database/' + name
    if os.path.isdir(voice_dir):
        shutil.rmtree(voice_dir)

    # The model file may be missing if an earlier registration was interrupted.
    gmm_path = './gmm_models/' + name + '.gmm'
    if os.path.exists(gmm_path):
        os.remove(gmm_path)

    return str(name + ' your data has been deleted')
        
print('Successfully loaded function...')

# Add User Function
* Facial data    
    * Face detection 
    * Extracts facial embeddings from photo and stores as pickle data
* Voice data
    * User repeats password 3 times
    * Extracts MFCC features
    * Concatenates all 3 voice samples as features
    * Passes to GMM model and saves as .gmm file

In [None]:
# For data I/O on Colab, create the folders used below on Google Drive and mount it first:
# https://colab.research.google.com/notebooks/io.ipynb
def add_user(name):
    """Register a user: store one face embedding and train a voice GMM.

    Steps:
        1. Load ``./face_database/embeddings.pickle``.
        2. Show a four-frame countdown, then watch the camera for about three
           seconds and keep the last frame that contained exactly one
           sufficiently large frontal face.
        3. Store ``img_to_encoding`` of that face under ``name`` and save the
           database. If no usable face was captured, print a message and stop.
        4. Record the spoken password three times into
           ``./voice_database/<name>/1.wav`` .. ``3.wav``.
        5. Fit a 16-component diagonal GMM on the features of all three
           recordings and pickle it to ``./gmm_models/<name>.gmm``.

    Args:
        name: Identifier used as database key, voice folder name and GMM
            file name.

    Returns:
        None in every case; progress and failures are reported via ``print``.
    """
    with open('./face_database/embeddings.pickle', 'rb') as database:
        db = pickle.load(database)

    cap = cv2.VideoCapture(0)
    cap.set(3, 640)
    cap.set(4, 480)

    #detecting only frontal face using haarcascade
    face_cascade = cv2.CascadeClassifier('./haarcascades/haarcascade_frontalface_default.xml')

    # On-screen countdown: 3, 2, 1, 0 - each frame is shown for one second.
    for remaining in range(3, -1, -1):
        _, frame = cap.read()
        frame = cv2.flip(frame, 1, 0)

        cv2.putText(frame, 'Keep Your Face infront of Camera', (100, 200),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

        cv2.putText(frame, 'Starting', (260, 270), cv2.FONT_HERSHEY_SIMPLEX,
                    0.8, (255, 255, 255), 2)

        cv2.putText(frame, str(remaining), (290, 330), cv2.FONT_HERSHEY_SIMPLEX,
                    1.3, (255, 255, 255), 3)

        cv2.imshow('frame', frame)
        cv2.waitKey(1000)

    face_found = False
    roi = None
    face = ()
    start_time = time.time()

    ## Face capture
    while True:
        curr_time = time.time()

        _, frame = cap.read()
        frame = cv2.flip(frame, 1, 0)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        face = face_cascade.detectMultiScale(gray, 1.3, 5)

        if len(face) == 1:
            x, y, w, h = face[0]

            # Clamp the padded box at the frame edge: a negative start index
            # would wrap around and yield an empty or wrong crop.
            top = max(y - 10, 0)
            left = max(x - 10, 0)

            # Copy, so the rectangle drawn on the preview below does not end
            # up inside the face image that gets encoded.
            candidate = frame[top:y + h + 10, left:x + w + 10].copy()
            fh, fw = candidate.shape[:2]

            # Accept the crop only if BOTH dimensions are large enough.
            if fh >= 20 and fw >= 20:
                roi = candidate
                face_found = True
                cv2.rectangle(frame, (x-10,y-10), (x+w+10, y+h+10), (255, 200, 200), 2)

        # keep the camera open for 3 seconds
        if curr_time - start_time >= 3:
            break

        cv2.imshow('frame', frame)
        cv2.waitKey(1)

    cap.release()
    cv2.destroyAllWindows()

    if face_found:
        img = cv2.resize(roi, (96, 96))

        db[name] = img_to_encoding(img)

        with open('./face_database/embeddings.pickle', "wb") as database:
            pickle.dump(db, database, protocol=pickle.HIGHEST_PROTOCOL)

    elif len(face) > 1:
        print("More than one faces found. Try again...")
        return

    else:
        print('There was no face found in the frame. Try again...')
        return

    clear_output(wait=True)

    # Voice enrolment
    FORMAT = pyaudio.paInt16
    CHANNELS = 2
    RATE = 44100
    CHUNK = 1024
    RECORD_SECONDS = 3

    source = "./voice_database/" + name

    # exist_ok: a re-registration (or a retry after an interrupted run) must
    # not fail after the face embedding has already been saved.
    os.makedirs(source, exist_ok=True)

    sample_paths = []

    for i in range(3):
        audio = pyaudio.PyAudio()

        if i == 0:
            for j in range(3, -1, -1):
                time.sleep(1.0)
                print("The password is Open Sesame")
                print("Say it in {} seconds".format(j))
                clear_output(wait=True)

        elif i == 1:
            print("Say it one more time")
            time.sleep(0.5)

        else:
            print("Say it one last time")
            time.sleep(0.5)

        # start Recording
        stream = audio.open(format=FORMAT, channels=CHANNELS,
                            rate=RATE, input=True,
                            frames_per_buffer=CHUNK)

        print("recording...")
        frames = []

        for _ in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
            frames.append(stream.read(CHUNK))

        # stop Recording
        stream.stop_stream()
        stream.close()
        # Query the sample width before the PyAudio instance is terminated.
        sample_width = audio.get_sample_size(FORMAT)
        audio.terminate()

        # saving wav file of speaker
        wav_path = source + '/' + str((i+1)) + '.wav'
        with wave.open(wav_path, 'wb') as waveFile:
            waveFile.setnchannels(CHANNELS)
            waveFile.setsampwidth(sample_width)
            waveFile.setframerate(RATE)
            waveFile.writeframes(b''.join(frames))
        sample_paths.append(wav_path)
        print("Done")

    dest =  "./gmm_models/"

    # Gather the features of ALL three recordings before training. The list
    # lives outside the loop so that no recording's features are discarded.
    vectors = []
    for wav_path in sample_paths:
        # reading audio files of speaker
        (sr, samples) = read(wav_path)

        # extract 40 dimensional MFCC & delta MFCC features
        vectors.append(extract_features(samples, sr))

    features = np.vstack(vectors)

    gmm = GMM(n_components = 16, n_iter = 200, covariance_type='diag',n_init = 3)
    gmm.fit(features)

    # saving the trained gaussian model
    with open(dest + name + '.gmm', 'wb') as model_file:
        pickle.dump(gmm, model_file)
    print(name + ' is successfully registered in the database.')

# if __name__ == '__main__':
#     add_user()

# Recognise Function
* Voice recognition based on GMM model, comparing extracted MFCC features
* Face recognition based on Siamese Neural Network model, comparing extracted embeddings with user embeddings in database. 

In [None]:
from main_functions import *

def recognize():
    """Authenticate a user by voice first and by face second.

    Voice: records four seconds of audio to ``./test.wav``, scores its
    MFCC/delta features against every ``*.gmm`` model in ``./gmm_models/``
    and takes the best-scoring model's file name as the claimed identity.

    Face: after a four-frame countdown, watches the camera for about three
    seconds. For each frame with exactly one sufficiently large face, the
    embedding is compared (L2 norm) with every embedding in
    ``./face_database/embeddings.pickle``.

    Returns:
        A ``(message, code)`` tuple on every path. ``code`` is ``1`` only
        when the nearest stored face is within distance 0.7 and its name
        equals the voice identity; otherwise ``code`` is ``0`` and
        ``message`` explains why.
    """
    # Voice Authentication
    FORMAT = pyaudio.paInt16
    CHANNELS = 2
    RATE = 44100
    CHUNK = 1024
    RECORD_SECONDS = 4
    FILENAME = "./test.wav"

    recorder = pyaudio.PyAudio()

    for j in range(3, -1, -1):
        time.sleep(1.0)
        print("What is password?")
        print("Say it in {} seconds".format(j))
        clear_output(wait=True)

    # start Recording
    stream = recorder.open(format=FORMAT, channels=CHANNELS,
                           rate=RATE, input=True,
                           frames_per_buffer=CHUNK)

    time.sleep(0.5)
    print("recording...")
    frames = []

    for _ in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
        frames.append(stream.read(CHUNK))
    print("finished recording")

    # stop Recording
    stream.stop_stream()
    stream.close()
    # Query the sample width before the PyAudio instance is terminated.
    sample_width = recorder.get_sample_size(FORMAT)
    recorder.terminate()

    # saving wav file
    with wave.open(FILENAME, 'wb') as waveFile:
        waveFile.setnchannels(CHANNELS)
        waveFile.setsampwidth(sample_width)
        waveFile.setframerate(RATE)
        waveFile.writeframes(b''.join(frames))

    modelpath = "./gmm_models/"

    gmm_files = [os.path.join(modelpath, fname) for fname in
                 os.listdir(modelpath) if fname.endswith('.gmm')]

    # NOTE: pickle must only be used on trusted, locally produced files.
    models = []
    for fname in gmm_files:
        with open(fname, 'rb') as model_file:
            models.append(pickle.load(model_file))

    # Speaker name = file name without directory and without the extension.
    speakers = [os.path.splitext(os.path.basename(fname))[0]
                for fname in gmm_files]

    if len(models) == 0:
        # Return the usual (message, code) pair so callers can unpack it.
        response = "No Users in the Database!"
        print(response)
        return response, 0

    #read test file
    sr, samples = read(FILENAME)

    # extract mfcc features
    vector = extract_features(samples, sr)
    log_likelihood = np.zeros(len(models))

    #checking with each model one by one
    for i, gmm in enumerate(models):
        log_likelihood[i] = np.array(gmm.score(vector)).sum()

    identity = speakers[np.argmax(log_likelihood)]

    # if voice not recognized than terminate the process
    if identity == 'unknown':
        print("Not Recognized! Try again...")
        response = "Voice not recognized! Please try again"
        code = 0
        return response, code

    print( "User: ", identity)

    # face recognition
    print("Keep Your face infront of the camera")
    cap = cv2.VideoCapture(0)
    cap.set(3, 640)
    cap.set(4, 480)

    cascade = cv2.CascadeClassifier('./haarcascades/haarcascade_frontalface_default.xml')

    #loading the database
    with open('./face_database/embeddings.pickle', "rb") as db_file:
        database = pickle.load(db_file)

    # On-screen countdown: 3, 2, 1, 0 - each frame is shown for one second.
    for remaining in range(3, -1, -1):
        _, frame = cap.read()
        frame = cv2.flip(frame, 1, 0)

        cv2.putText(frame, 'Keep Your Face infront of Camera', (100, 200),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

        cv2.putText(frame, 'Commencing Facial Recognition Sequence', (80, 270), cv2.FONT_HERSHEY_SIMPLEX,
                    0.8, (255, 255, 255), 2)

        cv2.putText(frame, str(remaining), (290, 330), cv2.FONT_HERSHEY_SIMPLEX,
                    1.3, (255, 255, 255), 3)

        cv2.imshow('frame', frame)
        cv2.waitKey(1000)

    start_time = time.time()

    while True:
        curr_time = time.time()

        _, frame = cap.read()
        frame = cv2.flip(frame, 1, 0)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        face = cascade.detectMultiScale(gray, 1.3, 5)

        # Reset the per-frame match state.
        name = 'unknown'
        min_dist = 100

        if len(face) == 1:
            x, y, w, h = face[0]

            # Clamp the padded box at the frame edge: a negative start index
            # would wrap around and yield an empty or wrong crop.
            roi = frame[max(y - 10, 0):y + h + 10, max(x - 10, 0):x + w + 10]
            fh, fw = roi.shape[:2]

            #make sure the face is of required height and width
            if fh >= 20 and fw >= 20:
                #resizing image as required by the model
                img = cv2.resize(roi, (96, 96))

                #128 d encodings from pre-trained model
                encoding = img_to_encoding(img)

                # nearest stored embedding by L2 norm
                for knownName in database:
                    dist = np.linalg.norm(np.subtract(database[knownName], encoding))
                    if dist < min_dist:
                        min_dist = dist
                        name = knownName

            # if min dist is less then threshold value and face and voice matched than unlock the door
            if min_dist <= 0.7 and name == identity:
                entry = str('Door Unlocked! Welcome ' + name + ', you may enter!')
                code = 1
                cap.release()
                cv2.destroyAllWindows()
                return entry, code

        #open the cam for 3 seconds
        if curr_time - start_time >= 3:
            break

        cv2.imshow('frame', frame)
        cv2.waitKey(500)

    cap.release()
    cv2.destroyAllWindows()

    # The failure reason is based on the last frame that was examined.
    if len(face) == 0:
        response = 'There was no face found in the frame. Please try again..'
    elif len(face) > 1:
        response = 'The system is detecting more than one face. Please try again..'
    else:
        # Exactly one face, but too far from every stored embedding or not
        # the same person as the voice identity.
        response = 'User is not recognised. Entry denied'

    code = 0
    print(response)
    return response, code
   
        
# Run one full voice + face authentication attempt.
if __name__ == '__main__':
    response, code = recognize()
    
# Speak the resulting message.
# NOTE(review): `response` only exists if the guarded call above ran.
Text2Speech(response)
# print(code)


# List of registered users

In [None]:
def check_list():
    """Print a numbered list of the users stored in the face database.

    Reads ``./face_database/embeddings.pickle`` and prints either a
    "no registered users" message or one ``"<n>. <name>"`` line per key,
    numbered from 1 in the database's iteration order.

    Returns:
        None.
    """
    # NOTE: pickle must only be used on trusted, locally produced files.
    with open('./face_database/embeddings.pickle', "rb") as handle:
        database = pickle.load(handle)

    if len(database) == 0:
        print("There are no registered users.")
        return

    print("List of registered users: ")
    for position, known_name in enumerate(database, start=1):
        print(str(position) + '. ' + known_name)

check_list()

# User Registration (1)
* Updated UI to allow for dev-only access to user registration/removal

In [None]:
def authority_check():
    """Gate user registration/removal behind a successful recognition.

    Asks whether the operator wants to register or delete a user. On ``'y'``
    the operator must first pass ``recognize()``; only then is ``user_reg()``
    started.

    Returns:
        The message returned by ``user_reg()`` when recognition succeeds,
        the "no authority" message when it does not, or ``None`` when the
        answer to the initial prompt is anything other than ``'y'``.
    """
    user = input('Would you like to register or delete a user? (y/n)')

    # Compare by value: `is` tests object identity, which is not a reliable
    # way to compare strings.
    if user != 'y':
        return None

    # Defensive: tolerate a recognize() result that is not a (message, code)
    # pair and treat it as a failed recognition.
    result = recognize()
    code = result[1] if result else 0

    if code == 1:
        return user_reg()

    prompt = "Sorry, you do not have the authority."
    print(prompt)
    return prompt

# Run the gated registration flow, then speak whatever message it produced.
# NOTE(review): authority_check() returns None when the first answer is not
# 'y'; Text2Speech() passes its argument through str() - confirm that is intended.
authority = authority_check()
Text2Speech(authority)

# User Registration (2)
* Basic UI for registration and removal of users
* User selects option to proceed


In [None]:
## Functions for user interface: 
## 2 options for users: 1. Register new user 
##                          Case 1: User does not exist
##                          Case 2: User exists, remove option given
##                      2. Delete user 
##------------------------------------------------------------------------------

def user_reg():
    """Interactive menu to register a new user or delete an existing one.

    Option ``1`` asks for a name. If that name is already in the face
    database (or is the reserved name ``'unknown'``) the operator is offered
    deletion; otherwise the operator is asked to confirm and ``add_user`` is
    called. Option ``2`` lists the registered users and deletes the one
    typed in. Any other input is rejected.

    The entered name is also stored on ``user_reg.name``.

    Returns:
        A message string describing the outcome, suitable for Text2Speech.
    """
    db_path = './face_database/embeddings.pickle'

    # Strings are compared with ==; `is` tests identity, not equality.
    user = input('Do you want to: 1. Register new user; 2. Delete user : ')

    if user == '1':
        # Prompt user to enter name to search database
        name = input("Enter name: ")
        user_reg.name = name

        if os.path.exists(db_path):
            # NOTE: pickle must only be used on trusted, locally produced files.
            with open(db_path, 'rb') as database:
                db = pickle.load(database)
        else:
            # If the database does not exist yet, create an empty one so that
            # add_user() (which loads this file) can proceed.
            db = {}
            os.makedirs(os.path.dirname(db_path), exist_ok=True)
            with open(db_path, 'wb') as database:
                pickle.dump(db, database, protocol=pickle.HIGHEST_PROTOCOL)

        if name in db or name == 'unknown':
            # User already exists: offer to delete the stored data
            user = input('User has been previously registered, would you like to delete it? (y/n): ')

            if user == 'y':
                return delete_user(name)

            if user == 'n':
                print('Well good day then..')
                return str('Well good day')

            excuse = str('Please enter a valid response')
            print(excuse)
            return excuse

        user = input('Would you like to be registered? (y/n): ')
        if user == 'y':
            print('Name saved! Welcome to the system ' + name + '!')
            add_user(name)
            return str('Hello '+ user_reg.name + ', your biometric data has been saved.')

        if user == 'n':
            return str('What are you waiting for then? Good day.')

        response = str('Please enter a valid response!')
        print(response)
        return response

    if user == '2':
        check_list()
        print('Enter a user you would like to remove: ')
        text = input('')
        # delete_user is the deletion routine defined in this notebook.
        return delete_user(text)

    print('Please make a valid selection')
    return str('Please enter a valid response.')

# Start the registration/removal menu directly (no recognition gate in this cell).
registration = user_reg()

# print(registration)
# Speak the outcome message returned by the menu.
Text2Speech(registration)

# PPBAS Sequence Initiator
* Acts as a function caller: waits for a user to approach the device
* Once user is detected, initiates recognition sequence 
* If user is verified, unlocks door for 5 seconds 

In [None]:
# This code snippet demonstrates the overall authentication sequence of PPBAS

# Poll the serial line until the board reports 1 (user detected).
caller = 0
while caller != 1:
    time.sleep(0.2)
    ser.reset_input_buffer()
    b = ser.readline()
    string = b.decode(errors='ignore')

    try:
        caller = int(string)
    except ValueError:
        # Blank, partial or garbled line: ignore it and keep polling.
        continue

print("User detected! Initiating sequence...")

# Fall back to a failed result if recognize() does not return a
# (message, code) pair, so the cell never dies while unpacking.
response, code = recognize() or ('Recognition could not be completed', 0)
if code == 1:
    # Door lock actuated
    ser.write(b'1')

# display() is required here: the Audio object is not the last expression of
# the cell, so it would otherwise be discarded without being played.
display(Text2Speech(response))
print(response)
    
