In [3]:
import cv2
import face_recognition
import os
import numpy as np
from sqlalchemy import create_engine, Column, Integer, String, Float, LargeBinary
from sqlalchemy.orm import sessionmaker, declarative_base
import mysql.connector

# Initialization of the MySQL database

In [4]:
# Create the database if it doesn't exist
def create_database():
    """Create the ``face`` MySQL database if it does not already exist.

    Connects to the MySQL server at ``host.docker.internal`` as ``root`` with
    an empty password and issues ``CREATE DATABASE IF NOT EXISTS face``.
    The cursor and the connection are always closed, even when connecting
    succeeds but the statement fails. Any error raised by ``mysql.connector``
    propagates to the caller.
    """
    conn = mysql.connector.connect(
        host="host.docker.internal",
        user="root",
        password=""
    )
    try:
        cursor = conn.cursor()
        try:
            cursor.execute("CREATE DATABASE IF NOT EXISTS face")
        finally:
            # Release the cursor before the connection that owns it.
            cursor.close()
    finally:
        # Runs on both success and failure so the connection is never leaked.
        conn.close()

# Make sure the 'face' database exists before anything in the notebook connects to it
create_database()

In [5]:
# Declarative base class that the ORM model below inherits from
Base = declarative_base()

class FaceEmbeddings(Base):
    """ORM model for the ``face_embeddings`` table: one named face embedding per row."""
    __tablename__ = 'face_embeddings'
    # Auto-incrementing integer primary key
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Label of the face (up to 100 characters); must not be NULL
    name = Column(String(100), nullable=False)
    # Embedding stored as raw bytes; must not be NULL. The notebook writes it
    # with ndarray.tobytes() and reads it back with np.frombuffer(..., float64).
    embedding = Column(LargeBinary, nullable=False)
    
# Connection URL: MySQL through the mysqlconnector driver, user 'root' with an
# empty password, host 'host.docker.internal', database 'face'.
# NOTE(review): credentials are hardcoded here — consider reading them from the environment.
database_url = 'mysql+mysqlconnector://root:@host.docker.internal/face'
engine = create_engine(database_url)
# Create the tables declared on Base (face_embeddings) in the target database
Base.metadata.create_all(bind=engine)
# Session factory bound to the engine, plus the single module-level session
# that the face-recognition code in this notebook uses for all queries.
Session = sessionmaker(bind=engine)
session = Session()

# Face Recognition

In [6]:
def _load_face_encoding(image_path):
    """Read an image and encode the first face found in it.

    Args:
        image_path: path of the image file to read with OpenCV.

    Returns:
        A ``(bgr_image, rgb_image, encoding)`` tuple: the image as loaded by
        OpenCV, its RGB conversion, and the first face encoding returned by
        ``face_recognition.face_encodings``.

    Raises:
        FileNotFoundError: if OpenCV cannot read ``image_path``.
        ValueError: if no face is found in the image.
    """
    bgr_image = cv2.imread(image_path)
    if bgr_image is None:
        # cv2.imread does not raise on a missing/unreadable file; it returns None.
        raise FileNotFoundError(f"Could not read image: {image_path}")
    # OpenCV loads images as BGR; convert to RGB before encoding.
    rgb_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)
    encodings = face_recognition.face_encodings(rgb_image)
    if len(encodings) == 0:
        # Indexing [0] on an empty result would otherwise raise a bare IndexError.
        raise ValueError(f"No face found in image: {image_path}")
    return bgr_image, rgb_image, encodings[0]

# musk
im, rgb_im, im_encoding = _load_face_encoding("./images/musk.jpg")

# gates
im2, rgb_im2, im_encoding2 = _load_face_encoding("./images/gates.jpg")

# Check whether the two images show the same person
result = face_recognition.compare_faces([im_encoding], im_encoding2)
print(result[0])

False


