In [None]:
import warnings
# Install a global "ignore" filter so no warnings are shown by the cells below.
# NOTE(review): this also hides deprecation notices from the libraries imported
# in the next cell - confirm that is intended.
warnings.filterwarnings('ignore')

In [None]:
import ast
import contextlib
import glob
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from deepface import DeepFace
from pinecone import Pinecone, ServerlessSpec
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from tqdm import tqdm

from DLAIUtils import Utils

In [None]:
# Obtain the Pinecone API key through the DLAIUtils helper instead of
# hard-coding it in the notebook.
# NOTE(review): where Utils reads the key from is not visible here - confirm.
utils = Utils()
PINECONE_API_KEY = utils.get_pinecone_api_key()

In [None]:
def show_img(f):
  """Display the image stored at path ``f``.

  Args:
    f: Path of the image file; it is read with ``plt.imread``.

  The image is drawn with ``plt.imshow`` on a freshly created figure
  whose size is 4 x 3 inches.
  """
  pixels = plt.imread(f)
  # figsize is (width, height) in inches.
  plt.figure(figsize=(4, 3))
  plt.imshow(pixels)

# Preview one sample image from the family/dad folder.
show_img('family/dad/P06260_face5.jpg')

In [None]:
# Set up Pinecone
# MODEL is the model name handed to DeepFace.represent when embeddings are generated.
MODEL = "Facenet"
# Index name built by the DLAIUtils helper from the 'dl-ai' argument.
# NOTE(review): the naming scheme is not visible here - confirm.
INDEX_NAME = utils.create_dlai_index_name('dl-ai')

# Client object used by the later cells to list, delete, create and open indexes.
# Note that the name `pinecone` is bound to this client instance, not to the package.
pinecone = Pinecone(api_key=PINECONE_API_KEY)

In [None]:
#Create Embeddings Using DeepFace
def generate_vectors():
  """Compute a face embedding for every family image and write them to disk.

  For each of the folders ``family/mom``, ``family/dad`` and ``family/child``
  every file is passed to ``DeepFace.represent`` using the module-level
  ``MODEL``. One line per image is written to ``./vectors.vec`` in the form
  ``person:filename:embedding`` where ``embedding`` is the ``str()`` of the
  value DeepFace returned.

  Images are processed in sorted path order so repeated runs produce the
  same file. An image whose processing raises ``ValueError``,
  ``UnboundLocalError`` or ``AttributeError`` is reported (with its path)
  and skipped; it does not abort the run.
  """
  # Path of the output file that stores the embeddings.
  VECTOR_FILE = "./vectors.vec"

  # Start fresh: remove a previous output file; a missing file is not an error.
  with contextlib.suppress(FileNotFoundError):
    os.remove(VECTOR_FILE)

  with open(VECTOR_FILE, "w") as vec_file:
    for person in ["mom", "dad", "child"]:
      # glob returns paths in filesystem order, which varies between machines;
      # sorting makes the generated file reproducible.
      image_paths = sorted(glob.glob(f'family/{person}/*'))
      for image_path in tqdm(image_paths, desc=person):
        try:
          # enforce_detection=False lets DeepFace return a representation even
          # when its face detector does not find a face in the image.
          faces = DeepFace.represent(img_path=image_path, model_name=MODEL,
                                     enforce_detection=False)
          # represent() returns a list with one entry per face; only the
          # first entry's embedding is kept.
          embedding = faces[0]['embedding']
        except (ValueError, UnboundLocalError, AttributeError) as e:
          # Report which image failed so it can be inspected, then move on.
          print(f'{image_path}: {e}')
          continue
        # Store only the file name (no directory) next to the person label.
        vec_file.write(f'{person}:{os.path.basename(image_path)}:{embedding}\n')

# Build ./vectors.vec from the images under family/.
generate_vectors()

In [None]:
# Shell command: show the first 10 lines of the generated file as a preview of the stored embeddings.
!head -10 vectors.vec

