In [None]:
# DEPENDENCIES
!pip install mtcnn
!pip install fastapi uvicorn
!pip install colabcode
#!pip install sklearn
#!pip install tensorflow
#!pip install numpy
#!pip install pandas
#!pip install tqdm

In [None]:
# Authorizing ngrok terminal
# Registers the ngrok auth token; the cell output reports that it is saved to
# ngrok's configuration file.
# NOTE(review): the real token appears to have been replaced by the `#auth_token`
# placeholder - supply your own token when running, and never commit it with the notebook.

!ngrok authtoken #auth_token

Authtoken saved to configuration file: /root/.ngrok2/ngrok.yml


In [None]:
# IMPORTING LIBs
import warnings
warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd
import os
import random
from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline
from tqdm import tqdm_notebook as tqdm
tqdm().pandas()

from PIL import Image
import matplotlib.pyplot as plt

from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import Normalizer
from sklearn.svm import SVC
from keras.models import load_model

from mtcnn.mtcnn import MTCNN

from sklearn.base import BaseEstimator, TransformerMixin

from fastapi import FastAPI, File, UploadFile
import uvicorn,pickle
from colabcode import ColabCode

0it [00:00, ?it/s]

In [None]:
# Processing steps performed by the transformer defined below:
# IMG -> IDENTIFY FACES -> NP_ARRAY -> FACE_EMBEDDING -> NP_ARRAY

class load_prep_data(BaseEstimator, TransformerMixin):
  """Turn a directory of labelled face images into face embeddings.

  ``transform`` expects one sub-folder per person (the folder name is used as
  the label), each holding that person's images. For every image the first
  face found by MTCNN is cropped, resized to 160x160 by default, standardised
  and passed through the pre-trained Keras FaceNet model loaded in ``__init__``.
  """

  def __init__(self):
    print('---> init called')
    # Pre-trained model by Hiroki Taniai
    self.model = load_model('/content/drive/MyDrive/FaceNet_API/Models/facenet_keras.h5')
    print('---> Loaded model')
    # Face detector, built lazily on first use and then reused for every image.
    self._detector = None

  def _get_detector(self):
    """Return the shared MTCNN detector, creating it on first use."""
    detector = getattr(self, '_detector', None)
    if detector is None:
      # Building MTCNN is expensive; doing it once instead of once per image
      # avoids re-creating the detection networks for every file.
      detector = MTCNN()
      self._detector = detector
    return detector

  # extract a single face from a given photograph
  # EXTRACT FACES
  def extract_face(self, filename, required_size=(160, 160)):
    """Return the first detected face of an image as a resized RGB array.

    Args:
      filename: path (or file object) accepted by ``PIL.Image.open``.
      required_size: (width, height) the returned crop is resized to.

    Returns:
      ``np.ndarray`` with the face crop resized to ``required_size``. When no
      usable face is detected the whole image is resized and returned instead,
      so the caller always receives an array.

    Raises:
      OSError: if the file cannot be opened or is not a readable image.
    """
    # The context manager closes the file handle once the pixels are decoded;
    # convert() returns an independent in-memory copy.
    with Image.open(filename) as raw_image:
      image = raw_image.convert('RGB')
    pixels = np.asarray(image)

    # detect faces using MTCNN
    results = self._get_detector().detect_faces(pixels)

    if results:
      # extract the bounding box from the first face
      x1, y1, width, height = results[0]['box']
      x2, y2 = x1 + width, y1 + height
      # The detector can report coordinates slightly outside the image.
      # Clip them to the image origin: taking abs() would move the box away
      # from the face instead of trimming the part that lies outside.
      x1, y1 = max(x1, 0), max(y1, 0)
      x2, y2 = max(x2, 0), max(y2, 0)

      face = pixels[y1:y2, x1:x2]
      if face.size:
        return np.asarray(Image.fromarray(face).resize(required_size))

    # Best-effort fallback: keep the sample by using the whole picture.
    print(f'No usable face detected, using the whole image: {filename}')
    return np.asarray(image.resize(required_size))

  # Make a list of faces extracted from each image of each folder
  def load_faces(self, directory):
    """Extract one face array from every readable image file in ``directory``.

    Sub-directories and files that cannot be read as images are skipped with a
    message rather than aborting the whole run.

    Returns:
      list of ``np.ndarray`` face crops, in sorted file-name order.
    """
    faces = list()

    for file_name in tqdm(sorted(os.listdir(directory))):
      # os.path.join works whether or not ``directory`` ends with a separator.
      path = os.path.join(directory, file_name)
      if not os.path.isfile(path):
        continue

      try:
        faces.append(self.extract_face(path))
      except OSError as err:
        # Raised by PIL for unreadable or non-image files.
        print(f'Skipping unreadable image {path}: {err}')
    return faces

  # Get embedding of face array using pre-trained model by Hiroki Taniai
  def get_embedding(self, face_pixels):
    """Return the model's embedding vector for a single face array.

    The pixels are standardised globally (across all channels) before being
    fed to ``self.model`` as a batch of one.
    """
    face_pixels = np.asarray(face_pixels).astype('float32')
    # standardize pixel values across channels (global)
    mean, std = face_pixels.mean(), face_pixels.std()
    if std == 0:
      # A uniform image has zero spread; dividing by it would yield NaNs.
      std = 1.0
    face_pixels = (face_pixels - mean) / std

    samples = np.expand_dims(face_pixels, axis=0)

    yhat = self.model.predict(samples)

    return yhat[0]

  # Driver code to call above functions in one go
  # extract_face --> get_embedding --> reshaping the image
  def preprocess_img(self, file_path):
    """Embed a single image file and display it with matplotlib.

    Returns:
      The embedding reshaped to a single row, ``(1, n_features)``.
    """
    inp = self.extract_face(file_path)  # EXTRACTS FACE FROM IMAGE
    inp_emb = self.get_embedding(inp)  # MAKES EMBEDDING OF THE IMAGE USING PRE-TRAINED MODEL
    img = plt.imread(file_path)
    plt.axis('off')
    plt.imshow(img)
    return inp_emb.reshape(1, -1)

  def fit(self, X, y=None):
    """No-op: nothing is learned from the data; returns ``self``."""
    print('---> fit called')
    return self

  ## Transforming data to face_embeddings and labels
  def transform(self, X, y=None):
    """Convert a dataset directory into embeddings and string labels.

    Args:
      X: path of a directory containing one sub-folder per person.
      y: ignored.

    Returns:
      ``(embeddings, labels)``: an array with one embedding per image and an
      array with the matching folder name for each of them.
    """
    directory = X
    print('----> transform called')
    images, labels = list(), list()

    # Processes each image of each folder
    for folder in tqdm(sorted(os.listdir(directory))):

      print(folder)
      # Test the entry name only: checking the full path would skip every
      # folder whenever the dataset root itself contains 'pdf'.
      if 'pdf' in folder:
        continue

      path = os.path.join(directory, folder)
      if not os.path.isdir(path):
        continue

      # detect face from image
      faces = self.load_faces(path)
      if not faces:
        continue

      print(f'EMPLOYEE : {folder}, Faces : {len(faces)}')

      # One label per extracted face keeps images and labels aligned.
      images.extend(faces)
      labels.extend([folder] * len(faces))

    x_train, y_train = np.asarray(images), np.asarray(labels)

    print('loading model for extracting face embedding')

    # get embeddings of image; every entry is embedded so that the result
    # stays aligned with the labels.
    trainX = np.asarray([self.get_embedding(pixels) for pixels in tqdm(x_train)])

    print('Train X :', trainX.shape)
    return trainX, y_train

