In [None]:
# Package imports
import warnings
warnings.filterwarnings('ignore')

from deepface import DeepFace
from pinecone import Pinecone, ServerlessSpec
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from tqdm import tqdm
from DLAIUtils import Utils


import ast
import contextlib
import glob
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import time

# Course helper object; used here to read the API key and later to build the index name.
utils = Utils()
# The Pinecone API key is obtained through the helper instead of being written in the notebook.
PINECONE_API_KEY = utils.get_pinecone_api_key()

In [None]:
# Load the Dataset
#!wget -q --show-progress -O family_photos.zip "https://www.dropbox.com/scl/fi/yg0f2ynbzzd2q4nsweti5/family_photos.zip?rlkey=00oeuiii3jgapz2b1bfj0vzys&dl=0"
#!unzip -q family_photos.zip

def show_img(f):
    """Read the image file at path ``f`` and draw it in a new 4x3 matplotlib figure."""
    pixels = plt.imread(f)
    plt.figure(figsize=(4, 3))
    plt.imshow(pixels)

# Preview one sample photo per family member (dad, mom, child).
for sample_path in (
    'family/dad/P06260_face5.jpg',
    'family/mom/P04407_face2.jpg',
    'family/child/P04414_face1.jpg',
):
    show_img(sample_path)

In [None]:
# Set up Pinecone
# DeepFace model name passed to every DeepFace.represent call in this notebook.
MODEL = "Facenet"
# Index name built by the course helper from the 'dl-ai' argument.
INDEX_NAME = utils.create_dlai_index_name('dl-ai')

# Pinecone client. Note the lowercase name `pinecone` is a client instance here,
# not the package: only `Pinecone` and `ServerlessSpec` were imported from it.
pinecone = Pinecone(api_key=PINECONE_API_KEY)

In [None]:
# Create Embeddings Using DeepFace
def generate_vectors():
    """Embed every family photo with DeepFace and write the results to ``./vectors.vec``.

    For each of the folders ``family/mom``, ``family/dad`` and ``family/child``
    one line per image is written in the form ``person:filename:embedding``,
    where ``embedding`` is the string form of the value returned by
    ``DeepFace.represent(...)[0]['embedding']``.

    Images for which DeepFace raises ``ValueError``, ``UnboundLocalError`` or
    ``AttributeError`` are skipped after the error is printed.
    """
    VECTOR_FILE = "./vectors.vec"
    # Remove output left over from a previous run; a missing file is fine.
    with contextlib.suppress(FileNotFoundError):
        os.remove(VECTOR_FILE)
    with open(VECTOR_FILE, "w") as f:
        for person in ["mom", "dad", "child"]:
            # Sorted so the line order of vectors.vec is the same on every run
            # (glob returns files in arbitrary order).
            files = sorted(glob.glob(f'family/{person}/*'))
            for file in tqdm(files):
                # Computed outside the try block so that a mistake in this
                # bookkeeping can never be swallowed by the handler below.
                file_name = os.path.basename(file)
                try:
                    embedding = DeepFace.represent(img_path=file, model_name=MODEL, enforce_detection=False)[0]['embedding']
                except (ValueError, UnboundLocalError, AttributeError) as e:
                    # Best effort: report the failure and move on to the next image.
                    print(e)
                    continue
                f.write(f'{person}:{file_name}:{embedding}\n')
# Build vectors.vec, then show its first ten lines (IPython shell escape).
generate_vectors()
!head -10 vectors.vec

