# Initial

## Imports:

In [None]:
# Standard Library Imports
import os
import pickle
import shutil
import time
import asyncio

In [None]:
!pip install fastapi

In [None]:
# Third-party Library Imports
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import cv2
import numpy as np
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse, RedirectResponse
from multiprocessing import Pool
from tqdm import tqdm

In [None]:
pip install deepface

In [None]:
# Deep Learning Model Imports
from deepface import DeepFace as dpf
from deepface.commons import functions

In [None]:
pip install retinaface

In [None]:
# Computer Vision Model Imports
from retinaface import RetinaFace

In [None]:
pip install uvicorn

In [None]:
import uvicorn

In [None]:
# Visualization
import matplotlib.pyplot as plt

In [None]:
# import dependencies
from IPython.display import display, Javascript, Image
from google.colab.output import eval_js
from base64 import b64decode, b64encode
import cv2
import numpy as np
import PIL
import io
import html
import time

## Helper Functions
Below are a few helper functions that convert between different image data types and formats.

In [None]:
# Turn the data URL returned by the browser into an OpenCV image
def js_to_image(js_reply):
  """
  Decode a base64 data URL into an OpenCV image.

  Params:
          js_reply: data-URL string produced by the webcam JavaScript; the
                    base64 payload is whatever follows the first comma
  Returns:
          img: image decoded by cv2.imdecode in colour mode
  """
  # Everything after the first comma is the base64-encoded image.
  encoded_payload = js_reply.split(',')[1]
  # View the decoded bytes as a flat uint8 buffer, as cv2.imdecode expects.
  raw_buffer = np.frombuffer(b64decode(encoded_payload), dtype=np.uint8)

  return cv2.imdecode(raw_buffer, flags=cv2.IMREAD_COLOR)

# function to convert OpenCV Rectangle bounding box image into base64 byte string to be overlayed on video stream
def bbox_to_bytes(bbox_array):
  """
  Encode an RGBA pixel array as a PNG data URL.

  Params:
          bbox_array: Numpy array (pixels) containing rectangle to overlay on video stream.
  Returns:
        bytes: 'data:image/png;base64,...' string holding the PNG-encoded array
  """
  # `import PIL` alone does not guarantee that the PIL.Image submodule is
  # loaded, so import it explicitly. It is aliased because this notebook also
  # imports a different `Image` from IPython.display.
  from PIL import Image as PILImage

  # convert array into PIL image
  bbox_PIL = PILImage.fromarray(bbox_array, 'RGBA')
  iobuf = io.BytesIO()
  # format bbox into png for return
  bbox_PIL.save(iobuf, format='png')
  # format return string
  bbox_bytes = 'data:image/png;base64,{}'.format(b64encode(iobuf.getvalue()).decode('utf-8'))

  return bbox_bytes

## Haar Cascade Classifier
For this tutorial we will run a simple object detection algorithm called Haar Cascade on our images and video fetched from our webcam. OpenCV has a pre-trained Haar Cascade face detection model.

