# Code Stack for Back-End / Webcam integration

## Contents

1. Forming a 'hypermodel' comprising our best models using ensemble methods

2. Integration with live video streaming using OpenCV


In [None]:
# Import needed modules

# Basic packages needed for data analysis, visualization and manipulation
import numpy as np
import pandas as pd 
import matplotlib.pyplot as plt
import seaborn as sns
import os

# Mainly Tensorflow packages for data preprocessing
from PIL import Image
import tensorflow as tf
from tensorflow.keras.preprocessing.image import img_to_array, load_img, ImageDataGenerator
from tensorflow.keras.utils import to_categorical 

# Mainly Tensorflow.keras layers and pre-trained Convolutional Neural Network (CNN) models needed to do Transfer Learning
from tensorflow.keras.applications import VGG19
from tensorflow.keras.layers import Dense, Conv2D, MaxPool2D, Dropout, Flatten, BatchNormalization, Input, GlobalAveragePooling2D
from tensorflow.keras import Sequential, regularizers, Model

# Mainly Tensorflow modules that help to optimize and fine-tune the CNN models better
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam 
from sklearn.model_selection import KFold, train_test_split 
from math import ceil

# Mainly metrics to assess the CNN's performance
from sklearn.metrics import accuracy_score, confusion_matrix, recall_score, precision_score, f1_score

# Mainly packages for webcam integration
import re
import cv2
import imutils
from imutils.video import VideoStream
import time

## 1. Using ensemble methods on best models

We'll gather the best models and form a soft voting classifier 

#### 1.1 Function for model generation

Each model to be included in the 'Hypermodel' has its own constructor

In [None]:
# 1.1
# Returns the model and the output layer's activation function
# Different models by the team use different output formats

def model_1():
  "Returns __'s CNN model and its activation function as inferred by the crossentropy used ('Binary', 'Categorical')"

  model = <code for constructing model>
  # Model's code using Keras

  output_activation = <type of crossentropy used>
  
  return (model, output_activation)

#### 1.2 Class for 'Hypermodel'

The Class for the 'hypermodel' to be generated in the following function

Documentation for the `Hypermodel` class
___

### Hypermodel

`Hypermodel` is an ensemble model encompassing the best models the team have so far, using a soft, average voting classifier

```python
Hypermodel(
    name=None
)
```

#### Examples:

```python
model_1 = model_1() # Following the functions mentioned earlier, is a tuple
model_2 = model_2() # Following the functions mentioned earlier, is a tuple

ensemble_model = Hypermodel("test") # Instantiate `Hypermodel`

# Add models
ensemble_model.add_tuple(model_1) 
ensemble_model.add_tuple(model_2)

# Prepare Iterator
datagen = ImageDataGenerator(rescale=1./255)
X = datagen.flow_from_directory("<path>", target_size=(224, 224), batch_size=32, class_mode='binary')

# Predict
results = ensemble_model.predict(X)
```

#### Arguments: 

| Parameter | Description |
| :--- | :--- |
| __Name__ | Optional name for the Hypermodel |

#### Attributes:

| Attribute | Description |
| :--- | :--- |
| **collections** | A list containing all the models we want to include |
| **mo_collections** | A list containing match objects for the maintenance of Hypermodel's string representation |
| **model_types** | A list containing the output type of the models in the `collections` attribute |
| **name** | The name given to the Hypermodel | 
| **no_of_models** | The number of models included so far |
| **removal_match** | The Regular Expression object used for maintenance of Hypermodel's string representation |
| **str_rep** | An essential part in Hypermodel's string representation |

#### Methods: 

___

__*add*__

```python
add(
    *models
)
```

Adds model instances produced by model generation functions mentioned earlier to the `Hypermodel`

__Arguments:__

| Parameter | Description |
| :--- | :--- |
| __\*models__ | Any number of model instances |

___

**_add_tuple_**

```python
add_tuple(
    *args
)
```

Adds model instances and its corresponding model output type generated by the model generation functions mentioned earlier to the `Hypermodel`

__Arguments:__

