# **Installation**

In [1]:
# Install the packages used below: ultralytics (YOLO face detector) and
# facenet-pytorch (InceptionResnetV1 face embedder). `-q` keeps pip quiet.
# NOTE(review): no versions are pinned, so a fresh runtime may resolve to
# different releases than the ones this notebook was developed against.
!pip install ultralytics -q
!pip install facenet-pytorch -q
# Install torch/torchvision/torchaudio from the CUDA 11.8 wheel index.
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118 -q
# Print the GPU and driver visible to this runtime.
!nvidia-smi
# Upgrade Pillow to the newest available release.
!pip install --upgrade Pillow -q

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m901.7/901.7 kB[0m [31m49.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.5/4.5 MB[0m [31m47.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m755.5/755.5 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m410.6/410.6 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.1/14.1 MB[0m [31m85.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.7/23.7 MB[0m [31m69.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m823.6/823.6 kB[0m [31m50.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

# **Clone facenet-pytorch**

In [2]:
# Fetch the facenet-pytorch sources into ./facenet-pytorch.
# NOTE(review): the package is also pip-installed in the installation cell and
# nothing visible in this notebook reads from the clone; confirm it is still needed.
!git clone https://github.com/timesler/facenet-pytorch.git

Cloning into 'facenet-pytorch'...
remote: Enumerating objects: 1338, done.[K
remote: Counting objects: 100% (289/289), done.[K
remote: Compressing objects: 100% (70/70), done.[K
remote: Total 1338 (delta 236), reused 219 (delta 219), pack-reused 1049 (from 4)[K
Receiving objects: 100% (1338/1338), 23.19 MiB | 12.79 MiB/s, done.
Resolving deltas: 100% (656/656), done.


# **Libraries**

In [3]:
from google.colab import files
from facenet_pytorch import InceptionResnetV1
from torchvision import transforms
import torch
from PIL import Image
from torchvision import transforms
import torch
import os
import cv2
from ultralytics import YOLO
import time
import numpy as np
import math
from google.colab.patches import cv2_imshow
from collections import deque, Counter
from collections import defaultdict

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [None]:
# !git clone https://github.com/timesler/facenet-pytorch.git
# cd /content/facenet-pytorch
# !pwd

# **Initializing Face Detector and Recognizer**

Face detection uses a YOLOv8 face-keypoints model; face recognition uses an InceptionResnetV1 model with VGGFace2-pretrained weights.

In [4]:
# Use the first CUDA GPU when one is available, otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print("device: ", device)

# Face detector: YOLO weights loaded from a local file, moved to the chosen device.
face_detector = YOLO("/content/yolov8n-face-keypoints.pt")
face_detector.to(device)

# Face embedder: InceptionResnetV1 with the 'vggface2' pretrained weights,
# switched to eval mode and moved to the same device as the detector.
face_recognizer = InceptionResnetV1(pretrained='vggface2')
face_recognizer = face_recognizer.eval().to(device)

device:  cuda:0


  0%|          | 0.00/107M [00:00<?, ?B/s]

# Other options: MTCNN or casia-webface

In [None]:
# # # If required, create a face detection pipeline using MTCNN:
# # mtcnn = MTCNN(image_size=<image_size>, margin=<margin>)
# mtcnn = MTCNN(image_size="160", keep_all=True, thresholds=[0.1, 0.1, 0.1])

# # For a model pretrained on CASIA-Webface
# face_recognizer = InceptionResnetV1(pretrained='casia-webface').eval()

  0%|          | 0.00/107M [00:00<?, ?B/s]

# **Create database embedding tensor**

In [5]:
# Preprocessing shared by the database images and the face crops taken from video:
# resize to 160x160, convert to a tensor, then normalise each channel with
# mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.Resize((160, 160)),
    transforms.ToTensor(),
    transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
])

# Candidate reference images. Sorted so the row order of Data_Base does not
# depend on the arbitrary order returned by os.listdir.
candidate_names = sorted(f for f in os.listdir("/content/database") if f.endswith('.jpg'))

# img_names_ls[i] is the file whose embedding is row i of Data_Base. A name is
# appended only after its embedding was computed, so both stay aligned even
# when a file has to be skipped.
img_names_ls = []
img_embedding_list = []

# Inference only: without no_grad every stored embedding would keep its
# autograd graph alive for as long as Data_Base exists.
with torch.no_grad():
    for image_name in candidate_names:
        image_path = os.path.join("/content/database/", image_name)
        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread signals an unreadable/corrupt file by returning None.
            print(f"Skipping unreadable image: {image_path}")
            continue
        # OpenCV loads BGR; the PIL-based transform expects RGB.
        img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        # img = Image.open(image_path).convert('RGB')
        # Add a batch dimension -> (1, 3, 160, 160), then move to the model's device.
        img_tensor = transform(img).unsqueeze(0).to(device)
        img_embedding = face_recognizer(img_tensor)
        # L2-normalise each row so a dot product between embeddings is a cosine similarity.
        img_embedding_list.append(img_embedding / img_embedding.norm(dim=1, keepdim=True))
        img_names_ls.append(image_name)

if not img_embedding_list:
    raise RuntimeError(
        "No readable .jpg images found in /content/database; cannot build the embedding database."
    )

# One normalised embedding per row, in the same order as img_names_ls.
Data_Base = torch.cat(img_embedding_list, dim=0)

# **Face Crop Embedder**

In [9]:
def face_crop_embedder(cropped_face: np.ndarray) -> torch.Tensor:
    """Embed a single face crop with the face recognizer.

    Args:
        cropped_face: Face region cut out of an OpenCV frame (BGR channel order).

    Returns:
        A one-row tensor holding the L2-normalised embedding of the crop,
        located on ``device``.

    Raises:
        ValueError: If the crop is ``None`` or contains no pixels (for example
            when the bounding box lies outside the frame).
    """
    if cropped_face is None or cropped_face.size == 0:
        raise ValueError("cropped_face is empty; cannot compute an embedding")

    # OpenCV delivers BGR; the PIL-based transform expects RGB.
    cropped_face_rgb = cv2.cvtColor(cropped_face, cv2.COLOR_BGR2RGB)
    cropped_face_pil = Image.fromarray(cropped_face_rgb)
    # Add the batch dimension and move the input to the model's device.
    cropped_face_tensor = transform(cropped_face_pil).unsqueeze(0).to(device)

    # Inference only: skip autograd bookkeeping for every frame/crop.
    with torch.no_grad():
        cropped_face_embedding = face_recognizer(cropped_face_tensor)

    # keepdim=True keeps the norm as a column so the division is row-wise.
    return cropped_face_embedding / cropped_face_embedding.norm(dim=1, keepdim=True)

In [None]:
## Face recognition pipeline

# **Face recognition pipeline**

## **Without Buffer**

In [None]:
# Open the camera or video file
cap = cv2.VideoCapture("/content/subway.mp4")

# Initialize other variables
frame_width = int(cap.get(3))
frame_height = int(cap.get(4))
# cam_fps = cap.get(cv2.CAP_PROP_FPS)
cam_fps = 15

# Font settings
font = cv2.FONT_HERSHEY_SIMPLEX
fontScale = 0.8
thickness = 2

# Output video
out = cv2.VideoWriter('/content/result.mp4', cv2.VideoWriter_fourcc(*'mp4v'), cam_fps, (frame_width, frame_height))

# simmularity threshold
diff_threshold = 0.7

while cap.isOpened():
    success, frame = cap.read()
    if success:

        results = face_detector.track(frame, persist=True, device=device)

        for r in results[0]:
            if 0.6 < r.boxes.conf.item():
                points = r.keypoints.xy.cpu().numpy()[0]
                x_c, y_c, w_b, h_b = r.boxes.xywh.cpu().numpy()[0]
                x_max = int(x_c + (w_b / 2))
                x_min = int(x_c - (w_b / 2))
                y_max = int(y_c + (h_b / 2))
                y_min = int(y_c - (h_b / 2))
                
                cropped_face = frame[y_min:y_max, x_min:x_max]
                cropped_face_embedding = face_crop_embedder(cropped_face)

                simularities = (Data_Base*cropped_face_embedding).sum(dim=1)
                simularities = 1 - simularities
                min_diff_index = torch.argmin(simularities)

                if simularities[min_diff_index].item() < diff_threshold:
                    frame = cv2.putText(frame, f'{img_names_ls[min_diff_index]}',
                                        (int(x_c + w_b/2), int(y_c)), font, fontScale, (0, 0, 200), thickness, cv2.LINE_AA)

                cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)

        out.write(frame)

        # Display the frame (works in Colab)
        # cv2_imshow(frame)

    else:
        break

cap.release()
out.release()
cv2.destroyAllWindows()

files.download('/content/result.mp4')

## **Buffer**

In [14]:
# Open the camera or video file
cap = cv2.VideoCapture("/content/subway.mp4")

# Initialize other variables
frame_width = int(cap.get(3))
frame_height = int(cap.get(4))
# cam_fps = cap.get(cv2.CAP_PROP_FPS)
cam_fps = 15

# Output video
out = cv2.VideoWriter('/content/result.mp4', cv2.VideoWriter_fourcc(*'mp4v'), cam_fps, (frame_width, frame_height))

# Font settings
font = cv2.FONT_HERSHEY_SIMPLEX
fontScale = 0.8
thickness = 2

# simmularity threshold
diff_threshold = 0.7
# Define buffer to store last n crops and their identities for each person
buffer_size = 5
# Dictionary to store buffer per person {id: {'embeddings': deque(), 'labels': deque()}}
person_buffers = {}
face_id = None

while cap.isOpened():
    success, frame = cap.read()
    if success:

        results = face_detector.track(frame, persist=True, device=device)

        for r in results[0]:
            if 0.6 < r.boxes.conf.item():
                points = r.keypoints.xy.cpu().numpy()[0]
                x_c, y_c, w_b, h_b = r.boxes.xywh.cpu().numpy()[0]
                x_max = int(x_c + (w_b / 2))
                x_min = int(x_c - (w_b / 2))
                y_max = int(y_c + (h_b / 2))
                y_min = int(y_c - (h_b / 2))
                
                cropped_face = frame[y_min:y_max, x_min:x_max]
                cropped_face_embedding = face_crop_embedder(cropped_face)
                # Get tracking ID (assuming r.track_id exists)
                if r.boxes.id is not None:
                  face_id = r.boxes.id.item()
                  # print("face_id: ", face_id)
                # Initialize buffer for the person if not already in the dictionary
                if face_id not in person_buffers:
                    person_buffers[face_id] = {'labels': deque(maxlen=buffer_size)}

                # Calculate difference with database and determine identity
                simularities = (Data_Base*cropped_face_embedding).sum(dim=1)
                # print("simularities: ", simularities)
                simularities = 1 - simularities
                # print("1 - simularities: ", simularities)
                min_diff_index = torch.argmin(simularities)

                # If confidence of match is high, declare the identity
                if simularities[min_diff_index].item() < diff_threshold:
                    identity = img_names_ls[min_diff_index].split("_")[0]
                    # Store the identity in the buffer
                    person_buffers[face_id]['labels'].append(identity)  # Store the identity in buffer


                # Check if buffer is full (3 consecutive crops)
                if len(person_buffers[face_id]['labels']) == buffer_size:
                    # Perform voting on the labels
                    most_common_identity = Counter(person_buffers[face_id]['labels']).most_common(1)[0][0]
                    # Display the most common identity
                    frame = cv2.putText(frame, f'{most_common_identity}', (int(x_c + w_b / 2), int(y_c)),
                                        font, fontScale, (0, 0, 200), thickness, cv2.LINE_AA)

                # Draw rectangle around the face
                cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), (0, 255, 10), 2)
                frame = cv2.putText(frame, f'face_id: {face_id}', (int(1 + x_c + w_b / 2), int(50 + y_c)),
                                        font, 0.6, (212, 122, 66), 1, cv2.LINE_AA)
        out.write(frame)

    else:
        break

cap.release()
out.release()
cv2.destroyAllWindows()



files.download('/content/result.mp4')


0: 384x640 4 faces, 92.1ms
Speed: 4.6ms preprocess, 92.1ms inference, 1.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 faces, 83.7ms
Speed: 2.4ms preprocess, 83.7ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 faces, 81.5ms
Speed: 2.4ms preprocess, 81.5ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 faces, 97.5ms
Speed: 2.3ms preprocess, 97.5ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 faces, 82.8ms
Speed: 2.9ms preprocess, 82.8ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 faces, 89.9ms
Speed: 2.1ms preprocess, 89.9ms inference, 1.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 faces, 83.2ms
Speed: 2.6ms preprocess, 83.2ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 faces, 79.3ms
Speed: 4.1ms preprocess, 79.3ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>