In [None]:
# Plot the Data of Images
def gen_tsne_df(person, perplexity):
    """Project one person's face embeddings to 2-D with PCA followed by t-SNE.

    Args:
        person: Label to select; only lines of ``./vectors.vec`` whose first
            ``:``-separated field equals this value are used.
        perplexity: Perplexity passed to ``TSNE``.

    Returns:
        A ``pandas.DataFrame`` with columns ``x`` and ``y``, one row per
        selected embedding.

    Raises:
        ValueError: If ``./vectors.vec`` contains no embedding for ``person``.
    """
    vectors = []
    with open('./vectors.vec', 'r') as f:
        for line in tqdm(f):
            p, _, v = line.split(':')
            if person == p:
                # literal_eval only accepts Python literals, so the stored
                # embedding is parsed without executing file content as code
                # (the previous eval() would run anything found in the file).
                vectors.append(ast.literal_eval(v))
    if not vectors:
        raise ValueError(f"no vectors found for person {person!r} in ./vectors.vec")
    pca = PCA(n_components=8)
    # The iteration count is left at scikit-learn's default of 1000: the keyword
    # was renamed from n_iter to max_iter in newer releases, so passing neither
    # keeps the same behaviour on both old and new versions.
    tsne = TSNE(n_components=2, perplexity=perplexity, random_state=0, verbose=0, metric='euclidean', learning_rate=75)
    print(f"transform {len(vectors)} vectors")
    pca_transform = pca.fit_transform(vectors)
    embeddings2d = tsne.fit_transform(pca_transform)
    return pd.DataFrame({'x': embeddings2d[:, 0], 'y': embeddings2d[:, 1]})

def plot_tsne(perplexity, model):
    """Overlay the t-SNE projections of dad, child and mom on a single scatter plot.

    Args:
        perplexity: t-SNE perplexity forwarded to ``gen_tsne_df``; also shown
            in the figure subtitle.
        model: Model name shown in the plot title.
    """
    # One Axes only: all three people are drawn on the same plot.
    # plt.subplots(1, 3, ...) would return an array of Axes, which has no
    # .spines or .scatter attribute and fails on the first access.
    fig, ax = plt.subplots(figsize=(8, 5))
    ax.grid(color='#EAEAEB', linewidth=0.5)
    # NOTE(review): a colour of None appears intended to hide the top/right spines.
    ax.spines['top'].set_color(None)
    ax.spines['right'].set_color(None)
    ax.spines['left'].set_color('#2B2F30')
    ax.spines['bottom'].set_color('#2B2F30')
    colormap = {'dad': '#ee8933', 'child': '#4fad5b', 'mom': '#4c93db'}

    for person, color in colormap.items():
        embeddingsdf = gen_tsne_df(person, perplexity)
        ax.scatter(embeddingsdf.x, embeddingsdf.y, alpha=.5,
                   label=person, color=color)
    ax.set_title(f'Scatter plot of faces using {model}', fontsize=16, fontweight='bold', pad=20)
    fig.suptitle(f't-SNE [perplexity={perplexity}]', y=0.92, fontsize=13)
    ax.legend(loc='best', frameon=True)
    plt.show()

# Draw the projection with perplexity 44; 'facenet' is only used in the plot title.
plot_tsne(44, 'facenet')
    

In [None]:
# Store the Embeddings in Pinecone
# Start from a clean slate: drop the index if a previous run left one behind.
existing_index_names = [idx.name for idx in pinecone.list_indexes()]
if INDEX_NAME in existing_index_names:
    pinecone.delete_index(INDEX_NAME)

# NOTE(review): dimension must equal the embedding length produced by MODEL;
# 128 is presumably the Facenet output size -- confirm if MODEL changes.
serverless_spec = ServerlessSpec(cloud='aws', region='us-west-2')
pinecone.create_index(
    name=INDEX_NAME,
    dimension=128,
    metric='cosine',
    spec=serverless_spec,
)

# Handle used by the cells below for upserts and queries.
index = pinecone.Index(INDEX_NAME)