In [None]:
# NORMALIZE X DATA & LABEL ENCODE Y DATA

class norm_enc(BaseEstimator, TransformerMixin):
  """L2-normalise embeddings and integer-encode their string labels.

  The input ``X`` of ``fit`` / ``transform`` is a pair
  ``(embeddings, labels)``. The label encoding is learned in ``fit`` and only
  applied in ``transform``, so ``fit_transform`` behaves as before while a
  plain ``transform`` on new data reuses the mapping learned earlier.
  """

  def __init__(self):
    self.norm = Normalizer(norm='l2')
    self.le = LabelEncoder()
    print('->->->-> Initialised normaliser and label encoder')

  def fit(self, X, y=None):
    """Learn the label -> integer mapping from ``X[1]``; returns ``self``."""
    print('---> fit called')
    self.le.fit(X[1])
    return self

  def transform(self, X, y=None):
    """Normalise ``X[0]`` and encode ``X[1]``.

    Returns:
      ``(normalised_embeddings, encoded_labels, label_encoder)``. The encoder
      is a snapshot of the fitted state, so it is not altered when this
      transformer is fitted again on other data.

    Raises:
      ValueError: (from ``LabelEncoder.transform``) if ``X[1]`` contains a
        label that was not present when the encoder was fitted.
    """
    print('---> transform called')
    trainX = self.norm.transform(X[0])
    print('->->-> transformed x data')
    if not hasattr(self.le, 'classes_'):
      # Backward compatibility: transform() used without a prior fit()
      # still learns the mapping from the data it is given.
      self.le.fit(X[1])
    trainY = self.le.transform(X[1])
    print('->-> encodede y data')
    return trainX, trainY, self._encoder_snapshot()

  def _encoder_snapshot(self):
    """Return an independent LabelEncoder carrying the current classes."""
    snapshot = LabelEncoder()
    snapshot.classes_ = self.le.classes_.copy()
    return snapshot

In [None]:
## API CODE BELOW

In [None]:
# FastAPI application. The description is rendered as HTML on the docs page, so
# the list must be closed; it describes the routes this app actually defines.
app = FastAPI(
    title='FaceNet model trainer',
    description=(
        "<ul>"
        "<li>Open <b>'/train_test_save'</b> to train, evaluate and save the model</li>"
        "<li>Training takes time, the request may time out before it finishes</li>"
        "<li>Open <b>'/show_results'</b> to see the accuracy and the number of classes</li>"
        "</ul>"
    )
    )



# Dataset roots: one sub-folder per person, folder name = label.
TRAIN_PATH = '/content/drive/MyDrive/FaceNet_API/ID_DATA/train/'
VAL_PATH = '/content/drive/MyDrive/FaceNet_API/ID_DATA/test/'