In [None]:
#Plot the Data of Images
def gen_tsne_df(person, perplexity):
    """Project one person's face embeddings to 2-D with PCA followed by t-SNE.

    Reads ``./vectors.vec`` (one ``person:filename:embedding`` record per
    line), keeps the embeddings whose label equals ``person``, reduces them
    to 8 dimensions with PCA and then to 2 dimensions with t-SNE.

    Args:
        person: Label to select, compared with the first field of each line.
        perplexity: Value forwarded to ``TSNE(perplexity=...)``.

    Returns:
        A ``pandas.DataFrame`` with columns ``x`` and ``y``, one row per
        selected embedding.

    Raises:
        ValueError: If the file holds no embedding for ``person``.
    """
    vectors = []
    with open('./vectors.vec', 'r') as vec_file:
        for line in tqdm(vec_file):
            if not line.strip():
                continue
            # The label is everything before the first colon and the embedding
            # everything after the last one, so a colon inside the file name
            # does not break parsing.
            label, rest = line.split(':', 1)
            _file_name, raw_vec = rest.rsplit(':', 1)
            if label == person:
                # literal_eval only accepts Python literals, so a tampered
                # file cannot execute code the way eval() would.
                vectors.append(ast.literal_eval(raw_vec.strip()))

    if not vectors:
        raise ValueError(f'no vectors found for person {person!r} in ./vectors.vec')

    # PCA first cuts the embeddings down to 8 dimensions before t-SNE runs.
    pca = PCA(n_components=8)

    # t-SNE produces the final 2-D coordinates; random_state is fixed so the
    # layout is repeatable.
    # NOTE(review): newer scikit-learn releases rename TSNE's n_iter to
    # max_iter - confirm against the installed version.
    tsne = TSNE(2, perplexity=perplexity, random_state=0, n_iter=1000,
                verbose=0, metric='euclidean', learning_rate=75)
    print(f'transform {len(vectors)} vectors')

    pca_transform = pca.fit_transform(vectors)
    embeddings2d = tsne.fit_transform(pca_transform)

    # Column 0 becomes x and column 1 becomes y.
    return pd.DataFrame({'x': embeddings2d[:, 0], 'y': embeddings2d[:, 1]})

def plot_tsne(perplexity, model):
    """Draw a scatter plot of the 2-D t-SNE projection for each family member.

    Args:
        perplexity: Forwarded to ``gen_tsne_df`` for every person and shown
            in the subtitle.
        model: Text inserted into the plot title.
    """
    fig, ax = plt.subplots(figsize=(8, 5))
    #plt.style.use('seaborn-whitegrid')
    ax.grid(color='#EAEAEB', linewidth=0.5)

    # NOTE(review): presumably meant to hide the top/right borders - confirm;
    # set_visible(False) would state that explicitly.
    for side in ('top', 'right'):
        ax.spines[side].set_color(None)
    for side in ('left', 'bottom'):
        ax.spines[side].set_color('#2B2F30')

    # One fixed colour per person; the dict order is also the plotting order.
    colormap = {'dad':'#ee8933', 'child':'#4fad5b', 'mom':'#4c93db'}

    for who, colour in colormap.items():
        points = gen_tsne_df(who, perplexity)
        ax.scatter(points.x, points.y, alpha=.5, label=who, color=colour)

    # Title, subtitle and legend.
    ax.set_title(f'Scatter plot of faces using {model}', fontsize=16, fontweight='bold', pad=20)
    fig.suptitle(f't-SNE [perplexity={perplexity}]', y=0.92, fontsize=13)
    ax.legend(loc='best', frameon=True)
    plt.show()

In [None]:
# Plot all three people with perplexity 44; 'facenet' is only used in the title.
# NOTE(review): t-SNE needs the perplexity to be smaller than the number of
# samples per person - confirm each folder has enough images.
plot_tsne(44, 'facenet')

In [None]:
#Store the Embeddings in Pinecone
# Start from a clean slate: drop an existing index with this name, then create it again.
if any(existing.name == INDEX_NAME for existing in pinecone.list_indexes()):
  pinecone.delete_index(INDEX_NAME)

# NOTE(review): dimension=128 presumably matches the length of the embeddings
# produced by MODEL - confirm before switching models.
pinecone.create_index(
  name=INDEX_NAME,
  dimension=128,
  metric='cosine',
  spec=ServerlessSpec(cloud='aws', region='us-west-2'),
)

# Handle used by the cells below to upsert into and query the index.
index = pinecone.Index(INDEX_NAME)