| Parameter | Description |
| :--- | :--- |
| __\*args__ | Any number of tuples containing the `(<model instance>, <model output type>)` |

__Raises:__

| Error | Reason |
| :--- | :--- |
| __TypeError__ | If the individual elements of `*args` are not tuples |

___

**_drop_**

```python
drop(
    ind=None
)
```

Removes either the most recently added model instance and its corresponding model output type or the model instance and corresponding output type specified by the `ind` parameter as the index in the `collections` and `model_types` attributes

__Arguments:__

| Parameter | Description |
| :--- | :--- |
| __ind__ | If `ind` is `None`, the latest model instance and type is removed. If `ind` is a specified integer, the model instance and output type at the index position `ind` specifies would be removed |

__Raises:__

| Error | Reason |
| :--- | :--- |
| __OutOfBoundError__ | If the number specified by `ind` is out of range |

___

**_name_**

```python
name(
    name
)
```

Names or renames the `Hypermodel`

__Arguments:__

| Parameter | Description |
| :--- | :--- |
| __name__ | Name for the `Hypermodel`, which automatically be converted into a string if it isn't one |

___

**_predict_**

```python
predict(
    X,
    steps=None,
    verbose=0,
    print_out=False
)
```

Predicts the result of the dataset specified via `X` parameter in a similar way to Keras's Sequential object predict() method  
Gathers and averages out the predictions for all the models included

__Arguments:__

| Parameter | Description |
| :--- | :--- |
| __X__ | Either an Iterator yielding `numpy` arrays or a collection of `numpy` arrays which are the images to feed to the models |
| __steps__ | Only used when `X` is an iterator to indicate the number of batches to predict over and should be left as `None` if a collection is used in `X`. If `X` is an iterator but `steps` is left as `None`, an exhaustive number of batches would be used as derived from the iterator |
| __verbose__ | If set to `0`, it only returns the Boolean values of each prediction. For any other number, both the Boolean and real floating point values are included |
| **print_out** | If set to `True`, the results would be printed out, otherwise, it defaults to `False` and only returns but does not print the results to `stdout` |

__Raises:__ 

| Error | Reason |
| :--- | :--- |
| __ArgumentError__ | If `X` is a collection but `steps` is not `None` |
| __TypeError__ | If `X` is neither a collection (`np.array`, `list`, `tuple`) or iterator (`Iterator`, `DirectoryIterator`) |

__Returns:__

| Variable | Description |
| :--- | :--- |
| __results__ | A `np.array` of the predictions |


In [None]:
# 1.2
# Class for 'hypermodel'