In [None]:
# initialize the Haar Cascade face detection model
# (loads OpenCV's bundled frontal-face cascade XML; NOTE(review): face_cascade is
# not referenced by any other cell shown in this notebook -- confirm it is still needed)
face_cascade = cv2.CascadeClassifier(cv2.samples.findFile(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'))

## API Object:

In [None]:
# Creating the fastapi object
# NOTE(review): no routes are registered on `app` in the cells shown here.
app = FastAPI()

## Constants

In [None]:
# Constants
# Thread count for the pool in FaceExtractor.video_to_faces_parallel.
FACE_MAX_WORKERS = 14
# Thread count for the pool in Deep.create_pkl_concurrent.
PKL_MAX_WORKERS = 16
# Folder (under DIRECTORY) that receives the captured webcam frames.
RECORDING_OUTPUT = "Frames"
# Folder that receives the cropped faces; also the database folder scanned by
# the Deep.create_pkl_* methods.
FACE_OUTPUT = "Data"
# Passed as `threshold` to RetinaFace.extract_faces.
FACE_THRESHOLD = 0.99
# NOTE(review): FPS and CAMERA_INDEX are not referenced by the code shown in
# this notebook -- presumably leftovers from a local-webcam version.
FPS = 30
CAMERA_INDEX = 0
# Working directory at the moment this cell runs; base path for the folders above.
DIRECTORY = os.getcwd()

## Functions

### Processing Image Function

In [None]:
def process_image(employee):
    """Return one ``[employee, embedding]`` pair per face found in an image.

    Faces are extracted with the "retinaface" detector backend (detection not
    enforced, alignment on) and each extracted face is embedded with the
    "VGG-Face" model through ``Deep.represent``, which is told to skip
    detection because the face has already been extracted.

    Args:
        employee: image reference handed to ``functions.extract_faces``; the
            same value is stored as the first item of every returned pair.

    Returns:
        A list of ``[employee, embedding]`` lists, one per extracted face.
    """
    model_name = "VGG-Face"
    # Options that are passed unchanged to both extraction and embedding.
    shared_options = {"enforce_detection": False, "align": True}

    extracted_faces = functions.extract_faces(
        img=employee,
        target_size=functions.find_target_size(model_name=model_name),
        detector_backend="retinaface",
        grayscale=False,
        **shared_options,
    )

    pairs = []
    for face_pixels, _, _ in extracted_faces:
        embedded = Deep.represent(
            img_path=face_pixels,
            model_name=model_name,
            detector_backend="skip",
            normalization="base",
            **shared_options,
        )
        pairs.append([employee, embedded[0]["embedding"]])
    return pairs

### Taking Photo Function

In [None]:
def take_photo(filename='photo.jpg', quality=0.8):
  """
  Grab a single webcam frame through the browser and save it to disk.

  The injected JavaScript builds a "Capture" button and a video element, but
  it does not wait for a click: it draws the current video frame onto a canvas
  as soon as playback has started, stops the stream and returns a JPEG data URL.

  Params:
          filename: path the decoded frame is written to with cv2.imwrite
          quality: value forwarded to canvas.toDataURL as the JPEG quality
  Returns:
          filename: the same path that was passed in
  """
  capture_script = Javascript('''
    async function takePhoto(quality) {
      const div = document.createElement('div');
      const capture = document.createElement('button');
      capture.textContent = 'Capture';
      div.appendChild(capture);

      const video = document.createElement('video');
      video.style.display = 'block';
      const stream = await navigator.mediaDevices.getUserMedia({video: true});

      document.body.appendChild(div);
      div.appendChild(video);
      video.srcObject = stream;
      await video.play();

      // Resize the output to fit the video element.
      google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

      const canvas = document.createElement('canvas');
      canvas.width = video.videoWidth;
      canvas.height = video.videoHeight;
      canvas.getContext('2d').drawImage(video, 0, 0);
      stream.getVideoTracks()[0].stop();
      div.remove();
      return canvas.toDataURL('image/jpeg', quality);
    }
    ''')
  display(capture_script)

  # Run the injected function in the browser and collect its data URL.
  data_url = eval_js(f'takePhoto({quality})')
  # Decode the data URL to an OpenCV image and write it straight to disk.
  cv2.imwrite(filename, js_to_image(data_url))

  return filename

In [None]:
class GoogleWebcam:
    """Capture a sequence of webcam frames in Colab and store them as JPEG files."""

    def generate_filename(self, count):
        """Return the file name for frame number ``count`` (zero-padded to two digits)."""
        return f'Frame {count:02d}.jpg'

    def record_video(self, frames, capture=None, output_dir=None):
        """Capture exactly ``frames`` photos, named ``Frame 00.jpg``, ``Frame 01.jpg``, ...

        The previous loop started its counter at -1 and only stopped once the
        counter exceeded ``frames``, so it captured ``frames + 2`` photos.

        Args:
            frames: number of photos to capture.
            capture: optional callable invoked with each target file path;
                defaults to ``take_photo``.
            output_dir: optional directory for the photos; defaults to
                ``DIRECTORY/RECORDING_OUTPUT``. Created if missing.
        """
        # Defaults are resolved at call time so existing callers are unaffected.
        if capture is None:
            capture = take_photo
        if output_dir is None:
            output_dir = os.path.join(DIRECTORY, RECORDING_OUTPUT)
        os.makedirs(output_dir, exist_ok=True)

        for count in range(frames):
            filename = os.path.join(output_dir, self.generate_filename(count))
            capture(filename)
            print(filename)

    def cam(self, filename):
        """Print where ``filename`` was saved and display it inline.

        Displaying is best-effort: any error is printed instead of raised.
        """
        try:
            print('Saved to {}'.format(filename))

            # Show the image which was just taken.
            display(Image(filename))
        except Exception as err:
            print(str(err))

### Extracting faces from the recorded frames

In [None]:
class FaceExtractor:
    """Crop the largest detected face out of every recorded frame."""

    def process_frame(self, filename):
        """Extract the largest face from one recorded frame and save it.

        Args:
            filename: name of a file inside ``DIRECTORY/RECORDING_OUTPUT``,
                expected to look like ``"Frame NN.jpg"``.

        Returns:
            The frame number as a string (the ``NN`` part) when a face was
            written, otherwise ``None``. Errors are printed, not raised,
            because this runs as a worker inside a thread pool.
        """
        try:
            # "Frame 07.jpg" -> "07"
            count = os.path.split(filename)[1].split(' ')[1].split('.')[0]
            frame_path = os.path.join(DIRECTORY, RECORDING_OUTPUT, filename)

            faces = RetinaFace.extract_faces(frame_path, threshold=FACE_THRESHOLD, model=None, allow_upscaling=True)

            if not faces:
                print(f"No faces extracted in Frame {count}.")
                return None

            # exist_ok avoids the check-then-create race between worker threads.
            output_dir = os.path.join(DIRECTORY, FACE_OUTPUT)
            os.makedirs(output_dir, exist_ok=True)

            user_id = 1

            # Keep only the face with the most array elements (first one on ties).
            largest_face = max(faces, key=lambda face: np.prod(face.shape))
            face_filename = f"user.{user_id}.{count}.jpg"
            result = os.path.join(output_dir, face_filename)

            # NOTE(review): RetinaFace.extract_faces may return RGB crops while
            # cv2.imwrite expects BGR -- confirm the channel order of saved faces.
            if not cv2.imwrite(result, largest_face):
                print(f"Failed to write {face_filename}.")
                return None
            print(f"{face_filename} written")

            return count

        except Exception as e:
            print(str(e))
            return None

    async def video_to_faces_parallel(self):
        """Run ``process_frame`` over every file in the recording folder using a thread pool.

        Changes the working directory to ``DIRECTORY`` and waits for all
        workers to finish before returning. Prints a message when the
        recording folder does not exist.
        """
        os.chdir(DIRECTORY)
        if not os.path.exists(RECORDING_OUTPUT):
            print("No photos folder found.")
            return

        files = os.listdir(os.path.join(DIRECTORY, RECORDING_OUTPUT))

        with concurrent.futures.ThreadPoolExecutor(max_workers=FACE_MAX_WORKERS) as executor:
            futures = [executor.submit(self.process_frame, filename) for filename in files]
            concurrent.futures.wait(futures)

### Cleaning Function

In [None]:
class Cleanup:
    """Housekeeping helper for the recorded-frames folder."""

    @staticmethod
    def clean():
        """Delete ``DIRECTORY/RECORDING_OUTPUT`` and everything in it.

        Best-effort: a failure is printed rather than raised.
        """
        frames_folder = os.path.join(DIRECTORY, RECORDING_OUTPUT)
        try:
            shutil.rmtree(frames_folder)
        except Exception as error:
            print(f'An error occurred: {error}')
        else:
            print(f'Directory "{frames_folder}" and its contents have been successfully deleted.')

### Pkl File Making Function

In [None]:
class Deep:
    """Build and pickle face embeddings for the images stored in a database folder.

    ``represent`` embeds a single image. The three ``create_pkl_*`` methods scan
    the database folder, embed every extracted face and pickle the resulting
    ``[image_path, embedding]`` pairs; they differ only in how the work is
    scheduled (process pool, sequential loop, thread pool).
    """

    # File-name endings treated as images when scanning the database folder.
    _IMAGE_SUFFIXES = (".jpg", ".jpeg", ".png")

    @staticmethod
    def represent(img_path, model_name="VGG-Face", enforce_detection=False, detector_backend="retinaface", align=True,
                  normalization="base"):
        """Compute embeddings for the face(s) in an image.

        Args:
            img_path: image path (str) or numpy array. With
                ``detector_backend="skip"`` the input is treated as an already
                extracted face and only resized to the model's target size.
            model_name: name passed to ``DeepFace.build_model``.
            enforce_detection: forwarded to ``functions.extract_faces``.
            detector_backend: detector name, or ``"skip"`` to bypass detection.
            align: forwarded to ``functions.extract_faces``.
            normalization: forwarded to ``functions.normalize_input``.

        Returns:
            A list of dicts with keys ``"embedding"`` (list) and ``"facial_area"``.

        Raises:
            ValueError: if detection is skipped and ``img_path`` is neither a
                string nor a numpy array.
        """
        resp_objs = []
        model = dpf.build_model(model_name)

        target_size = functions.find_target_size(model_name=model_name)
        if detector_backend != "skip":
            img_objs = functions.extract_faces(img=img_path, target_size=target_size, detector_backend=detector_backend,
                                               grayscale=False, enforce_detection=enforce_detection, align=align)
        else:
            if isinstance(img_path, str):
                img = functions.load_image(img_path)
            elif isinstance(img_path, np.ndarray):
                img = img_path.copy()
            else:
                raise ValueError(f"unexpected type for img_path - {type(img_path)}")

            # Drop a leading batch axis, then resize and re-add it so the
            # model always receives a 4-D input.
            if len(img.shape) == 4:
                img = img[0]
            if len(img.shape) == 3:
                img = cv2.resize(img, target_size)
                img = np.expand_dims(img, axis=0)

            # NOTE(review): img is 4-D at this point, so shape[1]/shape[0] are
            # not width/height -- kept as-is; confirm what callers expect here.
            img_region = [0, 0, img.shape[1], img.shape[0]]
            img_objs = [(img, img_region, 0)]

        for img, region, _ in img_objs:
            img = functions.normalize_input(img=img, normalization=normalization)

            # Only Keras models accept the `verbose` keyword.
            if "keras" in str(type(model)):
                embedding = model.predict(img, verbose=0)[0].tolist()
            else:
                embedding = model.predict(img)[0].tolist()

            resp_objs.append({"embedding": embedding, "facial_area": region})

        return resp_objs

    @classmethod
    def _prepare_database(cls, db_path, model_name):
        """Delete any stale pickle and list the image files under ``db_path``.

        Returns:
            ``(file_name, pkl, employees)``: the pickle's file name, its path,
            and the list of image paths found by walking ``db_path``.

        Raises:
            ValueError: if no image file is found.
        """
        file_name = f"representations_{model_name}.pkl".replace("-", "_").lower()
        pkl = f"{db_path}/{file_name}"

        # Always rebuild from scratch: drop a pickle left by a previous run.
        try:
            os.remove(pkl)
        except FileNotFoundError:
            pass

        employees = []
        for root, _, files in os.walk(db_path):
            for file in files:
                # Match on the real extension so names like "a.jpg.bak" are skipped.
                if file.lower().endswith(cls._IMAGE_SUFFIXES):
                    employees.append(os.path.join(root, file))

        if not employees:
            raise ValueError(
                f"There is no image in {db_path} folder! Validate .jpg or .png files exist in this path."
            )

        return file_name, pkl, employees

    @classmethod
    def _embed_faces(cls, employee, model_name, target_size, detector_backend, enforce_detection, align,
                     normalization):
        """Return one ``[employee, embedding]`` pair per face extracted from ``employee``."""
        img_objs = functions.extract_faces(
            img=employee,
            target_size=target_size,
            detector_backend=detector_backend,
            grayscale=False,
            enforce_detection=enforce_detection,
            align=align,
        )

        instances = []
        for img_content, _, _ in img_objs:
            # The face is already extracted, so detection is skipped here.
            embedding_obj = cls.represent(
                img_path=img_content,
                model_name=model_name,
                enforce_detection=enforce_detection,
                detector_backend="skip",
                align=align,
                normalization=normalization,
            )
            instances.append([employee, embedding_obj[0]["embedding"]])
        return instances

    def create_pkl_multiprocessing(self, db_path=None):
        """Build the embeddings pickle with a pool of worker processes.

        Each image is handled by the module-level ``process_image`` function.

        Args:
            db_path: folder to scan and write the pickle into; defaults to
                ``FACE_OUTPUT``.

        Raises:
            ValueError: if the folder contains no image file.
        """
        tik = time.time()

        db_path = FACE_OUTPUT if db_path is None else db_path
        model_name = "VGG-Face"
        silent = False
        max_workers = 6

        file_name, pkl, employees = self._prepare_database(db_path, model_name)

        # imap yields results in input order, so the pickle order is stable.
        with Pool(max_workers) as pool:
            per_image = list(tqdm(pool.imap(process_image, employees), total=len(employees),
                                  desc="Finding representations", disable=silent))
        representations = [item for sublist in per_image for item in sublist]

        with open(pkl, "wb") as f:
            pickle.dump(representations, f)

        if not silent:
            print(f"Representations stored in {db_path}/{file_name}. Please delete this file when you add new identities in your database.")

        tok = time.time()

        print("Multiprocessing: ", str(round(tok - tik)))

    def create_pkl_nonopti(self, db_path=None):
        """Build the embeddings pickle sequentially, one image after another.

        Args:
            db_path: folder to scan and write the pickle into; defaults to
                ``FACE_OUTPUT``.

        Raises:
            ValueError: if the folder contains no image file.
        """
        tik = time.time()

        db_path = FACE_OUTPUT if db_path is None else db_path
        model_name = "VGG-Face"
        enforce_detection = False
        detector_backend = "retinaface"
        align = True
        normalization = "base"
        silent = False

        file_name, pkl, employees = self._prepare_database(db_path, model_name)
        target_size = functions.find_target_size(model_name=model_name)

        representations = []
        for employee in tqdm(employees, desc="Finding representations", disable=silent):
            representations.extend(
                self._embed_faces(
                    employee,
                    model_name=model_name,
                    target_size=target_size,
                    detector_backend=detector_backend,
                    enforce_detection=enforce_detection,
                    align=align,
                    normalization=normalization,
                )
            )

        with open(pkl, "wb") as f:
            pickle.dump(representations, f)

        if not silent:
            print(
                f"Representations stored in {db_path}/{file_name} file."
                + "Please delete this file when you add new identities in your database."
            )

        tok = time.time()

        print("Non Optimized: ", str(round(tok - tik)))

    def create_pkl_concurrent(self, db_path=None):
        """Build the embeddings pickle with a pool of ``PKL_MAX_WORKERS`` threads.

        Workers return their results instead of appending to a shared list, so
        the pickle keeps the order of the scanned images and no module-level
        counter is written.

        Args:
            db_path: folder to scan and write the pickle into; defaults to
                ``FACE_OUTPUT``.

        Raises:
            ValueError: if the folder contains no image file.
        """
        tik = time.time()

        db_path = FACE_OUTPUT if db_path is None else db_path
        model_name = "VGG-Face"
        enforce_detection = False
        detector_backend = "retinaface"
        align = True
        normalization = "base"
        silent = False

        file_name, pkl, employees = self._prepare_database(db_path, model_name)
        print(file_name)
        print(pkl)

        target_size = functions.find_target_size(model_name=model_name)

        def process_employee(employee):
            return self._embed_faces(
                employee,
                model_name=model_name,
                target_size=target_size,
                detector_backend=detector_backend,
                enforce_detection=enforce_detection,
                align=align,
                normalization=normalization,
            )

        representations = []
        # The bar advances once per image and is closed even if a worker raises.
        with tqdm(total=len(employees), desc="Processing", unit=" items") as progress_bar, \
                ThreadPoolExecutor(max_workers=PKL_MAX_WORKERS) as executor:
            for instances in executor.map(process_employee, employees):
                representations.extend(instances)
                progress_bar.update(1)

        with open(pkl, "wb") as f:
            pickle.dump(representations, f)

        if not silent:
            print(
                f"Representations stored in {db_path}/{file_name} file."
                + "Please delete this file when you add new identities in your database."
            )

        tok = time.time()

        print("Concurrent Futures: ", str(round(tok - tik)))

# Main Play Area

In [None]:
# Capture webcam frames into the RECORDING_OUTPUT folder (one take_photo call per frame).
webcam = GoogleWebcam()
webcam.record_video(frames=100)

In [None]:
# Crop the largest face out of every recorded frame and save it under FACE_OUTPUT.
# The top-level `await` relies on the notebook's support for awaiting in a cell;
# it is not valid in a plain Python script.
face_extractor = FaceExtractor()
await face_extractor.video_to_faces_parallel()

In [None]:
# cle = Cleanup()
# cle.clean()

In [None]:
# Rebuild the embeddings pickle for the images in FACE_OUTPUT using the thread-pool variant.
dp = Deep()
dp.create_pkl_concurrent()