# Results of the last training run, filled in by '/train_test_save' and read by
# '/show_results'. Both stay None until a training run has completed.
accuracy = None
label_train = None

@app.get('/')
def index():
  # Landing route: returns static usage instructions for the other endpoints.
  usage_notes = dict([
      ('message', " Add '/train_test_save' to the end of link and press enter to start training the model."),
      ('NOTE', 'Once the model training is started after executing above code no result will be shown since the training takes time. Before the training is done ERR_NGROK_3004 will be thrown which is nothing to worry about.'),
      ('Check results', "To check the results of model replace 'show_results' in place of '/train_test_save' and press enter"),
  ])
  return usage_notes

@app.get('/train_test_save')
def model():
  """Train the face classifier, evaluate it and save it to disk.

  Runs the data pipeline on TRAIN_PATH and VAL_PATH, fits a linear SVC on the
  training embeddings, stores the validation accuracy and the training label
  encoder in the module-level ``accuracy`` / ``label_train``, and writes
  ``model.pkl`` and ``classes.npy`` to the Models folder.
  """
  import copy

  global accuracy
  global label_train

  print('EXECUTING DATA PIPELINE....')
  data_pipe = Pipeline(
      steps=[
            ('load',load_prep_data()),
            ('normalise & encode',norm_enc())
      ]
  )
  print('DATA PIPELINE INITIALISED')

  # TRAIN, VAL DATA USING PIPELINE

  x_train, y_train, train_encoder = data_pipe.fit_transform(TRAIN_PATH)
  # Keep an independent copy: the pipeline's encoder is fitted again on the
  # validation data below, which would otherwise replace the training classes
  # that are saved and reported afterwards.
  label_train = copy.deepcopy(train_encoder)
  print('--> TRAIN_DATA PIPED')
  x_val, y_val, label_test = data_pipe.fit_transform(VAL_PATH)
  print('--> TEST_DATA PIPED')
  # Recover the validation label names: integer ids produced for the validation
  # set are not guaranteed to match the ids used for training.
  val_names = label_test.inverse_transform(y_val)

  print('INTIALISING MODEL PIPELINE....')
  model_pipe = Pipeline(
      steps=[
            ('SVC model',SVC(kernel='linear', probability=True))
      ]
  )
  print('MODEL PIPELINE INITIALISED')
  print('-->TRAINING MODEL')
  model_pipe.fit(x_train,y_train)
  print('-->MODEL TRAINED')

  # MODEL ACCURACY
  # Compare by label name so that a different class set or ordering in the
  # validation folder cannot distort the score.
  predicted_names = label_train.inverse_transform(model_pipe.predict(x_val))
  accuracy = float(accuracy_score(val_names, predicted_names))
  print('-->model_accuracy:',accuracy)

  print('SAVING MODEL, LABELS.....')
  # The with-block guarantees the file is flushed and closed.
  with open('/content/drive/MyDrive/FaceNet_API/Models/model.pkl', 'wb') as model_file:
    pickle.dump(model_pipe, model_file)
  print('-->Model saved')
  np.save('/content/drive/MyDrive/FaceNet_API/Models/classes.npy', label_train.classes_)
  print('-->Label saved')

  return {
      'message': 'Model in progress'
  }

  

@app.get('/show_results')
def show():
  """Report the outcome of the last training run.

  Returns the stored accuracy, where the artefacts were saved and the number
  of trained classes; before any training has completed the class count is
  replaced by a status text and the accuracy is ``None``.
  """
  # Only reads the module-level results, so no `global` statement is needed.
  classes = getattr(label_train, 'classes_', None)
  if classes is None:
    n = "TRAIN THE MODEL TO SEE RESULTS"
  else:
    n = len(classes)

  return {
      'Accuracy': accuracy,
      'Models saved in folder': '*/FaceNet_API/Models/',
      # A dict cannot hold two 'Message' keys (the first was silently dropped),
      # so both notices are reported in a single entry.
      'Message': "Model saved as 'model.pkl', labels saved as 'classes.npy'",
      'Total classes/STATUS': n
  }

In [None]:

# Serve the FastAPI app on port 8000 through ColabCode; the cell output shows the
# public ngrok URL that forwards to http://localhost:8000.
# NOTE(review): code=False presumably skips starting the code-server IDE - confirm
# against the colabcode documentation.
server = ColabCode(port=8000, code=False)
# The recorded output shows the server running until it is shut down, so this
# call appears to block the cell while the API is being served.
server.run_app(app=app)

Public URL: NgrokTunnel: "https://3e5e-35-194-85-212.ngrok.io" -> "http://localhost:8000"


INFO:     Started server process [59]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


INFO:     2405:201:c008:226a:b954:f382:197:fb0:0 - "GET / HTTP/1.1" 200 OK
INFO:     2405:201:c008:226a:b954:f382:197:fb0:0 - "GET /favicon.ico HTTP/1.1" 404 Not Found
INFO:     2405:201:c008:226a:b954:f382:197:fb0:0 - "GET /show_results HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [59]