class Hypermodel:
  "Hypermodel acts as an ensemble classifier for different CNN models"

  def __init__(self, name = None):
    self.collections = []
    self.mo_collections = [] # 'mo' stands for match object from the 're' library
    self.model_types = []
    if name is None: 
      self.name = name
    else:
      self.name = str(name)
    self.no_of_models = 0
    self.removal_match = re.compile('\n\n\t>>> Model.*[0-9]+.*<')
    self.str_rep = f"Hypermodel[{self.no_of_models}]: "
  
  def __str__(self): 
    if self.name is None: 
      return "|==>>> " + self.str_rep + "\n\n\t|^^^^^"
    else: 
      return "|==>>> " + self.name + " " + self.str_rep + "\n\n\t|^^^^^"
  
  def __repr__(self): 
    if self.name is None: 
      return "|==>>> " + self.str_rep + "\n\n\t|^^^^^"
    else: 
      return "|==>>> " + self.name + " " + self.str_rep + "\n\n\t|^^^^^"
  
  def name(self, name):
    "Names the Hypermodel"
    self.name = str(name)
  
  def _update(self, past_no):
    "Helper function to update string representation"
    self.str_rep = self.str_rep[:11] + str(self.no_of_models) + self.str_rep[11 + past_no:]
    # Adjusts the first part of the str representation that states the number of models included currently
    # 11 is because the number of char there are till '[' in "Hypermodel[<no_of_models>]..." is 11
    temp_iter = self.removal_match.finditer(self.str_rep)
    self.mo_collections = [] # The collection of match objects is recompiled every _update call
    for _ in range(self.no_of_models):
      self.mo_collections.append(temp_iter.__next__()) # The purpose of storing match objects is for the drop method

  def add(self, *models):
    "Adds model instances into Hypermodel"
    temp = len(str(self.no_of_models)) # Keep track of the number of char the old number of models had for _update()
    for model in models:
      self.collections.append(model)
      self.model_types.append(None) # Unknown model output type
      self.no_of_models += 1
      self.str_rep += "\n\n\t>>> Model (" + str(self.no_of_models) + ") <"
    
    self._update(temp) # Update str representation

  def add_tuple(self, *args): 
    "Adds model instances and output type into Hypermodel"
    temp = len(str(self.no_of_models)) # Keep track of the number of char the old number of models had for _update()
    for arg in args:
      try: 
        assert isinstance(arg, tuple)
      except:
        raise TypeError
      self.collections.append(arg[0])
      self.model_types.append(arg[1])
      self.no_of_models += 1
      self.str_rep += "\n\n\t>>> Model (" + str(self.no_of_models) + ") <== " + str(arg[1]) + " <"
    
    self._update(temp) # Update str representation
    
  def drop(self, ind=None):
    "Removes model instance from Hypermodel"
    temp = len(str(self.no_of_models)) # Keep track of the number of char the old number of models had for _update()
    if self.no_of_models != 0 and ind is None: # Remove latest model instance
      self.collections.pop()
      self.model_types.pop()
      self.no_of_models -= 1
      self.str_rep = self.str_rep[:self.mo_collections[-1].start()] + self.str_rep[self.mo_collections[-1].end():]
      # Edit str representation using the match objects collection
    elif self.no_of_models != 0 and ind < self.no_of_models and ind is not None: # Remove specified model instance
      self.collections.pop(ind)
      self.model_types.pop(ind) 
      self.no_of_models -= 1
      self.str_rep = self.str_rep[:self.mo_collections[ind-1].start()] + self.str_rep[self.mo_collections[ind-1].end():]
      # Edit str representation using the match objects collection
    else:
      raise OutOfBoundError
    
    self._update(temp) # Update str representation
  
  def _output_interface(self, result):
    """
    Helper function acting as an interface to simplify handling of prediction results by different models
    with different output types
    """
    if len(result[0]) == 1: # Binary crossentropy/Sigmoid function used
      if len(result) == 1: # One image only
        return result[0][0]
      else: # Batch procesing
        temp = []
        for i in result:
          temp.append(i[0])
        return temp
    elif len(result[0]) == 2: # Categorical crossentropy/Softmax function used
      choice = 0 # The output neuron position/one-hot encoding position for the target class
      if len(result) == 1: # One image only
        return result[0][choice]
      else: # Batch procesing
        temp = []
        for j in result:
          temp.append(j[choice])
        return temp

  def predict(self, X, steps=None, verbose=0, print_out=False): 
    "Predicts the result based on the image inputs using a soft-voting, averaging ensemble method"
    results = np.array([])
    if isinstance(X, np.ndarray) or isinstance(X, list) or isinstance(X, tuple): # For collections
      if steps is not None:
        raise ArgumentError # Steps should be None
      for x in X: # Iterate through every image
        temp_result = 0 
        temp_test = np.asarray([x])
        for model in self.collections: # Gather each model's vote
          temp_result += self._output_interface(model.predict(temp_test))
        temp_bool = (temp_result/self.no_of_models) > 0.5 # Average out the votes
        if bool(verbose): 
          results = np.concatenate((results, np.array([[format(temp_result/self.no_of_models, ".2f"), temp_bool]])), axis=0)
        else:
          results = np.concatenate((results, np.array(temp_bool)))
      if print_out:
        for result in results: 
          print(result)
      return results
    elif isinstance(X, Iterator) or isinstance(X, DirectoryIterator): # For Iterators
      results = np.array([])
      if steps is None:
        steps = ceil(X.n / X.batch_size) # Set number of steps
      for i in range(steps): 
        batch = X.__next__() # Current batch
        for j in range(len(self.collections)): # Collect all models vote on the current batch
          if j == 0:
            temp_result = self._output_interface(self.collections[j].predict(batch)) 
          else:
            temp_result += self._output_interface(self.collections[j].predict(batch)) 
        temp_bool = (temp_result/self.no_of_models) > 0.5 # Average out the votes
        if bool(verbose):
          interm = np.stack(((temp_result/self.no_of_models), temp_bool), axis=-1)
          results = np.concatenate((results, interm))
        else:
          results = np.concatenate((results, temp_bool))
      if print_out:
        for result in results: 
          print(result)
      return results
    else:
      raise TypeError

