In [1]:
import os
import shutil
import uuid

import matplotlib.pyplot as plt
import numpy as np
import torch
from facenet_pytorch import InceptionResnetV1, MTCNN
from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk
from PIL import Image, ImageDraw
from tqdm import tqdm

In [9]:
# Dataset locations. The defaults are the original machine-specific paths; set the
# FACE_DATASET_DIR / FACE_CROP_DIR environment variables to run the notebook on
# another machine without editing this cell.
DATASET_DIR = os.environ.get(
    "FACE_DATASET_DIR",
    "/media/faisal/NewVolume/ResilientSage/FaceNet/dataset/images_faces",
)
FACE_CROP_DIR = os.environ.get(
    "FACE_CROP_DIR",
    "/media/faisal/NewVolume/ResilientSage/faisal/Search/dada/facenet/dataset/cropped_faces",
)
# Create the crop output directory up front so later cells can write into it.
os.makedirs(FACE_CROP_DIR, exist_ok=True)

In [11]:
# Use the first CUDA device when one is available, otherwise run on the CPU.
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')

# Face detector (MTCNN), configured with keep_all=True and min_face_size=30.
face_detection_model = MTCNN(keep_all=True, min_face_size=30, device=device)

# Embedding network loaded with the 'vggface2' weights, switched to eval mode
# and moved to the selected device.
embedding_model = InceptionResnetV1(pretrained='vggface2')
embedding_model = embedding_model.eval().to(device)

In [12]:
def generate_face_id(num_char=8):
    """Return a short random identifier for a face.

    Args:
        num_char: Number of leading hex characters to keep from a fresh UUID4
            (the full hex form has 32 characters, so larger values are capped).

    Returns:
        A lowercase hexadecimal string of at most ``num_char`` characters.
    """
    hex_digits = uuid.uuid4().hex
    return hex_digits[:num_char]

In [15]:
def detect_and_plot_faces(image_path, crop_folder, margin=25):
    """Detect faces in one image and save a padded crop of each confident detection.

    Crops are taken from the untouched RGB image; nothing is drawn onto it, so
    the saved faces never contain bounding-box outlines.

    Args:
        image_path: Path of the image file to scan.
        crop_folder: Directory that receives one ``<face_id>.jpg`` per kept face.
            It is not created here and must already exist.
        margin: Pixels added on every side of the detector's box before
            cropping (the top edge gets a few extra pixels).

    Returns:
        A list with one dict per saved face, holding ``image_id`` (source file
        name without extension), ``face_id``, ``bounding_box`` as
        ``[x1, y1, x2, y2]`` integers and ``crop_path``. The list is empty when
        no detection passes the confidence threshold. ``None`` is returned when
        processing the image raised an exception (the error is printed).
    """
    min_confidence = 0.98   # detections below this probability are ignored
    extra_top_margin = 10   # additional pixels added above the box

    try:
        # convert() loads the pixel data and returns a new image, so the file
        # handle can be released as soon as the block exits.
        with Image.open(image_path) as source_image:
            image = source_image.convert("RGB")

        # Detect faces and their bounding boxes
        boxes, probs = face_detection_model.detect(image)
        if boxes is None:
            return []

        # Image ID based on original image name
        image_id = os.path.splitext(os.path.basename(image_path))[0]

        saved_faces = []
        for box, prob in zip(boxes, probs):
            if prob is None or prob < min_confidence:
                continue

            # Enlarge the box by the margin and clamp it to the image bounds.
            x1, y1, x2, y2 = (float(v) for v in box)
            left = int(round(max(0, x1 - margin)))
            top = int(round(max(0, y1 - margin - extra_top_margin)))
            right = int(round(min(image.width, x2 + margin)))
            bottom = int(round(min(image.height, y2 + margin)))
            if right <= left or bottom <= top:
                # Degenerate box after clamping: there is nothing to crop.
                continue

            cropped_face = image.crop((left, top, right, bottom))

            # Save the cropped face under a fresh face ID
            face_id = generate_face_id()
            cropped_face_path = os.path.join(crop_folder, f"{face_id}.jpg")
            cropped_face.save(cropped_face_path)

            saved_faces.append({
                "image_id": image_id,
                "face_id": face_id,
                "bounding_box": [left, top, right, bottom],
                "crop_path": cropped_face_path,
            })

        return saved_faces
    except Exception as e:
        # Best-effort batch processing: report the failure and let the caller continue.
        print(f"Error processing {image_path}: {e}")
        return None

In [16]:
def process_images_for_copy():
    """Run face detection and cropping over every regular file in DATASET_DIR.

    Each file is passed to ``detect_and_plot_faces`` with FACE_CROP_DIR as the
    output folder; any truthy result it returns is printed.
    """
    with os.scandir(DATASET_DIR) as entries:
        image_paths = [entry.path for entry in entries if entry.is_file()]
    print(f"Found {len(image_paths)} files in the dataset directory")

    for image_path in tqdm(image_paths, desc="Processing images"):
        face_info = detect_and_plot_faces(image_path, FACE_CROP_DIR)
        if not face_info:
            continue
        # Hook for indexing or storing the face_info (image_id, face_id, bounding_box)
        print(face_info)

In [None]:
# Scan every file in DATASET_DIR for faces, writing the crops into FACE_CROP_DIR.
process_images_for_copy()

In [None]:
# Number of cropped face files currently in FACE_CROP_DIR
files = list(filter(lambda entry: entry.is_file(), os.scandir(FACE_CROP_DIR)))
print(f"Found {len(files)} faces in the dataset directory")

In [None]:
# OpenSearch node(s) the notebook talks to.
host = [{'host': 'localhost', 'port': 9200}]


