# Realtime model testing - webcam

#### Parameters

In [1]:
image_size = (64,64)

#### Dependencies

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!pip install retina-face

In [4]:
from IPython.display import display, Javascript, Image
from google.colab.output import eval_js
from base64 import b64decode, b64encode
import cv2
import numpy as np
import PIL
import io
import html
import time
from matplotlib import pyplot as plt
from retinaface import RetinaFace

#### Javascript functionalities

In [5]:
def js_to_image(js_reply):
  image_bytes = b64decode(js_reply.split(',')[1])
  jpg_as_np = np.frombuffer(image_bytes, dtype=np.uint8)
  img = cv2.imdecode(jpg_as_np, flags=1)
  return img

def bbox_to_bytes(bbox_array):
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  iobuf = io.BytesIO()
  bbox_PIL.save(iobuf, format='png')
  bbox_bytes = 'data:image/png;base64,{}'.format((str(b64encode(iobuf.getvalue()), 'utf-8')))
  return bbox_bytes

In [6]:
face_cascade = cv2.CascadeClassifier(cv2.samples.findFile(
                            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'))

In [7]:
def take_photo(filename='photo.jpg', quality=0.8):
  js = Javascript('''
    async function takePhoto(quality) {
      const div = document.createElement('div');
      const capture = document.createElement('button');
      capture.textContent = 'Capture';
      div.appendChild(capture);

      const video = document.createElement('video');
      video.style.display = 'block';
      const stream = await navigator.mediaDevices.getUserMedia({video: true});

      document.body.appendChild(div);
      div.appendChild(video);
      video.srcObject = stream;
      await video.play();

      google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);
      await new Promise((resolve) => capture.onclick = resolve);

      const canvas = document.createElement('canvas');
      canvas.width = video.videoWidth;
      canvas.height = video.videoHeight;
      canvas.getContext('2d').drawImage(video, 0, 0);
      stream.getVideoTracks()[0].stop();
      div.remove();
      return canvas.toDataURL('image/jpeg', quality);
    }
    ''')
  display(js)

  data = eval_js('takePhoto({})'.format(quality))
  img = js_to_image(data)
  print(img)
  gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
  print(gray.shape)
  faces = face_cascade.detectMultiScale(gray)
  for (x,y,w,h) in faces:
      img = cv2.rectangle(img,(x,y),(x+w,y+h),(255,0,0),2)
  cv2.imwrite(filename, img)

  return filename

In [18]:
try:
  filename = take_photo('photo.jpg')
  print('Saved to {}'.format(filename))
  display(Image(filename))
except Exception as err:
  print(str(err))

<IPython.core.display.Javascript object>

CustomError: Timed out waiting for iframe configuration. URL: https://colab.research.google.com/drive/1269vR1wQx0W_j0e6njVHRT6kTLFXdtq_#scrollTo=ilLkpcKanPRb


In [8]:
def video_stream():
  js = Javascript('''
    var video;
    var div = null;
    var stream;
    var captureCanvas;
    var imgElement;
    var labelElement;

    var pendingResolve = null;
    var shutdown = false;

    function removeDom() {
       stream.getVideoTracks()[0].stop();
       video.remove();
       div.remove();
       video = null;
       div = null;
       stream = null;
       imgElement = null;
       captureCanvas = null;
       labelElement = null;
    }

    function onAnimationFrame() {
      if (!shutdown) {
        window.requestAnimationFrame(onAnimationFrame);
      }
      if (pendingResolve) {
        var result = "";
        if (!shutdown) {
          captureCanvas.getContext('2d').drawImage(video, 0, 0, 640, 480);
          result = captureCanvas.toDataURL('image/jpeg', 0.8)
        }
        var lp = pendingResolve;
        pendingResolve = null;
        lp(result);
      }
    }

    async function createDom() {
      if (div !== null) {
        return stream;
      }

      div = document.createElement('div');
      div.style.border = '2px solid black';
      div.style.padding = '3px';
      div.style.width = '100%';
      div.style.maxWidth = '600px';
      document.body.appendChild(div);

      const modelOut = document.createElement('div');
      modelOut.innerHTML = "<span>Status:</span>";
      labelElement = document.createElement('span');
      labelElement.innerText = 'No data';
      labelElement.style.fontWeight = 'bold';
      modelOut.appendChild(labelElement);
      div.appendChild(modelOut);

      video = document.createElement('video');
      video.style.display = 'block';
      video.width = div.clientWidth - 6;
      video.setAttribute('playsinline', '');
      video.onclick = () => { shutdown = true; };
      stream = await navigator.mediaDevices.getUserMedia(
          {video: { facingMode: "environment"}});
      div.appendChild(video);

      imgElement = document.createElement('img');
      imgElement.style.position = 'absolute';
      imgElement.style.zIndex = 1;
      imgElement.onclick = () => { shutdown = true; };
      div.appendChild(imgElement);

      const instruction = document.createElement('div');
      instruction.innerHTML =
          '<span style="color: red; font-weight: bold;">' +
          'When finished, click here or on the video to stop this demo</span>';
      div.appendChild(instruction);
      instruction.onclick = () => { shutdown = true; };

      video.srcObject = stream;
      await video.play();

      captureCanvas = document.createElement('canvas');
      captureCanvas.width = 640; //video.videoWidth;
      captureCanvas.height = 480; //video.videoHeight;
      window.requestAnimationFrame(onAnimationFrame);

      return stream;
    }
    async function stream_frame(label, imgData) {
      if (shutdown) {
        removeDom();
        shutdown = false;
        return '';
      }

      var preCreate = Date.now();
      stream = await createDom();

      var preShow = Date.now();
      if (label != "") {
        labelElement.innerHTML = label;
      }

      if (imgData != "") {
        var videoRect = video.getClientRects()[0];
        imgElement.style.top = videoRect.top + "px";
        imgElement.style.left = videoRect.left + "px";
        imgElement.style.width = videoRect.width + "px";
        imgElement.style.height = videoRect.height + "px";
        imgElement.src = imgData;
      }

      var preCapture = Date.now();
      var result = await new Promise(function(resolve, reject) {
        pendingResolve = resolve;
      });
      shutdown = false;

      return {'create': preShow - preCreate,
              'show': preCapture - preShow,
              'capture': Date.now() - preCapture,
              'img': result};
    }
    ''')

  display(js)

def video_frame(label, bbox):
  data = eval_js('stream_frame("{}", "{}")'.format(label, bbox))
  return data

In [9]:
video_stream()

<IPython.core.display.Javascript object>

### Model

#### Data import

In [10]:

# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES
# TO THE CORRECT LOCATION (/kaggle/input) IN YOUR NOTEBOOK,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.

import os
import sys
from tempfile import NamedTemporaryFile
from urllib.request import urlopen
from urllib.parse import unquote, urlparse
from urllib.error import HTTPError
from zipfile import ZipFile
import tarfile
import shutil

CHUNK_SIZE = 40960
DATA_SOURCE_MAPPING = 'lfw-augmented:https%3A%2F%2Fstorage.googleapis.com%2Fkaggle-data-sets%2F4885396%2F8236561%2Fbundle%2Farchive.zip%3FX-Goog-Algorithm%3DGOOG4-RSA-SHA256%26X-Goog-Credential%3Dgcp-kaggle-com%2540kaggle-161607.iam.gserviceaccount.com%252F20240427%252Fauto%252Fstorage%252Fgoog4_request%26X-Goog-Date%3D20240427T072302Z%26X-Goog-Expires%3D259200%26X-Goog-SignedHeaders%3Dhost%26X-Goog-Signature%3D8bf77f5823e09b4d4260f17cf34d3d1f53159476a699e8be82b64924e8d380e8e8b1f1bd3387950a8c1be809d258dbe9f4f72aa5fb76ebc2f1b9cda12be6efc1ff7d421f9d6d5781ef09c2422840922d73dba0818e452bd51e8a02777e73efbfce432797ace7a8e18bff9e1d3fe79467af39d2b265739b3f1f950707b2d1959fb027129d2908c50506a9fcef0a6bfa6ac6d8541f2c53b996e279818821d6ad8da5715bd48a5f0267422b464dce48910aa7b702632e8951f9fe3bc47045dbcb5bd5d4109dcfdda3b78137b0e7a478a81db0eac5a2d2419fa7b6213a56ca2bbaf0076900aeaa41847aad626b9b9dcabdea3446be5a774e6a07499a4a28c5fdce83,custom-dataset:https%3A%2F%2Fstorage.googleapis.com%2Fkaggle-data-sets%2F4887194%2F8238933%2Fbundle%2Farchive.zip%3FX-Goog-Algorithm%3DGOOG4-RSA-SHA256%26X-Goog-Credential%3Dgcp-kaggle-com%2540kaggle-161607.iam.gserviceaccount.com%252F20240427%252Fauto%252Fstorage%252Fgoog4_request%26X-Goog-Date%3D20240427T072302Z%26X-Goog-Expires%3D259200%26X-Goog-SignedHeaders%3Dhost%26X-Goog-Signature%3D4c07542eef7b5e78317deeed74400784e599172887141ca31abf02b7bc3362db77ec7c26ebd436747248ebf6b36b344c012fcaed6b17b1a79a60aa566ec3b66006fee5e463f84736a0d8b4f613109e9ee29c84ad01923981f10134646821ed44db25208a7968ebaf05315e233eb9b4885214e94e5d31cc1943dc2a77d2b5c8e28f00dc89ef32c30805d1fe3a4bd1e63e4cfd7bb829f9589348f862880bf94ad25b3cdaa4ec44b5c25da19aab241142a7d9606da5d0a08b89814627b5c0833f3546eb1e87c1c77b4d4a7ef6ec09226235e7cc68bec6c26185cf01c41cc589529383c3432d6b54eca5300ca1afec1813459d97f9ace0f1312548f2985be0815145'

KAGGLE_INPUT_PATH='/kaggle/input'
KAGGLE_WORKING_PATH='/kaggle/working'
KAGGLE_SYMLINK='kaggle'

!umount /kaggle/input/ 2> /dev/null
shutil.rmtree('/kaggle/input', ignore_errors=True)
os.makedirs(KAGGLE_INPUT_PATH, 0o777, exist_ok=True)
os.makedirs(KAGGLE_WORKING_PATH, 0o777, exist_ok=True)

try:
  os.symlink(KAGGLE_INPUT_PATH, os.path.join("..", 'input'), target_is_directory=True)
except FileExistsError:
  pass
try:
  os.symlink(KAGGLE_WORKING_PATH, os.path.join("..", 'working'), target_is_directory=True)
except FileExistsError:
  pass

for data_source_mapping in DATA_SOURCE_MAPPING.split(','):
    directory, download_url_encoded = data_source_mapping.split(':')
    download_url = unquote(download_url_encoded)
    filename = urlparse(download_url).path
    destination_path = os.path.join(KAGGLE_INPUT_PATH, directory)
    try:
        with urlopen(download_url) as fileres, NamedTemporaryFile() as tfile:
            total_length = fileres.headers['content-length']
            print(f'Downloading {directory}, {total_length} bytes compressed')
            dl = 0
            data = fileres.read(CHUNK_SIZE)
            while len(data) > 0:
                dl += len(data)
                tfile.write(data)
                done = int(50 * dl / int(total_length))
                sys.stdout.write(f"\r[{'=' * done}{' ' * (50-done)}] {dl} bytes downloaded")
                sys.stdout.flush()
                data = fileres.read(CHUNK_SIZE)
            if filename.endswith('.zip'):
              with ZipFile(tfile) as zfile:
                zfile.extractall(destination_path)
            else:
              with tarfile.open(tfile.name) as tarfile:
                tarfile.extractall(destination_path)
            print(f'\nDownloaded and uncompressed: {directory}')
    except HTTPError as e:
        print(f'Failed to load (likely expired) {download_url} to path {destination_path}')
        continue
    except OSError as e:
        print(f'Failed to load {download_url} to path {destination_path}')
        continue

print('Data source import complete.')


Downloading lfw-augmented, 54492206 bytes compressed
Downloaded and uncompressed: lfw-augmented
Downloading custom-dataset, 60719041 bytes compressed
Downloaded and uncompressed: custom-dataset
Data source import complete.


In [11]:
from PIL import Image, ImageEnhance
import numpy as np
import pandas as pd
from PIL import ImageFilter
from matplotlib import pyplot as plt
import colorsys
import cv2
import os
from skimage.filters import gabor, gaussian
from IPython.display import display
from matplotlib.pyplot import imshow
from pywt import dwt2
import pickle
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score, precision_score, recall_score
from sklearn.cluster import KMeans
import random

%matplotlib inline

In [12]:
import warnings
warnings.filterwarnings('ignore')

In [13]:
rootLFW="/kaggle/input/lfw-augmented/Augmented_LFW_final/Augmented_LFW_Final/"
rootCD ="/kaggle/input/custom-dataset/Custom_Dataset/"

#### model training

In [14]:
def create_gabor_filter_bank(ksize, sigma, theta, lambd, gamma):
    filters = []
    for t in theta:
        kernel = cv2.getGaborKernel((ksize, ksize), sigma,
                                    np.deg2rad(t), lambd, gamma, 0,
                                    ktype=cv2.CV_32F)
        filters.append(kernel)
    return filters

ksize = 9                       # Size of the filter
sigma = 2.0                     # Standard deviation of the Gaussian envelope
theta = [0,30,60,90,120,150]    # Orientation of the Gabor filter
lambd = 5.0                     # Wavelength of the sinusoidal factor
gamma = 0.5                     # Spatial aspect ratio

filter_bank = create_gabor_filter_bank(ksize, sigma, theta, lambd, gamma)

def apply_gabor_filters(image, filters):
    responses = []
    for kernel in filters:
        filtered_image = cv2.filter2D(image, cv2.CV_8UC3, kernel)
        responses.append(filtered_image)
    return responses

def extract_features(image, filter_bank):
    responses = apply_gabor_filters(image, filter_bank)
    features = np.concatenate(responses, axis=1)
    return features.flatten()

X = []
Y = []
images = []
for person in os.listdir(rootCD):
    for file in os.listdir(rootCD+person):
        path = rootCD + person + '/' + file
        im = cv2.resize(cv2.imread(path,cv2.IMREAD_GRAYSCALE),image_size)
        X.append(extract_features(im,filter_bank))
        images.append(im)
        Y.append(person.replace('_',' '))
X = np.array(X)
Y = np.array(Y)

n = 2048
scaler = StandardScaler()
standardized_data = scaler.fit_transform(X)
pca = PCA(n_components=n)
pca.fit(standardized_data)
X = pca.transform(standardized_data)

X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.3,
                                                          random_state=42)

k = 5                                                 # Number of neighbors
knn_classifier = KNeighborsClassifier(n_neighbors=k)
knn_classifier.fit(X_train, Y_train)

#### Final pipeline

---
input : 250x250 rgb image; output : person name


In [15]:
def predict(im):
    im = cv2.cvtColor(cv2.resize(im,image_size),cv2.COLOR_BGR2GRAY)
    X = scaler.transform([extract_features(im,filter_bank)])
    X = pca.transform(X)
    return knn_classifier.predict(X)[0]

### webcam testing

In [16]:
video_stream()
label_html = 'Capturing...'
bbox = ''
count = 0
while True:
    js_reply = video_frame(label_html, bbox)
    if not js_reply:
        break

    img = js_to_image(js_reply["img"])

    face = RetinaFace.extract_faces(img,align=True)
    if len(face) == 0:
        print('No face')
    else:
        face = face[0]
        image_rgb = cv2.cvtColor(face, cv2.COLOR_BGR2RGB)
        image_rgb = cv2.resize(image_rgb, (250,250), interpolation = cv2.INTER_LINEAR)
        print(predict(image_rgb))

<IPython.core.display.Javascript object>

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/IPython/core/interactiveshell.py", line 3553, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-16-9817d2c7c194>", line 6, in <cell line: 5>
    js_reply = video_frame(label_html, bbox)
  File "<ipython-input-8-0d03334cd2c1>", line 133, in video_frame
    data = eval_js('stream_frame("{}", "{}")'.format(label, bbox))
  File "/usr/local/lib/python3.10/dist-packages/google/colab/output/_js.py", line 40, in eval_js
    return _message.read_reply_from_input(request_id, timeout_sec)
  File "/usr/local/lib/python3.10/dist-packages/google/colab/_message.py", line 96, in read_reply_from_input
    time.sleep(0.025)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/IPython/core/interactiveshell.py", line 2099, in showtraceback
    stb = value._render_traceb

TypeError: object of type 'NoneType' has no len()