#### 1.3 Function to ensemble all the methods 

Ensembling of all the models, loading of the corresponding fine-tuned weights and returns an ensemble model
<br/><br/>
___

<br/>

*collect_model*

```python
collect_model(
    *args, 
    weights_path
    name='Hypermodel 1'
)
```

collect_model() helps to add models to the Hypermodel it returns with their weights loaded

__Arguments:__

| Parameter | Description |
| :--- | :--- |
| __\*args__ | Any number of tuples of the form `(<model instance>, <model output type>)` |
| **weights_path** | A list of paths to each model's weights in the same sequence that \*args is defined |
| __name__ | Name for the `Hypermodel` |

__Raises:__

| Error | Reason |
| :--- | :--- |
| __TypeError__ | `weights_path` is not a `list` or `*args` are not all `tuples` |

__Returns:__

| Variable | Description |
| :--- | :--- |
| __result__ | `Hypermodel` with all the models added to it |


In [None]:
# 1.3
# collect_model() function

def collect_model(*args, weights_path, name='Hypermodel 1'):
  "Collects all the models into one Hypermodel instance which is then returned"
  try:
    assert isinstance(weights_path, list) # weights_path is a list of strings
  except:
    raise TypeError
  
  result = Hypermodel(name) 
  
  counter = 0  
  for model, form in args:
    model.load_weights(weights_path[counter]) # Load weights of model to be added

    result.add_tuple((model, form)) # Adds the model
    
    counter += 1
  
  return result

#### Usage example

```python
# Weights preparation
model_1_weights = "<path to model 1 weights>"
model_2_weights = "<path to model 2 weights>"
weights_path = [model_1_weights, model_2_weights]

# Forming Hypermodel
hypermodel = collect_model(model_1(), model_2(), weights_path=weights_path, name="test")

# Prepare Iterator
datagen = ImageDataGenerator(rescale=1./255)
X = datagen.flow_from_directory("<path>", target_size=(224, 224), batch_size=32, class_mode='binary')

# Predict
results = hypermodel.predict(X)
```


## 2. Using live video feed by integrating with OpenCV 

Seems there're a few methods to approach the model deployment
*  For video detection, we can stream from a webcam, extract region of interest using OpenCV's deep neural network and then detect the presence of mask
*  For image detection, a similar approach can be used

Seems there're also a few methods to build a face detector with OpenCV
*  Using pre-trained models stored as XML docs on OpenCV's github
*  Using pre-trained models stored as caffemodel files by Berkley's Caffe Model Zoo

### 2.1 Function find_mask_in_video() 

Finds the faces and predicts mask wearing in the video  
Mainly a helper function for the code below in 2.2

__Arguments:__

| Parameter | Description |
| :--- | :--- |
| __frame__ | Current frame of interest |
| **find_face** | The model that detects faces and isolate them as regions of interest |
| **find_mask** | The model that detects masks in the regions of interest returned by the model in `find_face` parameter |

__Returns:__

| Variable | Description |
| :--- | :--- |
| (__locations__, __predictions__) | A tuple containing a list of locations of regions of interest and a list of their corresponding mask predictions by both models passed in as arguments within the current isolated frame |

