In [2]:
import shutil
from pathlib import Path

import cv2
import matplotlib.pyplot as plt
import torch

#from facenet_pytorch import MTCNN, InceptionResnetV1
from IPython.display import Video
from PIL import Image
from torch.utils.data import DataLoader
from torchvision import datasets
import deeplake

In [None]:
# NOTE(review): this binds the `deeplake.load` function object itself to `ds`;
# the function is not called, so no dataset is loaded here. `ds` is not used
# anywhere else in the visible notebook — TODO confirm whether this cell is
# still needed or should become an actual `deeplake.load(...)` call.
ds = deeplake.load

In [None]:
# Choose the compute device by preference: CUDA GPU first, then Apple's MPS
# backend, and plain CPU as the fallback. The MPS check is only evaluated
# when CUDA is unavailable.
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps" if torch.backends.mps.is_available() else "cpu"
)

print(f"Using {device} device.")

We've downloaded the YouTube video (`https://www.youtube.com/watch?v=73fz_uK-vhs`) to the `"data"` directory, so let's start by preparing that.

**Task 4.7.1:** Create a path object for the `"data"` directory. Use the `Path` class from `pathlib`.

In [None]:
# Root folder for this notebook's files: the downloaded video lives here and
# later cells create their output folders underneath it.
data_dir_name = "data"
data_dir = Path(data_dir_name)

print(data_dir)

In [None]:
# File name of the downloaded video; it is joined onto `data_dir` in a later
# cell to build the full path.
video_name = "lupita_nyongo.mp4"

In [None]:
# Reminder of the two already-defined pieces needed to build the video path,
# printed one per line.
print(data_dir, video_name, sep="\n")

In [None]:
# Full path to the video file: <data_dir>/<video_name>
input_video = data_dir.joinpath(video_name)

print(input_video)

In [None]:
# Display the video
# As the last expression of the cell, the `Video` object is rendered in the
# cell output; `width=400` constrains the displayed width.
Video(input_video, width=400)

In [None]:
# Folder that receives the individual frames pulled out of the video
frames_dir = data_dir.joinpath("extracted_frames")
frames_dir.mkdir(exist_ok=True)  # re-running the cell is fine if it already exists

print(frames_dir)

In [None]:
# Open the video for frame-by-frame reading.
# The path is converted with str() because OpenCV's Python bindings have
# historically required a plain string file name and reject pathlib.Path
# objects on older releases.
video_capture = cv2.VideoCapture(str(input_video))

# A capture that failed to open does not raise on its own: it would report a
# frame rate of 0 and the extraction loop would silently produce no frames.
if not video_capture.isOpened():
    raise OSError(f"Could not open video file: {input_video}")

# CAP_PROP_FPS is reported as a float; round it to a whole number of frames
frame_rate = round(video_capture.get(cv2.CAP_PROP_FPS))

print(f"Frame rate: {frame_rate}")

In [None]:
# Walk through the video opened in the previous cell and save every
# `interval`-th frame as "frame_<index>.jpg" inside `frames_dir`.
# NOTE: this consumes `video_capture` and releases it at the end, so the
# cell that opens the capture must be re-run before running this one again.
interval = 6  # Extract every sixth frame from the video
frame_count = 0

print("Start extracting individual frames...")
try:
    while True:
        # Read the next frame from the video_capture
        ret, frame = video_capture.read()
        if not ret:
            print("Finished!")
            break  # Break the loop if there are no more frames

        # Save frames at every 'interval' frames
        if frame_count % interval == 0:
            frame_path = frames_dir / f"frame_{frame_count}.jpg"
            # str(): older OpenCV builds reject pathlib.Path file names.
            # imwrite signals failure through its return value rather than
            # an exception, so check it instead of silently losing frames.
            if not cv2.imwrite(str(frame_path), frame):
                raise OSError(f"Could not write frame to {frame_path}")

        frame_count += 1
finally:
    # Always free the capture, even if extraction is interrupted or fails
    video_capture.release()

In [None]:
# Parent folder for the hand-picked reference images (one sub-folder per
# person is created in the next cell)
images_dir = data_dir.joinpath("images")
images_dir.mkdir(exist_ok=True)  # no error when the folder is already there

print(images_dir)

In [None]:
# One sub-folder of `images_dir` per person
lupita_dir = images_dir.joinpath("lupita")
christoph_dir = images_dir.joinpath("christoph")

# Create both folders; existing folders are left untouched
for person_dir in (lupita_dir, christoph_dir):
    person_dir.mkdir(exist_ok=True)

In [None]:
# Hand-picked frame numbers for each person. The file names are rebuilt with
# the same "frame_<number>.jpg" pattern the extraction cell uses when saving.
lupita_frame_numbers = (3438, 3486, 3852, 4062, 4914, 4866)
christoph_frame_numbers = (54, 66, 72, 108, 186, 246)

