In [6]:
import os
import uuid
from PIL import Image
from PIL.ExifTags import TAGS
import numpy as np
from insightface.app import FaceAnalysis
import clip
import torch
from geopy.geocoders import Nominatim
from rocksdict import Rdict
import faiss
import json
from sklearn.cluster import DBSCAN
from pillow_heif import register_heif_opener


In [None]:
# CLOSE DATABASES IF NOT DONT ALREADY IN BELOW CELLS (WHILE RUNNING IN JUPYTER ONLY, TESTING)
image_db.close()
face_db.close()

In [5]:
# RESET DATABASES TO BLANK (FOR TESTING ONLY)
import glob
for file in glob.glob("new_db/*.faiss"):
    os.remove(file)
for file in glob.glob("new_db/*.json"):
    os.remove(file)
import shutil
db_path = "new_db/image_db"
if os.path.exists(db_path):
    shutil.rmtree(db_path)
db_path = "new_db/face_db"
if os.path.exists(db_path):
    shutil.rmtree(db_path)


In [None]:

# SET DEVICE INFO
device = "cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu")

# CLIP MODEL FOR SEMANTIC EMBEDDINGS
clip_model, preprocess = clip.load("ViT-B/32", device=device)

# INSIGHTFACE MODEL (RetinaNet + MobileFaceNet)
face_app = FaceAnalysis(name="buffalo_l", detectors=['SCRFD'])
face_app.prepare(ctx_id=0, det_size=(640, 640))

# Enable HEIC support
register_heif_opener()

# Nominatim for Geolocation Address generation
geolocator = Nominatim(user_agent="image_search_app")

# Elasticsearch client (NOT WORKING?)
# es = Elasticsearch(["http://localhost:9200"])
# es = Elasticsearch("http://localhost:9200", node_class=RequestsHttpNode)
# es = Elasticsearch("http://localhost:9200", request_timeout=30, max_retries=10, retry_on_timeout=True)

# FAISS setup for storing/searching embeddings
clip_dim = 512
face_dim = 512
clip_index = faiss.IndexFlatL2(clip_dim)
face_index = faiss.IndexFlatL2(face_dim)  # For image-specific face embeddings
face_db_index = faiss.IndexFlatL2(face_dim)  # Face database for clustering

# FACE AND METADATA STORAGE WITH RocksDict (wrapper around RocksDB)
# face_db = {}  # {face_id: {"embedding": np.array, "name": str or None, "count": int}}
# image_db.close()
# face_db.close()
try:
    image_db = Rdict("new_db/image_db")  # Stores image metadata
    face_db = Rdict("new_db/face_db")  # Stores face data
except Exception as e:
    print("Looks like lock is not available, closing existing db and creating new.")
    image_db.close()
    face_db.close()
    image_db = Rdict("new_db/image_db")  # Stores image metadata
    face_db = Rdict("new_db/face_db")  # Stores face data