def store_vectors(batch_size=100):
  """Upload every embedding in ``vectors.vec`` to the module-level ``index``.

  Each ``person:filename:embedding`` line becomes one record with id
  ``f'{person}-{filename}'``, the parsed embedding as its values and
  ``{"person": person, "file": filename}`` as metadata. Records are sent
  in batches instead of one request per vector.

  Args:
    batch_size: Maximum number of records per ``index.upsert`` call.
      NOTE(review): 100 is assumed to be within Pinecone's request
      limits - confirm against the current documentation.

  Raises:
    ValueError: If ``batch_size`` is smaller than 1.
  """
  if batch_size < 1:
    raise ValueError('batch_size must be at least 1')

  batch = []
  with open("vectors.vec", "r") as vec_file:
    for line in tqdm(vec_file):
      if not line.strip():
        continue
      # Label before the first colon, embedding after the last one; this
      # keeps file names that contain a colon intact.
      person, rest = line.split(':', 1)
      file_name, raw_vec = rest.rsplit(':', 1)
      # literal_eval parses the list without executing arbitrary code.
      values = ast.literal_eval(raw_vec.strip())
      batch.append((f'{person}-{file_name}', values,
                    {"person": person, "file": file_name}))
      if len(batch) >= batch_size:
        index.upsert(batch)
        batch = []

  # Send whatever is left after the last full batch.
  if batch:
    index.upsert(batch)

# Upload every embedding from vectors.vec into the index.
store_vectors()

# Last expression of the cell, so the notebook displays the returned statistics.
# NOTE(review): the reported counts may lag briefly behind the upserts -
# confirm against Pinecone's consistency behaviour.
index.describe_index_stats()

In [None]:
#Calculate the Similarity Scores
#This function can be used to assess the similarity between two sets of facial embeddings, like comparing a parent's facial features with those of a child.
#It's a way to quantitatively measure the facial resemblance between two individuals based on the embeddings generated by a facial recognition model.


#parent: The identifier (key in vec_groups) for the first group of embeddings to be queried.
#child: The identifier for the second group of embeddings, which will be used as a filter in the query.
def test(vec_groups, parent, child):
  #Initializes the Pinecone index for querying.
  index = pinecone.Index(INDEX_NAME)
  #Retrieves the embeddings for the parent.
  parent_vecs = vec_groups[parent]

  #the number of top results to retrieve for each query to the Pinecone index.
  K = 10

  #In this case, SAMPLE_SIZE being 10 means you perform 10 separate queries to the index, each with a different embedding from the parent group.
  #This parameter allows you to average the similarity scores across multiple queries, giving a more holistic measure of similarity between two groups (like "parent" and "child").
  SAMPLE_SIZE = 10
  sum = 0

  #Querying and Accumulating Scores
  for i in tqdm(range(0,SAMPLE_SIZE)):
    query_response = index.query(
      top_k=K,
      vector = parent_vecs[i],
      filter={
        "person": {"$eq": child}
      }
    )
    for row in query_response["matches"]:
      sum  = sum + row["score"]
  print(f'\n\n{parent} AVG: {sum / (SAMPLE_SIZE*K)}')

In [None]:
def compute_scores():
  """Load all embeddings and report dad-vs-child and mom-vs-child similarity.

  Reads ``vectors.vec`` (one ``person:filename:embedding`` record per
  line), groups the embeddings by person and calls ``test`` once with
  "dad" and once with "mom" as the parent, both against "child".

  Raises:
    KeyError: If a line carries a person label other than "dad", "mom"
      or "child".
  """
  vec_groups = {"dad": [], "mom": [], "child": []}
  with open("vectors.vec", "r") as vec_file:
    for line in tqdm(vec_file):
      if not line.strip():
        continue
      # Label before the first colon, embedding after the last one, so a
      # colon inside the file name does not break parsing.
      person, rest = line.split(':', 1)
      _file_name, raw_vec = rest.rsplit(':', 1)
      # literal_eval parses the list without executing arbitrary code.
      vec_groups[person].append(ast.literal_eval(raw_vec.strip()))

  print(f"DAD {'-' * 20}")
  test(vec_groups, "dad", "child")
  print(f"MOM {'-' * 20}")
  test(vec_groups, "mom", "child")

# Print the average similarity of dad and of mom to the child.
compute_scores()

In [None]:
# Check the matching images
# Path of the child photo displayed here as the reference image.
child_base = 'family/child/P06310_face1.jpg'
show_img(child_base)