lupita_imgs = [f"frame_{number}.jpg" for number in lupita_frame_numbers]
christoph_imgs = [f"frame_{number}.jpg" for number in christoph_frame_numbers]

In [None]:
# Resolve the selected file names against the extracted-frames folder
lupita_img_paths = [frames_dir.joinpath(file_name) for file_name in lupita_imgs]
christoph_img_paths = [
    frames_dir.joinpath(file_name) for file_name in christoph_imgs
]

print("Number of Lupita images:", len(lupita_img_paths))
print("Number of Christoph images:", len(christoph_img_paths))

In [None]:
def show_image_row(image_paths, figsize=(10, 8)):
    """Display the images at `image_paths` side by side in a single row.

    The number of columns follows `len(image_paths)` instead of a hard-coded
    count, so the figure stays correct when the selection changes.

    Parameters
    ----------
    image_paths : sequence of path-like
        Image files to show, left to right. Must not be empty.
    figsize : tuple, optional
        Figure size passed to `plt.subplots`.

    Returns
    -------
    (fig, axes_row)
        The figure and the 1-D array of axes, one per image.
    """
    # squeeze=False keeps `axs` two-dimensional even for a single image, so
    # the row can always be iterated.
    fig, axs = plt.subplots(1, len(image_paths), figsize=figsize, squeeze=False)
    axes_row = axs[0]
    for ax, image_path in zip(axes_row, image_paths):
        ax.imshow(Image.open(image_path))
        ax.axis("off")
    return fig, axes_row


# One row of reference images per person
show_image_row(lupita_img_paths)
fig, axs = show_image_row(christoph_img_paths)

In [None]:
# Copy each person's selected frames into that person's own directory
# (Lupita's images first, then Christoph's).
for source_paths, target_dir in (
    (lupita_img_paths, lupita_dir),
    (christoph_img_paths, christoph_dir),
):
    for source_path in source_paths:
        shutil.copy(source_path, target_dir)

# Count the entries now present in each directory
lupita_file_count = sum(1 for _ in lupita_dir.iterdir())
christoph_file_count = sum(1 for _ in christoph_dir.iterdir())

print("Number of files in lupita directory:", lupita_file_count)
print("Number of files in christoph directory:", christoph_file_count)

In [None]:
# NOTE(review): `MTCNN` is not imported anywhere in the visible notebook — the
# `facenet_pytorch` import in the first cell is commented out — so on a fresh
# kernel this cell raises NameError until that import is restored.
#
# Face detector configured with `keep_all=True` and `min_face_size=40`; the
# two prints echo those settings back from the created object.
mtcnn = MTCNN(keep_all=True, min_face_size=40)

print(f"MTCNN min face size: {mtcnn.min_face_size}")
print(f"MTCNN keeping all faces: {mtcnn.keep_all}")

In [None]:
# Pick one extracted frame to try face detection on
sample_image_filename = "frame_4866.jpg"
sample_image_path = frames_dir.joinpath(sample_image_filename)

# Open it with PIL; as the cell's last expression the image is shown inline
sample_image = Image.open(sample_image_path)
sample_image

In [None]:
# Run face detection on the sample image. With `landmarks=True` the call
# returns three values, unpacked here as bounding boxes, detection
# probabilities and facial landmark points; each is printed for inspection.
boxes, probs, landmarks = mtcnn.detect(sample_image, landmarks=True)

print("boxes:", boxes)
print("probs:", probs)
print("landmarks:", landmarks)

In [None]:
# Overlay the detection results on the sample image: a blue rectangle per
# bounding box and a red dot per facial landmark.
fig, ax = plt.subplots()
ax.imshow(sample_image)

# The detector returns None (not an empty array) when it finds no face;
# zipping None would raise a TypeError, so only draw when there are results.
if boxes is not None and landmarks is not None:
    for box, landmark in zip(boxes, landmarks):
        # box is indexed as (x1, y1, x2, y2); Rectangle wants the corner
        # plus width and height.
        rect = plt.Rectangle(
            (box[0], box[1]), box[2] - box[0], box[3] - box[1], fill=False, color="blue"
        )
        ax.add_patch(rect)
        for point in landmark:
            ax.plot(point[0], point[1], marker="o", color="red")

# Use the explicit Axes object rather than pyplot's "current axes" state
ax.axis("off");

In [None]:
# NOTE(review): `InceptionResnetV1` is not imported anywhere in the visible
# notebook — the `facenet_pytorch` import in the first cell is commented out —
# so on a fresh kernel this cell raises NameError until that import is restored.
#
# Embedding model created with the "vggface2" pretrained weight set and put
# into evaluation mode via `.eval()`.
resnet = InceptionResnetV1(pretrained="vggface2").eval()

print(f"InceptionResnet weight set: {resnet.pretrained}")