In [None]:
def find_mask_in_video(frame, find_face, find_mask):
  """Finds the faces in the frame passed in as its argument, 
  predicts if a mask is worn for every face and returns the locations and predictions of each face"""

  (height, width) = frame.shape[:2] # height and width of the whole frame
  blob = cv2.dnn.blobFromImage(frame, ) # incomplete, find out more on blob

  find_face.setInput(blob)
  detections = find_face.forward() 

  faces = [] # pixel information of the face image extracted
  locations = [] # location of faces in reference to the frame
  predictions = [] # predictions by our mask detection model

  for i in range(detections.shape[2]):
    box = detections[0, 0, i, 3:7] * np.array([width, height, width, height]) 
    # detections.shape[0,0,i,3:7] gives four values that 
    # can be multiplied with the frame heights and widths to get the coordinates of the 'box' around the face detected
    start_X, start_Y, end_X, end_Y = box.astype("int")

    # ensures all the 'boxes' of faces detected are within frame
    start_X, start_Y = (max(0, start_X), max(0, start_Y))
    end_X, end_Y = (min(width - 1, end_X), min(height - 1, end_Y))

    # extracting the region of interest (ROI) from video frame
    face = frame[start_Y:end_Y, start_X:end_X] # extract ROI
    face = cv2.cvtColor(face, cv2.COLOR_BGR2RGB) # convert color channel from BGR to RGB
    face = face.resize(face, (224, 224)) # resize to our target 224, 224
    face = img_to_array(face) 
    face = custom_preprocess_input(face) # incomplete, preprocessing function to customize, necessary to rescale

    faces.append(face)
    locations.append((start_X, start_Y, end_X, end_Y))
  
  if len(faces) > 0:
    faces = np.array(faces, dtype="float32")
    predictions = find_mask.predict(faces, batch_size=32) # returns a list of booleans / a list of tuples, uses the Hypermodel class
  
  return (locations, predictions)

### 2.2 Implement the function find_mask_in_video() to a live stream 

Implements mask detector in a webcam livestream

In [None]:
# 2.2
# Main set of codes that enables the live video mask detection

PROTOTXT_PATH = # incomplete, for the OpenCV face detector
WEIGHTS_PATH = # incomplete, for the OpenCV face detector

face_detector = cv2.dnn.readNet(PROTOTXT_PATH, WEIGHTS_PATH)

MASK_WEIGHTS_PATH = # incomplete, weights of our models

mask_detector = collect_model(model_1(), model_2(), ..., weights_path=MASK_WEIGHTS_PATH, name="Elon Mask") # incomplete, to fill in models

vs = VideoStream(src=0).start()
time.sleep(2.0)

while True:
  frame = vs.read() # reads a frame
  frame = imutils.resize(frame, width=400) # incomplete, decide on the frame size

  locations, predictions = find_mask_in_video(frame, face_detector, mask_detector) # get predictions

  for (box, pred) in zip(locations, predictions): # labelling the predictions
    (start_X, start_Y, end_X, end_Y) = box
    if isinstance(pred, tuple):
      mask = pred[1]
      prob = pred[0]
      if mask:
        label = f"Mask: {(prob * 100):.2f}%"
        color = (0, 255, 0) # videostream uses BGR channels apparently, green
      else:
        label = f"No mask: {(prob * 100):.2f}%"
        color = (0, 0, 255) # videostream uses BGR channels apparently, red
    else:
      if pred: 
        label = "Mask"
        color = (0, 255, 0) # videostream uses BGR channels apparently, green
      else:
        label = "No mask"
        color = (0, 0, 255) # videostream uses BGR channels apparently, red
    
    cv2.putText(frame, label, (start_X, start_Y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 2) # label
    cv2.rectangle(frame, (start_X, start_Y), (end_X, end_Y), color, 2) # colored box to mark out face

  cv2.imshow("Frame", frame) # project back to live-feed frame
  key = cv2.waitkey(1) & 0xFF # allows for exiting of the program by pressing 'q'
  
  if key == ord("q"): # captures the 'q' key pressed
    break

cv2.destroyAllWindows() # close all the video feed windows opened by cv2
vs.stop() # stops the video stream