<a href="https://colab.research.google.com/github/Besutodesuka/Retriver_for_verification/blob/main/images_retrieval/face_recognition/pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -q transformers pillow faiss-cpu torch

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m30.7/30.7 MB[0m [31m70.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m107.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m88.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m51.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.3/56.3 MB[0m [31m37.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import os
import numpy as np
from PIL import Image
import torch
from transformers import CLIPProcessor, CLIPModel
import faiss

In [None]:
# Initialize the Hugging Face CLIP model and processor.
model_name = "openai/clip-vit-base-patch32"
model = CLIPModel.from_pretrained(model_name)
processor = CLIPProcessor.from_pretrained(model_name)
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)

In [None]:
def extract_embedding(image_path, model, processor):
    """
    Load an image, process it with the CLIP processor,
    and return the normalized image embedding.
    """
    image = Image.open(image_path).convert("RGB")
    inputs = processor(images=image, return_tensors="pt")
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # Get image embeddings
    with torch.no_grad():
        outputs = model.get_image_features(**inputs)

    # Convert to numpy and normalize
    embedding = outputs.cpu().numpy()[0]
    norm = np.linalg.norm(embedding)
    if norm != 0:
        embedding = embedding / norm
    return embedding

def build_face_database(image_folder, model, processor):
    """
    Build a database of embeddings from all images in the folder.
    Returns:
        - embeddings: numpy array of shape (N, D)
        - filenames: list of image filenames corresponding to the embeddings.
    """
    embeddings = []
    filenames = []

    for filename in os.listdir(image_folder):
        image_path = os.path.join(image_folder, filename)
        try:
            emb = extract_embedding(image_path, model, processor)
            embeddings.append(emb)
            filenames.append(filename)
            print(f"Processed {filename}")
        except Exception as e:
            print(f"Error processing {filename}: {e}")

    embeddings = np.vstack(embeddings).astype('float32')
    return embeddings, filenames

def create_faiss_index(embeddings):
    """
    Create a FAISS index for normalized embeddings using inner product.
    With normalized vectors, inner product is equivalent to cosine similarity.
    """
    d = embeddings.shape[1]
    index = faiss.IndexFlatIP(d)  # inner product index
    index.add(embeddings)
    return index

def verify_face(query_image_path, index, filenames, model, processor, top_k=3, similarity_threshold=0.6):
    """
    Extract the embedding for the query image, search for the top K nearest neighbors,
    and compute the average similarity score. If the average similarity exceeds the threshold,
    return verification True.
    """
    query_embedding = extract_embedding(query_image_path, model, processor).reshape(1, -1).astype('float32')
    # FAISS expects a batch of query vectors
    similarities, indices = index.search(query_embedding, top_k)

    # similarities are inner product scores (cosine similarity since embeddings are normalized)
    avg_similarity = np.mean(similarities)

    # Print details for inspection.
    print("Top similar images:")
    for i, idx in enumerate(indices[0]):
        print(f"{i+1}. {filenames[idx]} - similarity: {similarities[0][i]:.3f}")

    print(f"Average similarity: {avg_similarity:.3f}")
    verified = avg_similarity >= similarity_threshold
    return verified, avg_similarity

In [None]:
if __name__ == "__main__":
    # Folder containing known face images (each image should have one face ideally)
    known_faces_folder = "known_faces"

    # Build the face database: embeddings and corresponding filenames.
    embeddings, filenames = build_face_database(known_faces_folder, model, processor)

    if embeddings.shape[0] == 0:
        print("No valid embeddings found in the database.")
        exit(1)

    # Create the FAISS index from the embeddings.
    index = create_faiss_index(embeddings)

    # Query image to verify
    query_image_path = "query_face.jpg"

    verified, avg_similarity = verify_face(query_image_path, index, filenames, model, processor,
                                           top_k=3, similarity_threshold=0.6)

    if verified:
        print("Face verified!")
    else:
        print("Face not verified!")