def store_vectors():
    """Upsert every embedding from ``vectors.vec`` into the notebook-level ``index``.

    Each line ``person:file:vector`` becomes one record with id
    ``{person}-{file}``, the parsed vector as values, and the metadata
    ``{"person": person, "file": file}``. Records are sent in batches instead
    of one request per line to avoid a network round trip for every vector.
    """
    BATCH_SIZE = 100
    batch = []
    with open("vectors.vec", "r") as f:
        for line in tqdm(f):
            person, file, vec = line.split(':')
            # literal_eval parses the stored list literal without executing
            # file content as code (the previous eval() would).
            batch.append((f'{person}-{file}', ast.literal_eval(vec), {"person": person, "file": file}))
            if len(batch) >= BATCH_SIZE:
                index.upsert(batch)
                batch = []
    # Flush whatever is left after the last full batch.
    if batch:
        index.upsert(batch)
# Upload all embeddings, then display the index statistics as the cell output.
store_vectors()

index.describe_index_stats()

In [None]:
# Calculate the Similarity Scores
def test(vec_groups, parent, child):
    """Print the average similarity score between a parent's photos and the child's.

    Up to ``SAMPLE_SIZE`` of the parent's vectors are used as queries; each
    query asks Pinecone for the ``K`` nearest vectors whose ``person`` metadata
    equals ``child``. The scores of all returned matches are averaged.

    Args:
        vec_groups: Mapping of person label to a list of embedding vectors.
        parent: Key of ``vec_groups`` whose vectors are used as queries.
        child: ``person`` metadata value the results are filtered to.
    """
    index = pinecone.Index(INDEX_NAME)
    parent_vecs = vec_groups[parent]
    K = 10
    SAMPLE_SIZE = 10
    total_score = 0.0
    match_count = 0
    # Slicing (rather than indexing 0..SAMPLE_SIZE-1) tolerates a parent with
    # fewer than SAMPLE_SIZE vectors.
    for query_vec in tqdm(parent_vecs[:SAMPLE_SIZE]):
        query_response = index.query(
            top_k=K,
            vector=query_vec,
            filter={
                "person": {"$eq": child}
            }
        )
        # The scored results live under the 'matches' key of the response.
        for row in query_response['matches']:
            total_score += row['score']
            match_count += 1
    if match_count == 0:
        print(f"\n\n{parent} AVG: n/a (no matches)")
        return
    # Divide by the number of matches actually returned so the average is not
    # understated when fewer than SAMPLE_SIZE*K results come back.
    print(f"\n\n{parent} AVG: {total_score/match_count}")

def compute_scores():
    """Group the embeddings in ``vectors.vec`` by person and print parent/child averages.

    Reads every ``person:file:vector`` line, collects the vectors under the
    ``dad``, ``mom`` and ``child`` keys, then calls ``test`` once for dad
    against child and once for mom against child.
    """
    vec_groups = {"dad": [], "mom": [], "child": []}
    with open("vectors.vec", "r") as f:
        for line in f:
            person, _, vec = line.split(':')
            # literal_eval parses the stored list literal without executing
            # file content as code (the previous eval() would).
            vec_groups[person].append(ast.literal_eval(vec))
    print(f"DAD {'-'*20}")
    test(vec_groups, "dad", "child")
    print(f"MOM {'-'*20}")
    test(vec_groups, "mom", "child")

# Print the dad-vs-child and mom-vs-child average similarity scores.
compute_scores()

In [None]:
# Check the Matching Images
# Child photo used as the query image.
child_base = 'family/child/P06310_face1.jpg'
show_img(child_base)

# Now find the closest image of Dad: embed the child photo with the same MODEL
# that generate_vectors used for the stored vectors.
embedding = DeepFace.represent(child_base, model_name=MODEL, enforce_detection=False)[0]['embedding']
print(embedding)

# Ask for the single nearest vector whose "person" metadata equals "dad";
# include_metadata=True so the file name can be read back from the match.
query_response = index.query(
    top_k=1,
    vector=embedding,
    filter={
        "person": {"$eq": "dad"}
    },
    include_metadata=True
)

print(query_response)
# NOTE(review): indexing [0] assumes at least one match was returned.
photo = query_response['matches'][0]['metadata']['file']
show_img(f'family/dad/{photo}')