In [7]:
# Video class for face recognition
class VidFaceRec:
    """Recognise known faces in video frames using embeddings kept in the database.

    Known faces are encoded from image files and persisted through the
    module-level SQLAlchemy ``session`` as ``FaceEmbeddings`` rows. They are
    loaded into memory the first time a frame is processed.

    Attributes:
        known_fe: list of known face encodings (NumPy arrays).
        known_names: names aligned index-for-index with ``known_fe``.
        resize_factor: scale applied to each frame before detection.
        tolerance: largest face distance still accepted as a match.
    """

    def __init__(self):
        self.known_fe = []
        self.known_names = []
        self.resize_factor = 0.25  # Resize frames to 0.25 of the original size for faster encoding
        self.tolerance = 0.6  # Same threshold face_recognition.compare_faces uses by default
        self._faces_loaded = False  # Set once the database has been queried, even if it was empty

    def load_encoding_images(self, image_dir):
        """Encode every new face image in ``image_dir`` and store it in the database.

        The file name without its extension is used as the person's name.
        Names that already have a row are skipped, as are directory entries
        OpenCV cannot read as an image and images in which no face is found.
        Only the first face of each image is stored.

        Args:
            image_dir: directory containing one image per known person.
        """
        im_names = os.listdir(image_dir)
        print(f"{len(im_names)} encoding images found.")

        for im_name in im_names:
            name = os.path.splitext(im_name)[0]  # Extract name from image file name
            existing_face = session.query(FaceEmbeddings).filter_by(name=name).first()
            if existing_face:
                continue

            im = cv2.imread(os.path.join(image_dir, im_name))
            if im is None:
                # cv2.imread returns None for sub-directories and non-image files;
                # passing that to cvtColor would abort the whole load.
                print(f"Skipping {im_name}: not a readable image.")
                continue
            rgb_im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)

            # Get face encodings for the image (assuming one face per image)
            im_enc = face_recognition.face_encodings(rgb_im)
            if len(im_enc) == 0:  # No face found in this image
                continue

            # Force float64 so the bytes always round-trip through
            # np.frombuffer(..., dtype=np.float64) in retrieve_known_faces.
            embedding = np.asarray(im_enc[0], dtype=np.float64)
            session.add(FaceEmbeddings(name=name, embedding=embedding.tobytes()))
            try:
                session.commit()
            except Exception:
                # Leave the shared session usable for the next image / later queries.
                session.rollback()
                raise
            if self._faces_loaded:
                # Keep the in-memory cache in sync when faces are added after loading.
                self.known_fe.append(embedding)
                self.known_names.append(name)
            print(f"Face embedding for {name} stored in the database.")

    def retrieve_known_faces(self):
        """Load all stored face embeddings from the database into memory, once.

        Does nothing if the embeddings were already loaded or if ``known_fe``
        has been populated by other means. The session is closed after the
        query, whether or not it succeeded.
        """
        if self._faces_loaded or self.known_fe:
            return

        encodings, names = [], []
        try:
            # Query all stored face embeddings and deserialize them
            for face in session.query(FaceEmbeddings).all():
                encodings.append(np.frombuffer(face.embedding, dtype=np.float64))
                names.append(face.name)
        finally:
            session.close()

        # Publish only after a complete, successful read so the cache is never partial.
        self.known_fe.extend(encodings)
        self.known_names.extend(names)
        self._faces_loaded = True

    def _best_match_name(self, face_distances):
        """Return the name of the closest known face, or "Unknown" if none is close enough.

        Args:
            face_distances: distances to each entry of ``known_fe``, in order;
                may be empty when no faces are known.
        """
        if len(face_distances) == 0:
            # np.argmin raises ValueError on an empty array.
            return "Unknown"
        best_match_index = int(np.argmin(face_distances))
        # Equivalent to compare_faces(...)[best_match_index] without recomputing distances.
        if face_distances[best_match_index] <= self.tolerance:
            return self.known_names[best_match_index]
        return "Unknown"

    def detect_known_faces(self, frame):
        """Detect and recognize known faces in the current video frame.

        Args:
            frame: BGR image as returned by ``cv2.VideoCapture.read``.

        Returns:
            ``(face_locations, names)``: an integer array of
            ``(top, right, bottom, left)`` boxes scaled back to the original
            frame size, and the matching list of names ("Unknown" when no
            known face is within ``tolerance``).
        """
        # Resize the frame for faster processing
        small_frame = cv2.resize(frame, (0, 0), fx=self.resize_factor, fy=self.resize_factor)
        rgb_frame = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)

        # Detect face locations in the frame, then encode each detected face
        locs = face_recognition.face_locations(rgb_frame)
        face_encodings = face_recognition.face_encodings(rgb_frame, locs)
        self.retrieve_known_faces()

        names = []  # Names of the recognized faces, aligned with locs
        for encoding in face_encodings:
            if self.known_fe:
                face_distances = face_recognition.face_distance(self.known_fe, encoding)
            else:
                face_distances = np.empty(0)  # Nothing to compare against
            names.append(self._best_match_name(face_distances))

        # Scale face locations back to the original frame size
        face_locations = np.array(locs) / self.resize_factor
        return face_locations.astype(int), names  # Return face locations and names

In [9]:
# Video inference: annotate every frame of the clip with recognized faces.
# Press 'q' in the preview window to stop early.
vfrec = VidFaceRec()
vfrec.load_encoding_images('./images')
video_path = './videos/tucker_elon.mp4'
cap = cv2.VideoCapture(video_path)
try:
    if not cap.isOpened():
        # Without this the loop below would exit immediately with no explanation.
        print(f"Could not open video: {video_path}")
    while True:
        ret, frame = cap.read()
        if not ret:  # End of stream or read failure
            break
        face_locations, face_names = vfrec.detect_known_faces(frame)
        for face_loc, name in zip(face_locations, face_names):
            # face_recognition boxes are ordered (top, right, bottom, left)
            y1, x2, y2, x1 = face_loc[0], face_loc[1], face_loc[2], face_loc[3]
            cv2.putText(frame, name, (x1, y1-10), cv2.FONT_HERSHEY_DUPLEX, 1, (0,200,0), 2)
            cv2.rectangle(frame, (x1,y1), (x2,y2), (0,0,200), 4)
        # Show each frame exactly once, after all boxes are drawn, so frames
        # without any detected face are displayed too.
        cv2.imshow("frame", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release the capture and close the preview window even if a frame fails.
    cap.release()
    cv2.destroyAllWindows()

4 encoding images found.
