<a href="https://colab.research.google.com/github/ethan-pritchard/face-recognition-labeling/blob/main/Face_Detection_and_Labeling_using_a_PostgreSQL_Database.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Face Detection and Labeling using a PostgreSQL Database

This notebook utilizes the [facial recognition](https://github.com/ageitgey/face_recognition) library to recognize faces, encode the faces, and compare the encoded faces to known faces to find the closest match. The library itself handles the process of detecting faces and encoding them.

Facial recognition is an emerging technology that is rapidly being integrated into everyday products. In an effort to educate myself on the subject, I propose the `FaceDetectionController` and an accompanying PostgreSQL database as a solution to three primary problems that facial recognition systems need to tackle:

1) Assigning a unique identifier to known people and associating metadata to that unique identifier,

2) Storing faces long-term and associating faces with people using the person's unique identifier, and

3) Being able to explicitly decide who the runtime can recognize by adding and removing known people from the runtime.

The `FaceDetectionController` addresses all three of these problems. It provides methods to find known people and get their names. It provides methods to save faces long-term, each attached to a user. It maintains an artificial runtime by storing a collection of loaded users and their respective faces, so that when the `runtime_predict` method is called, the [facial recognition](https://github.com/ageitgey/face_recognition) library can find the best match (if there is a match to find).
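
The closest-match idea behind `runtime_predict` can be sketched in plain Python, so it runs without the library installed. This is only an illustration: it assumes encodings are compared by Euclidean distance and that anything farther than a tolerance counts as no match. The 0.6 tolerance mirrors the default of `face_recognition.compare_faces`; the names `closest_user` and `loaded` are hypothetical and do not appear in the controller.

```python
import math

def closest_user(loaded, target, tolerance=0.6):
    """Return (user_key, distance) for the nearest stored face, or None.

    `loaded` maps a user key to a list of face vectors (hypothetical layout).
    """
    best = None
    for user, faces in loaded.items():
        for face in faces:
            dist = math.dist(face, target)  # Euclidean distance (Python 3.8+)
            # Keep the face only if it is within tolerance and beats the best so far
            if dist <= tolerance and (best is None or dist < best[1]):
                best = (user, dist)
    return best

# Toy 2-dimensional "encodings"; real encodings have 128 dimensions
loaded = {
    "alice": [[0.0, 0.0], [0.1, 0.0]],
    "bob": [[1.0, 1.0]],
}
print(closest_user(loaded, [0.1, 0.1]))  # nearest face belongs to "alice"
print(closest_user(loaded, [5.0, 5.0]))  # nothing within tolerance
```

Returning `None` when nothing is within tolerance keeps "unknown face" distinct from "closest known face", which is what lets the webcam demo fall back to a default label.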

### The PostgreSQL Database

This notebook introduces a database schema composed of two tables: a users table and a faces table. The users table stores users as unique identifiers (UUIDs) and attaches data to each identifier, such as name or workplace. The faces table stores individual face vectors and associates each one with a user via that user's unique identifier. This is a **one-to-many** relationship between users and faces (**one** user has **many** faces).

Here's a diagram of this schema:

![](https://i.imgur.com/Oc4XL1g.png)

The primary benefit of this schema is easy integration into existing user databases. When a schema is already in place for user management, the **facedetect.users** table becomes a mapping table that maintains a **one-to-one** relationship between the existing user identifier and the new unique identifier used exclusively for face detection.

Here's a diagram showing an example of how this schema would be integrated into an existing user management schema:

![](https://i.imgur.com/A9KVx9x.png)
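
The one-to-one mapping can be tried out without a PostgreSQL server. The sketch below uses an in-memory SQLite database purely as a stand-in. The `accounts` table, its `email` column, and the `account_id` column are hypothetical names chosen for illustration, not names taken from the diagram.

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite only enforces FKs when asked

# Hypothetical pre-existing user management table
conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, email TEXT NOT NULL)")

# Mapping table: UNIQUE on account_id keeps the relationship one-to-one
conn.execute(
    "CREATE TABLE facedetect_users ("
    " id TEXT PRIMARY KEY,"
    " account_id INTEGER NOT NULL UNIQUE REFERENCES accounts(id))"
)

conn.execute("INSERT INTO accounts (id, email) VALUES (1, 'ethan@example.com')")
face_uuid = str(uuid.uuid4())
conn.execute(
    "INSERT INTO facedetect_users (id, account_id) VALUES (?, ?)", (face_uuid, 1)
)

# Resolve a face detection UUID back to the existing account
email = conn.execute(
    "SELECT a.email FROM facedetect_users fu"
    " JOIN accounts a ON a.id = fu.account_id WHERE fu.id = ?",
    (face_uuid,),
).fetchone()[0]
print(email)

# A second face detection identity for the same account violates UNIQUE
try:
    conn.execute(
        "INSERT INTO facedetect_users (id, account_id) VALUES (?, ?)",
        (str(uuid.uuid4()), 1),
    )
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True
print(duplicate_rejected)
```

The uniqueness constraint is what turns an ordinary foreign key into a one-to-one link; without it, several face detection identities could point at the same account.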

The schema discussed above is laid out in my `schema.sql` file:

```sql
-- UUID extension
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

-- Face detection schema
CREATE SCHEMA IF NOT EXISTS facedetect;

-- Table for storing users and their user data
CREATE TABLE IF NOT EXISTS facedetect.users (
	id   uuid    DEFAULT uuid_generate_v4(),
	name VARCHAR NOT NULL,
	PRIMARY KEY (id)
);

-- Table for storing face vectors and binding those faces to known users
CREATE TABLE IF NOT EXISTS facedetect.faces (
	face_id uuid                   DEFAULT uuid_generate_v4(),
	user_id uuid                   NOT NULL,
	vector  double precision ARRAY NOT NULL,
	PRIMARY KEY (face_id),
	CONSTRAINT check_user FOREIGN KEY(user_id) REFERENCES facedetect.users(id)
);
```
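
To see the one-to-many relationship in action without a database server, the sketch below mirrors the two tables in an in-memory SQLite database. SQLite is a stand-in here, not what the notebook uses: it has no `uuid-ossp` extension or array type, so the sketch assumes UUIDs generated in Python and vectors stored as JSON text.

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite only enforces FKs when asked
conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, name TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE faces ("
    " face_id TEXT PRIMARY KEY,"
    " user_id TEXT NOT NULL REFERENCES users(id),"
    " vector TEXT NOT NULL)"
)

# One user...
user_id = str(uuid.uuid4())
conn.execute("INSERT INTO users (id, name) VALUES (?, ?)", (user_id, "ethan"))

# ...with many faces (toy 2-dimensional vectors)
for vector in ([0.1, 0.2], [0.3, 0.4]):
    conn.execute(
        "INSERT INTO faces (face_id, user_id, vector) VALUES (?, ?, ?)",
        (str(uuid.uuid4()), user_id, json.dumps(vector)),
    )

# Join the tables to list every face that belongs to the user
rows = conn.execute(
    "SELECT u.name, f.vector FROM users u JOIN faces f ON f.user_id = u.id"
    " ORDER BY f.vector"
).fetchall()
faces_for_user = [(name, json.loads(vector)) for name, vector in rows]
print(faces_for_user)

# A face that points at an unknown user is rejected by the foreign key
try:
    conn.execute(
        "INSERT INTO faces (face_id, user_id, vector) VALUES (?, ?, ?)",
        (str(uuid.uuid4()), str(uuid.uuid4()), "[]"),
    )
    orphan_rejected = False
except sqlite3.IntegrityError:
    orphan_rejected = True
print(orphan_rejected)
```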

### The FaceDetectionController

In [24]:
!pip install https://github.com/ageitgey/face_recognition/archive/v1.2.2.tar.gz
!pip install opencv-python
!pip install psycopg2



In [None]:
import face_recognition
import numpy as np
import psycopg2

In [None]:
class FaceDetectionUser:
  # For now, users only have their unique id and name
  def __init__(self, id, name):
    self.id = id
    self.name = name

In [20]:
class FaceDetectionController:
  # When a controller is created, we need to connect it to the database and run any initialization routines
  def __init__(self, db=None, schema_file=None):
    # Check DB settings
    db = db or {}
    if 'database' not in db: raise Exception('No database specified (key=database).')
    if 'user' not in db: raise Exception('No username specified (key=user).')
    if 'password' not in db: raise Exception('No password specified (key=password).')
    if 'host' not in db: raise Exception('No host address specified (key=host).')
    if 'port' not in db: raise Exception('No port specified (key=port).')

    # Create DB connection
    self.db_conn = psycopg2.connect(database=db['database'], user=db['user'], password=db['password'], host=db['host'], port=db['port'])
    self.db_instance = self.db_conn.cursor()

    # Create DB schema if specified
    if schema_file is not None:
      with open(schema_file, 'r') as f:
        self.db_instance.execute(f.read())
      self.db_conn.commit()
    
    # Initialize our controller's runtime
    self.runtime_init()

  # Function to disconnect the controller once done (closes the cursor and the connection)
  def disconnect(self):
    self.db_instance.close()
    self.db_conn.close()

  #
  # Runtime function
  #

  # Initializes runtime objects that allow us to load specific users and recognize their faces
  def runtime_init(self):
    self.runtime = {}
  
  # Checks our runtime for the best match
  def runtime_predict(self, target, tolerance=0.6):
    # By default, our prediction is that we do not know whose face it is
    label = None

    # Check every loaded user's faces and find the closest match
    for user in self.runtime:
      # Get the user's faces as flat vectors; skip users with no stored faces
      faces = [np.asarray(face, dtype=float).reshape(-1) for face in self.runtime[user]]
      if len(faces) == 0: continue

      # Get the min distance and skip users whose closest face is not a match
      dist = np.min(face_recognition.face_distance(np.array(faces), target))
      if dist > tolerance: continue

      # Check if we should update our label
      if label is None or dist < label[1]: label = (user, dist)

    # Return the label
    return label
  
  # Add a user to the runtime
  def runtime_add_user(self, user, override=False):
    # Verify the user
    if not isinstance(user, FaceDetectionUser): raise Exception("Parameter is not a user object.")
    if user.id is None: raise Exception("User has no unique identifier.")

    # Check if the user is currently in the runtime
    if user in self.runtime and not override: return
    self.runtime[user] = self.get_faces(user)

  #
  # User DB functions
  # 

  # Add a new user to the database
  def create_user(self, name):
    name = name.lower()
    self.db_instance.execute("INSERT INTO facedetect.users (name) VALUES (%s) RETURNING id", (name,))
    id = self.db_instance.fetchone()[0]
    self.db_conn.commit() # Persist the new user
    return FaceDetectionUser(id=id, name=name)
  
  # Remove a user from the database
  def remove_user(self, id):
    self.db_instance.execute("DELETE FROM facedetect.users WHERE id=%s", (id,))
    self.db_conn.commit()
  
  # Get a user using their unique identifier
  def get_user(self, id):
    # Select user by unique id (no parentheses, so we get two columns rather than one composite value)
    self.db_instance.execute("SELECT id, name FROM facedetect.users WHERE id=%s", (id,))
    row = self.db_instance.fetchone()

    # Return None if the user does not exist
    if row is None: return None
    return FaceDetectionUser(id=row[0], name=row[1])
  
  # Get all users who have a specific name
  # -> This blurb is useful for adding in more data for users, such as only getting users by location
  def get_users_by_name(self, name):
    # Select user by name
    name = name.lower()
    self.db_instance.execute("SELECT id, name FROM facedetect.users WHERE name=%s", (name,))
    rows = self.db_instance.fetchall()
    users = []

    # Create runtime objects for the users
    for row in rows:
      users.append(FaceDetectionUser(id=row[0], name=row[1]))
    
    # Return all users with this name
    return users
  
  #
  # Face DB functions
  # 
  
  # A simple helper method to load and encode the ith face in a file (Typically files only have 1 face)
  def image_to_vector(self, file, i=0):
    return face_recognition.face_encodings(face_recognition.load_image_file(file))[i]

  # Add a face vector to the database
  def create_face(self, user, face):
    # Verify the user
    if not isinstance(user, FaceDetectionUser): raise Exception("Parameter is not a user object.")
    if user.id is None: raise Exception("User has no unique identifier.")

    # Check if face exists
    face = list(face)
    self.db_instance.execute("SELECT (face_id) FROM facedetect.faces WHERE vector=%s::double precision[]", (face,))
    row = self.db_instance.fetchone()
    if row is not None: return row[0]

    # Add face to the database
    self.db_instance.execute("INSERT INTO facedetect.faces (user_id, vector) VALUES (%s, %s::double precision[]) RETURNING face_id", (user.id, face,))
    face_id = self.db_instance.fetchone()[0]
    self.db_conn.commit() # Persist the new face
    return face_id
  
  # Remove a face vector from the database
  def remove_face(self, face_id):
    self.db_instance.execute("DELETE FROM facedetect.faces WHERE face_id=%s", (face_id,))
    self.db_conn.commit()
  
  # Get all of a user's faces using the user's unique identifier
  def get_faces(self, user):
    # Verify the user
    if not isinstance(user, FaceDetectionUser): raise Exception("Parameter is not a user object.")
    if user.id is None: raise Exception("User has no unique identifier.")

    # Search for faces using the user's unique id
    self.db_instance.execute("SELECT vector FROM facedetect.faces WHERE user_id=%s", (user.id,))

    # Unwrap each single-column row into a numpy face vector
    return [np.array(row[0]) for row in self.db_instance.fetchall()]
  
  # Get a user using a face vector
  def get_user_by_face(self, face):
    # Search for faces with this vector in hopes to find the attached user id
    face = list(face)
    self.db_instance.execute("SELECT (user_id) FROM facedetect.faces WHERE vector=%s::double precision[]", (face,))
    row = self.db_instance.fetchone()

    # If the face is known, get the user
    if row is not None: return self.get_user(row[0])
    return None # User was not known

### OpenCV Webcam Demo

This demo will not work on Google Colab or any cloud provider! You will have to run this on your local machine.

Below is an example of how to feed an OpenCV webcam stream into the `FaceDetectionController`, as well as an example of how to set up the controller, add a new face and user, and then load that user into the runtime. The `webcam_detect` method loops until `q` is pressed, continuously reading the OpenCV webcam feed and passing each detected face into the `FaceDetectionController.runtime_predict` method. If no matches are found, the face is labeled "NOT A USER".
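
The frame skipping and rescaling used by `webcam_detect` can be tried without a camera. This is a rough sketch under two assumptions: one frame is processed after every `skip` skipped frames, and detection runs on a frame shrunk by a factor of 0.25, so boxes are scaled back up by 4. The helper names `frames_to_process` and `scale_box` are hypothetical.

```python
def frames_to_process(total_frames, skip=2):
    """Return the indices of frames that get processed when `skip` frames
    are dropped before each processed one."""
    processed = []
    skipped = 0
    for index in range(total_frames):
        if skipped < skip:
            skipped += 1  # Still skipping: drop this frame
            continue
        skipped = 0  # Enough frames skipped: process this one and reset
        processed.append(index)
    return processed

def scale_box(box, factor=4):
    """Map a (top, right, bottom, left) box found on a downscaled frame
    back to full-frame pixel coordinates."""
    top, right, bottom, left = box
    return (top * factor, right * factor, bottom * factor, left * factor)

print(frames_to_process(9))         # [2, 5, 8]
print(scale_box((10, 50, 40, 20)))  # (40, 200, 160, 80)
```

With `skip=2`, only every third frame is encoded, which trades labeling latency for a lighter CPU load.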

In [None]:
import cv2

In [21]:
# Method to continuously scan my webcam and detect users as they are found
def webcam_detect(controller=None, skip=2):
  # Create webcam feed
  video_capture = cv2.VideoCapture(0)

  # Continuously loop over the webcam feed and try and label faces
  fs = 0 # Frames skipped counter
  while True:
    # Get the frame and stop if the webcam feed is unavailable
    ret, frame = video_capture.read()
    if not ret: break

    # If we haven't reached skip count yet, we should skip this frame
    if fs < skip:
      fs += 1
      continue

    # We've skipped enough frames. Process this frame and reset counter
    fs = 0

    # Shrink the frame for faster detection and convert BGR (OpenCV) to RGB
    small_frame = cv2.resize(frame, (0, 0), fx=0.25, fy=0.25)
    rgb_small_frame = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)

    # Detect faces
    face_locations = face_recognition.face_locations(rgb_small_frame)
    face_encodings = face_recognition.face_encodings(rgb_small_frame, face_locations)

    # Get names for faces
    names = []
    for v in face_encodings:
      # By default, we don't know nothin'
      label = 'NOT A USER'

      # If we don't have a controller, there's no name
      if controller is not None and isinstance(controller, FaceDetectionController):
        # Use the controller to make a prediction
        pred = controller.runtime_predict(v)
        if pred is not None: label = pred[0].name
      
      # Add the name
      names.append(label)

    # Label the detected faces with their name
    for (top, right, bottom, left), name in zip(face_locations, names):
      # Scale the images back to full
      top *= 4
      right *= 4
      bottom *= 4
      left *= 4

      cv2.rectangle(frame, (left, top), (right, bottom), (0,0,255), 2)
      cv2.rectangle(frame, (left, bottom - 35), (right, bottom), (0,0,255), cv2.FILLED)
      cv2.putText(frame, name, (left + 6, bottom - 6), cv2.FONT_HERSHEY_DUPLEX, 1.0, (255,255,255), 1)
    
    # Show the resulting image
    cv2.imshow('Video', frame)

    # Exit condition
    if cv2.waitKey(1) & 0xFF == ord('q'): break

  # Destroy the instances
  video_capture.release()
  cv2.destroyAllWindows()

In [None]:
# Controller instance
c = FaceDetectionController(db={'database': '', 'user': '', 'password': '', 'host': '', 'port': '5432'}, schema_file='schema.sql')

In [None]:
# Add my face to the controller
face_ethan = c.image_to_vector('ethan.png')
user_ethan = c.get_user_by_face(face_ethan)

# If the face is not known, let's add it
if user_ethan is None:
  user_ethan = c.create_user('Ethan')
  face_id = c.create_face(user_ethan, face_ethan)

In [None]:
# Add the user to the runtime
c.runtime_add_user(user_ethan)

In [None]:
# Run the detect webcam (You need an OpenCV capable machine to run this. Google Colab will not work!)
webcam_detect(controller=c)