def get_client():
    """Build an OpenSearch client for ``host``.

    The connection uses HTTP compression and has SSL, certificate
    verification, hostname assertion and SSL warnings all switched off.

    Returns:
        The configured ``OpenSearch`` client instance.
    """
    connection_options = dict(
        http_compress=True,
        use_ssl=False,
        verify_certs=False,
        ssl_assert_hostname=False,
        ssl_show_warn=False,
    )
    return OpenSearch(hosts=host, **connection_options)


# Shared client used by the indexing cells below.
client = get_client()

In [None]:
def create_index(client, index_name):
    """Create the k-NN index used for face embeddings, unless it already exists.

    The index stores a 512-dimensional ``my_vector`` field (HNSW, cosine
    similarity, Lucene engine) plus ``face_id`` and ``image_id`` keyword fields.

    Args:
        client: OpenSearch client exposing ``indices.exists`` and ``indices.create``.
        index_name: Name of the index to create.

    Returns:
        The response of ``client.indices.create`` when the index was created,
        or ``None`` when an index with that name was already present.
    """
    # Creating an existing index raises, which would break a notebook re-run;
    # check first so this cell is idempotent.
    if client.indices.exists(index=index_name):
        print(f"Index '{index_name}' already exists; skipping creation.")
        return None

    index_body = {
        "settings": {
            "index": {
                "knn": True,
                "knn.algo_param.ef_search": 100
            }
        },
        "mappings": {
            "properties": {
                "my_vector": {
                    "type": "knn_vector",
                    "dimension": 512,
                    "method": {
                        "name": "hnsw",
                        "space_type": "cosinesimil",
                        "engine": "lucene",
                        "parameters": {
                            "ef_construction": 128,
                            "m": 24
                        }
                    }
                },
                "face_id": {"type": "keyword"},
                "image_id": {"type": "keyword"}
            }
        }
    }

    response = client.indices.create(index=index_name, body=index_body)
    print(f"Index '{index_name}' created successfully.")
    return response


In [3]:
def apply_bulk_indexing(client, actions):
    """Send ``actions`` to OpenSearch through the ``bulk`` helper.

    Args:
        client: OpenSearch client to index with.
        actions: Iterable of bulk action dicts.

    Returns:
        Whatever ``bulk(client, actions)`` returns, unchanged.
    """
    return bulk(client, actions)


In [4]:
def process_faces_and_generate_embeddings(face_crop_dir, index_name, image_size=160):
    """Embed every cropped face image and build the matching bulk-index actions.

    Args:
        face_crop_dir: Directory containing the cropped face images.
        index_name: Name of the OpenSearch index the actions target.
        image_size: Side length, in pixels, each crop is resized to before it
            is fed to the embedding model.

    Returns:
        A list of bulk action dicts, one per successfully embedded file. Each
        has ``_index``, ``_id`` (file name without extension) and a ``_source``
        with ``my_vector``, ``image_id`` and ``face_id``. Files that fail to
        process are reported with a printed message and skipped.
    """
    files = [f for f in os.scandir(face_crop_dir) if f.is_file()]
    print(f"Found {len(files)} cropped face images in the dataset directory")

    all_actions = []

    for file in tqdm(files, desc="Generating embeddings"):
        image_path = file.path
        try:
            # convert() loads the pixels and returns a new image, so the file
            # handle can be closed right away.
            with Image.open(image_path) as source_image:
                image = source_image.convert("RGB")

            # Crops come in arbitrary sizes; bring them to one fixed input size.
            image = image.resize((image_size, image_size), Image.BILINEAR)

            # torch cannot build a tensor straight from a PIL image, so go
            # through a NumPy array: (H, W, 3) -> (1, 3, H, W).
            pixels = np.array(image, dtype=np.float32)
            face_tensor = torch.from_numpy(pixels).permute(2, 0, 1).unsqueeze(0)
            # Same standardisation facenet-pytorch applies to its face crops
            # (fixed_image_standardization): maps 0..255 to roughly -1..1.
            face_tensor = ((face_tensor - 127.5) / 128.0).to(device)

            # Generate face embedding
            with torch.no_grad():
                embedding = embedding_model(face_tensor)[0].cpu().tolist()

            # The document ID / image_id is the crop's file name.
            # NOTE(review): face_id is a freshly generated value and therefore
            # does not match the ID embedded in the crop's file name; confirm
            # whether the file stem should be reused instead.
            face_id = generate_face_id()
            image_id = os.path.splitext(os.path.basename(image_path))[0]

            # Create index action
            action = {
                "_index": index_name,
                "_id": f"{image_id}",  # Unique ID for the image
                "_source": {
                    "my_vector": embedding,
                    "image_id": image_id,
                    "face_id": face_id
                }
            }
            all_actions.append(action)

        except Exception as e:
            # Best-effort: report the failing file and keep going with the rest.
            print(f"Error processing {image_path}: {e}")

    return all_actions

In [None]:
def main():
    """Create the index, embed every cropped face and bulk-index the results.

    Nothing is sent to OpenSearch (and nothing is printed here) when no
    actions were produced.
    """
    index_name = "face_embeddings"  # Target OpenSearch index
    create_index(client, index_name)

    # Embed the cropped faces and collect one bulk action per face
    bulk_actions = process_faces_and_generate_embeddings(FACE_CROP_DIR, index_name)
    if not bulk_actions:
        return

    # Push all embeddings to OpenSearch in one bulk request
    apply_bulk_indexing(client, bulk_actions)
    print(f"Indexed {len(bulk_actions)} face embeddings successfully.")

# %%
# Run the indexing pipeline only when this code is executed directly
# (i.e. when __name__ is "__main__"), not when it is imported.
if __name__ == "__main__":
    main()