In [None]:
# Build a torchvision ImageFolder dataset over `images_dir`, which holds one
# sub-directory per person (created earlier as "lupita" and "christoph");
# ImageFolder uses the sub-directory names as class labels.
dataset = datasets.ImageFolder(images_dir)

print(dataset)

In [None]:
# Invert the dataset's class-name -> index mapping so that a numeric label
# can be turned back into the class name.
idx_to_class = {}
for class_name, class_idx in dataset.class_to_idx.items():
    idx_to_class[class_idx] = class_name

print(idx_to_class)

In [None]:
def collate_fn(x):
    """Return the first element of the batch `x` unchanged.

    Intended as a DataLoader `collate_fn`: instead of merging samples into
    batched tensors, it hands back the first sample of the batch as-is.
    """
    first_sample = x[0]
    return first_sample


# Wrap the dataset in a DataLoader that uses `collate_fn` to unwrap each
# batch. No batch size is passed, so the DataLoader default applies
# (presumably one sample per batch — confirm against the torch docs), which
# is what makes taking `x[0]` in `collate_fn` lossless.
loader = DataLoader(dataset, collate_fn=collate_fn)
print(loader.dataset)

In [None]:
# Dictionary that maps name to list of their embeddings
name_to_embeddings = {name: [] for name in idx_to_class.values()}

# Inference only: disable autograd so the stored embeddings do not keep a
# computation graph (and its memory) alive.
with torch.no_grad():
    for img, idx in loader:
        face, prob = mtcnn(img, return_prob=True)
        if face is None:
            continue  # no face detected in this image

        # The detector was built with keep_all=True, so `face` and `prob` hold
        # one entry per detected face. Only the first face is embedded below,
        # so test that face's own probability. Comparing the whole `prob`
        # array raises "truth value of an array is ambiguous" as soon as an
        # image contains more than one face.
        if prob[0] >= 0.90:
            emb = resnet(face[0].unsqueeze(0))
            name_to_embeddings[idx_to_class[idx]].append(emb)

print(name_to_embeddings.keys())
print(type(name_to_embeddings["lupita"]))
print(type(name_to_embeddings["christoph"]))

In [None]:
# Combine each person's list of embedding tensors into one stacked tensor
lupita_embedding_list = name_to_embeddings["lupita"]
christoph_embedding_list = name_to_embeddings["christoph"]

embeddings_lupita = torch.stack(lupita_embedding_list)
embeddings_christoph = torch.stack(christoph_embedding_list)

print(f"Shape of stack of embeddings for Lupita: {embeddings_lupita.shape}")
print(f"Shape of stack of embeddings for Christoph: {embeddings_christoph.shape}")

In [None]:
# Average over the stacking dimension (dim 0) to get a single representative
# embedding per person
avg_embedding_lupita = embeddings_lupita.mean(dim=0)
avg_embedding_christoph = embeddings_christoph.mean(dim=0)

print(f"Shape of avg_embedding_lupita: {avg_embedding_lupita.shape}")
print(f"Shape of avg_embedding_christoph: {avg_embedding_christoph.shape}")

In [None]:
# Frames held out for trying the recognizer; file names follow the same
# "frame_<number>.jpg" pattern as the extracted frames.
test_frame_numbers = (2658, 4614, 972, 30)
test_images = [f"frame_{number}.jpg" for number in test_frame_numbers]

In [None]:
# Resolve the test file names against the extracted-frames folder
test_paths = [frames_dir.joinpath(file_name) for file_name in test_images]

# Show the test frames side by side, one axes per image, without axis ticks
fig, axs = plt.subplots(1, len(test_paths), figsize=(10, 8))
for ax, test_path in zip(axs, test_paths):
    ax.imshow(Image.open(test_path))
    ax.axis("off")

In [None]:
# `recognize_faces` comes from a `utils` module that is not part of this
# notebook (presumably a project-local utils.py — confirm).
from utils import recognize_faces

# IPython-only syntax: a trailing `?` shows the object's help/docstring. It is
# not valid plain Python, so this cell only runs inside IPython/Jupyter.
recognize_faces?

In [None]:
# Average embeddings and names, kept in matching order
embedding_list = [avg_embedding_lupita, avg_embedding_christoph]
name_list = ["lupita", "christoph"]

# Pair them up as (embedding, name) tuples
embedding_data = [
    (avg_embedding, person)
    for avg_embedding, person in zip(embedding_list, name_list)
]

# Show the shape and name of the first two entries
for avg_embedding, person in embedding_data[:2]:
    print(avg_embedding.shape, person)

In [None]:
# Run `recognize_faces` on every test image, passing the known
# (embedding, name) pairs plus the detector and the embedding model, and
# collect the results in order.
recognized_faces = [
    recognize_faces(test_img_path, embedding_data, mtcnn, resnet)
    for test_img_path in test_paths
]