# Directory containing images
image_dir = "ImageSamples"



Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /Users/adityapatil/.insightface/models/buffalo_l/1k3d68.onnx landmark_3d_68 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /Users/adityapatil/.insightface/models/buffalo_l/2d106det.onnx landmark_2d_106 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /Users/adityapatil/.insightface/models/buffalo_l/det_10g.onnx detection [1, 3, '?', '?'] 127.5 128.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /Users/adityapatil/.insightface/models/buffalo_l/genderage.onnx genderage ['None', 3, 96, 96] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /Users/adityapatil/.insightface/models/buffalo_l/w600k_r50.onnx recognition ['None

In [5]:
def extract_metadata(image_path):
    """Extract date and geolocation from EXIF using Pillow."""
    img = Image.open(image_path)
    exif_data = img._getexif()
    metadata = {}
    
    if exif_data:
        for tag_id, value in exif_data.items():
            tag = TAGS.get(tag_id, tag_id)
            if tag == "DateTimeOriginal":
                metadata["date"] = value
            elif tag == "GPSInfo":
                gps_data = {}
                for t in value:
                    sub_tag = TAGS.get(t, t)
                    gps_data[sub_tag] = value[t]
                lat = gps_data.get("GPSLatitude")
                lon = gps_data.get("GPSLongitude")
                if lat and lon:
                    lat = float(lat[0]) + float(lat[1])/60 + float(lat[2])/3600
                    lon = float(lon[0]) + float(lon[1])/60 + float(lon[2])/3600
                    if gps_data.get("GPSLatitudeRef") == "S":
                        lat = -lat
                    if gps_data.get("GPSLongitudeRef") == "W":
                        lon = -lon
                    metadata["location"] = (lat, lon)
    
    return metadata

def get_location_description(lat, lon):
    """Convert lat/lon to human-readable location using Nominatim."""
    try:
        location = geolocator.reverse((lat, lon), language="en")
        return location.address if location else "Unknown"
    except:
        return "Unknown"

def update_face_db(face_embedding, quality_score):
    """Update face database with robust embeddings."""
    # Search for nearest face in face_db_index
    distances, indices = face_db_index.search(np.array([face_embedding]), k=1)
    threshold = 0.6  # Cosine similarity threshold (tune as needed)
    if face_db_index.ntotal == 0 or distances[0][0] > threshold:  # New face
        face_id = str(uuid.uuid4())
        face_db[face_id] = {"embedding": face_embedding, "name": None, "count": 1}
        face_db_index.add(np.array([face_embedding]))
    else:  # Existing face
        face_id = list(face_db.keys())[indices[0][0]]
        current_embedding = face_db[face_id]["embedding"]
        face_db[face_id]["count"] += 1
        # Update embedding if quality is high (e.g., clear image, no occlusion)
        if quality_score > 0.9:  # Assuming quality_score from InsightFace (tune threshold)
            face_db[face_id]["embedding"] = (current_embedding + face_embedding) / 2  # Average for robustness
            face_db_index.reconstruct(indices[0][0])[:] = face_db[face_id]["embedding"]

    return face_id

def process_image(image_path):
    """Process a single image: extract metadata, embeddings, and index."""
    image_id = str(uuid.uuid4())
    metadata = extract_metadata(image_path)
    date = metadata.get("date", "Unknown")
    loc_coords = metadata.get("location")
    location_desc = get_location_description(*loc_coords) if loc_coords else "Unknown"
    
    img = Image.open(image_path).convert("RGB")
    img_preprocessed = preprocess(img).unsqueeze(0).to(device)
    with torch.no_grad():
        clip_embedding = clip_model.encode_image(img_preprocessed).cpu().numpy().flatten()
    
    # Detect and embed faces
    face_data = []
    img_np = np.array(img)
    faces = face_app.get(img_np)
    for face in faces:
        face_embedding = face.embedding
        quality_score = face.det_score  # Detection confidence as proxy for quality
        face_id = update_face_db(face_embedding, quality_score)
        face_data.append({"face_id": face_id})
    
    # Store in Elasticsearch
    doc = {
        "image_id": image_id,
        "image_path": image_path,
        "date": date,
        "location": location_desc,
        "tags": [],
        "faces": [{"face_id": f["face_id"]} for f in face_data]
    }
    # es.index(index="images", id=image_id, body=doc)
    image_db[image_id.encode()] = json.dumps(doc).encode()
    # Store CLIP embedding in FAISS
    clip_index.add(np.array([clip_embedding]))
    
    # Save mapping
    with open("new_db/index_mapping.json", "a") as f:
        f.write(json.dumps({"image_id": image_id, "clip_idx": clip_index.ntotal - 1}) + "\n")
    
    return image_id


In [None]:
# Process all images
for filename in os.listdir(image_dir)[:5]:
    if filename.lower().endswith((".jpg", ".jpeg", ".png", ".heic")):
        image_path = os.path.join(image_dir, filename)
        print(f"Processing {image_path}")
        process_image(image_path)


# Save indices and face database
faiss.write_index(clip_index, "new_db/clip_index.faiss")
faiss.write_index(face_db_index, "new_db/face_db_index.faiss")

import json

def handle_circular_references(obj):
    if isinstance(obj, np.ndarray):
        return obj.tolist()  # Convert NumPy arrays to lists
    if isinstance(obj, dict): 
        return {k: handle_circular_references(v) for k, v in obj.items()}  # Recursively fix dicts
    if isinstance(obj, list):
        return [handle_circular_references(v) for v in obj]  # Recursively fix lists
    if hasattr(obj, '__dict__'):  # Convert objects to dicts safely
        return {k: handle_circular_references(v) for k, v in obj.__dict__.items()}
    return str(obj)  # Convert problematic objects to strings

# Now dump JSON safely
with open("new_db/face_db.json", "w") as f:
    json.dump(face_db, f, default=handle_circular_references, indent=4)

# Close RocksDB databases after operations
image_db.close()
face_db.close()

Processing /Users/adityapatil/Glimpse/ImageSamples/UQVJ8690.JPG


  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4


Processing /Users/adityapatil/Glimpse/ImageSamples/IMG_6101.JPG
Processing /Users/adityapatil/Glimpse/ImageSamples/KXMZ7727.JPG
Processing /Users/adityapatil/Glimpse/ImageSamples/IMG_8065.JPG


In [98]:
import json
import rocksdict
from PIL import Image
import io
import matplotlib.pyplot as plt
from IPython.display import display

# Open RocksDB databases
image_db = rocksdict.Rdict("new_db/image_db")  # Stores image metadata
face_db = rocksdict.Rdict("new_db/face_db")  # Stores face data

def show_top_unnamed_faces():
    """Show top 10 unnamed faces by frequency and display their images."""
    unnamed_faces = {}

    for fid, data in face_db.items():
        # Ensure data is a dict
        if isinstance(data, bytes):  
            data = json.loads(data.decode())  # Decode bytes to dict
        
        if data.get("name") is None:
            unnamed_faces[fid] = data
    # unnamed_faces = {fid: json.loads(data) for fid, data in face_db.items() if json.loads(data)["name"] is None}
    top_faces = sorted(unnamed_faces.items(), key=lambda x: x[1]["count"], reverse=True)[:10]
    
    for face_id, data in top_faces:
        # Search for an image containing this face ID
        for key, img_data in image_db.items():
            img_doc = json.loads(img_data)
            faces = img_doc.get("faces", [])
            if any(f["face_id"] == face_id for f in faces):  
                image_path = img_doc.get("image_path")
                if not image_path:
                    print(f"Face ID: {face_id}, Count: {data['count']}, No image path found!")
                    continue
                
                try:
                    img = Image.open(image_path).convert("RGB")  # Open image from stored path
                    
                    # If bounding box data exists, crop face
                    face_bbox = next((f["bbox"] for f in faces if f["face_id"] == face_id), None)
                    if face_bbox:
                        x, y, w, h = face_bbox
                        face_img = img.crop((x, y, x + w, y + h))
                    else:
                        face_img = img  # Show full image if no bounding box
                    
                    # Display face inline
                    plt.figure(figsize=(2, 2))
                    plt.imshow(face_img)
                    plt.axis("off")
                    plt.show()

                    print(f"Face ID: {face_id}, Count: {data['count']}, Image: {image_path}")
                except Exception as e:
                    print(f"Error loading image {image_path}: {e}")
                
                break  # Stop after finding the first relevant image

def rename_faces(face_id_name_pairs):
    """Rename specified face IDs."""
    for face_id, name in face_id_name_pairs.items():
        face_id_encoded = face_id.encode()
        
        if face_id_encoded in face_db:
            face_data = face_db[face_id_encoded]
            
            if isinstance(face_data, bytes):  
                face_data = json.loads(face_data.decode())  # Decode bytes to dict
            
            face_data["name"] = name
            face_db[face_id_encoded] = json.dumps(face_data).encode()  # Store back in RocksDB
            print(f"Renamed {face_id} to {name}")

# Close RocksDB databases after operations
image_db.close()
face_db.close()

In [99]:
# Example usage
# Open RocksDB databases
image_db = rocksdict.Rdict("new_db/image_db")  # Stores image metadata
face_db = rocksdict.Rdict("new_db/face_db")  # Stores face data
show_top_unnamed_faces()
image_db.close()
face_db.close()

Error loading image /Users/adityapatil/Glimpse/ImageSamples/IMG_8065.JPG: 'bbox'
Face ID: 082686fe-4436-4c81-a430-c156905a5018, Count: 1, No image path found!
Face ID: 166bb535-1ff6-44c8-8d61-1a0dc07a0d7e, Count: 1, No image path found!
Face ID: 3097c8de-457b-48af-9f73-809a0ee96428, Count: 1, No image path found!
Error loading image /Users/adityapatil/Glimpse/ImageSamples/KXMZ7727.JPG: 'bbox'
Face ID: 3a1408e4-ee14-4c0f-8f9e-5bc6b78a1535, Count: 1, No image path found!
Error loading image /Users/adityapatil/Glimpse/ImageSamples/IMG_6101.JPG: 'bbox'
Face ID: 50bdc786-996a-4eb6-8558-4bd43a963756, Count: 1, No image path found!
Error loading image /Users/adityapatil/Glimpse/ImageSamples/KXMZ7727.JPG: 'bbox'
Face ID: 606f776b-f4b6-4559-8753-1ba9b1c78224, Count: 1, No image path found!


In [None]:

rename_faces({"face_id_1": "Emma", "face_id_2": "John"})  # Replace with actual IDs