In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install -U insightface onnxruntime

Collecting insightface
  Downloading insightface-0.7.3.tar.gz (439 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m439.5/439.5 kB[0m [31m7.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting onnxruntime
  Downloading onnxruntime-1.22.0-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.5 kB)
Collecting onnx (from insightface)
  Downloading onnx-1.18.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.9 kB)
Collecting coloredlogs (from onnxruntime)
  Downloading coloredlogs-15.0.1-py2.py3-none-any.whl.metadata (12 kB)
Collecting humanfriendly>=9.1 (from coloredlogs->onnxruntime)
  Downloading humanfriendly-10.0-py2.py3-none-any.whl.metadata (9.2 kB)
Downloading onnxruntime-1.22.0-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (16.4 MB)
[2K   [90

In [None]:
# Change directory to the GFPGAN repository in your drive
gfpgan_path = '/content/drive/MyDrive/model'
%cd "{gfpgan_path}"



# Install required packages
!pip install -r requirements.txt

/content/drive/MyDrive/model
Collecting basicsr>=1.4.2 (from -r requirements.txt (line 1))
  Downloading basicsr-1.4.2.tar.gz (172 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m172.5/172.5 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting facexlib>=0.2.5 (from -r requirements.txt (line 2))
  Downloading facexlib-0.3.0-py3-none-any.whl.metadata (4.6 kB)
Collecting lmdb (from -r requirements.txt (line 3))
  Downloading lmdb-1.6.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.1 kB)
Collecting tb-nightly (from -r requirements.txt (line 8))
  Downloading tb_nightly-2.20.0a20250526-py3-none-any.whl.metadata (1.9 kB)
Collecting yapf (from -r requirements.txt (line 12))
  Downloading yapf-0.43.0-py3-none-any.whl.metadata (46 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m46.8/46.8 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting realesrgan (from -r 

In [None]:
# Fix the import issue for basicsr
!sed -i "s/from torchvision.transforms.functional_tensor import rgb_to_grayscale/from torchvision.transforms.functional import rgb_to_grayscale/" /usr/local/lib/python3.11/dist-packages/basicsr/data/degradations.py


Organizer Side

In [None]:
import os
import glob
import cv2
import re
import pickle
import numpy as np
from scipy.spatial.distance import cosine
import subprocess
from insightface.app import FaceAnalysis
from insightface.model_zoo import model_zoo
from insightface.model_zoo.arcface_onnx import ArcFaceONNX

# Setup InsightFace once
face_analyzer = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
face_analyzer.prepare(ctx_id=0)


def run_gfpgan(input_path, output_folder, gfpgan_version, upscale_factor):
    if os.path.exists(output_folder):
        subprocess.run(f"rm -rf {output_folder}", shell=True)
    os.makedirs(output_folder, exist_ok=True)
    cmd = f"python inference_gfpgan.py -i {input_path} -o {output_folder} -v {gfpgan_version} -s {upscale_factor} --bg_upsampler realesrgan"
    print(f"Running GFPGAN on {input_path} ...")
    subprocess.run(cmd, shell=True, check=True)

def extract_and_process_faces(image_path, temp_dir, resolution_threshold, gfpgan_version, upscale_factor):
    processed_faces = []
    img = cv2.imread(image_path)
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    faces = face_analyzer.get(img_rgb)

    for i, face in enumerate(faces):

      bbox = face.bbox.astype(int)
      x1, y1, x2, y2 = bbox
      face_crop = img_rgb[y1:y2, x1:x2]
      if face_crop is None or face_crop.size == 0:
            print(f"Warning: aligned face is empty for face {i} in {image_path}")
            continue
      h, w = face_crop.shape[:2]

      if h < resolution_threshold or w < resolution_threshold:
          base_name = os.path.splitext(os.path.basename(image_path))[0]
          face_filename = f"{base_name}_face_{i}.png"
          temp_face_path = os.path.join(temp_dir, face_filename)
          cv2.imwrite(temp_face_path, cv2.cvtColor(face_crop, cv2.COLOR_RGB2BGR))
          gfpgan_out = os.path.join(temp_dir, f"gfpgan_{base_name}_face_{i}")
          run_gfpgan(temp_face_path, gfpgan_out, gfpgan_version, upscale_factor)

          restored_path = os.path.join(gfpgan_out, "restored_faces", f"{base_name}_face_{i}_00.png")
          if os.path.exists(restored_path):
              restored_face = cv2.imread(restored_path)
              restored_face = cv2.cvtColor(restored_face, cv2.COLOR_BGR2RGB)
              processed_faces.append(restored_face)
          else:
              processed_faces.append(face_crop)
      else:
            processed_faces.append(face_crop)
    return processed_faces
def add_padding(img, pad_ratio=0.5):
    h, w = img.shape[:2]
    pad_h = int(h * pad_ratio)
    pad_w = int(w * pad_ratio)
    padded = cv2.copyMakeBorder(img, pad_h, pad_h, pad_w, pad_w, cv2.BORDER_CONSTANT, value=[0, 0, 0])
    return padded

def get_embedding(face_img):
    face_img_padded = add_padding(face_img)
    faces = face_analyzer.get(face_img_padded)
    if len(faces) == 0:
        raise ValueError("No face detected in the processed face image.")

    emb = faces[0].embedding
    emb = emb / np.linalg.norm(emb)
    return emb

def process_event_images(event_id,uploaded_files,base_data_dir='../data',
                         gfpgan_version='1.3', upscale_factor=2, resolution_threshold=128):
    """
    Processes uploaded images for a specific event.

    Parameters:
      event_id (str): Unique identifier for the event.
      uploaded_files (dict): Dictionary of {filename: file_content}
                             (or a list of file paths if already stored).
      base_data_dir (str): Base directory for event data.
      gfpgan_version (str): Version parameter for GFPGAN.
      upscale_factor (int): Upscale factor for GFPGAN.
      resolution_threshold (int): Threshold resolution below which GFPGAN is applied.

    This function:
      - Creates event-specific directories.
      - Moves the uploaded images to the event's group photos folder.
      - Processes each image: detects faces, applies GFPGAN if needed, and computes embeddings.
      - Stores the embeddings in an event-specific pickle file.
    """




    event_folder = os.path.join(base_data_dir, f"event_{event_id}")
    group_photos_folder = os.path.join(event_folder, "group_photos")

    temp_dir = os.path.join(event_folder, "temp_faces")
    os.makedirs(group_photos_folder, exist_ok=True)
    os.makedirs(temp_dir, exist_ok=True)

    embeddings_file = os.path.join(event_folder, f"face_embeddings_{event_id}.pkl")

    if os.path.exists(embeddings_file):
        with open(embeddings_file, 'rb') as f:
            stored_embeddings = pickle.load(f)
    else:
        stored_embeddings = []
    image_paths = glob.glob(os.path.join(group_photos_folder, '*.[jpJP][pnPN]*[gG]'))

    for filename, file_content in uploaded_files.items():
      # Save the uploaded file into the group_photos_folder.
      dst_path = os.path.join(group_photos_folder, filename)

      with open(dst_path, 'wb') as f:
          f.write(file_content)
      print(f"\nProcessing organizer image: {dst_path}")
      face_images = extract_and_process_faces(dst_path, temp_dir, resolution_threshold,
                                              gfpgan_version, upscale_factor)

      for idx, face_img in enumerate(face_images):
          try:
              embedding = get_embedding(face_img)
              stored_embeddings.append({
                  "embedding": embedding,
                  "original_image": dst_path
              })
          except Exception as e:
              print(f"Error computing embedding for face {idx} in {dst_path}: {e}")

    with open(embeddings_file, 'wb') as f:
        pickle.dump(stored_embeddings, f)

    subprocess.run(f"rm -rf {temp_dir}", shell=True)

    print("Organizer processing complete. All embeddings are stored.")
    return embeddings_file

# Example usage:
# Upload files using Google Colab's file uploader for testing.
from google.colab import files
uploaded_files = files.upload()

# Specify an event ID
event_id = "event123"

# Call the process_event_images function with the uploaded files.

embeddings_file = process_event_images(event_id,uploaded_files)

Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /root/.insightface/models/buffalo_l/1k3d68.onnx landmark_3d_68 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /root/.insightface/models/buffalo_l/2d106det.onnx landmark_2d_106 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /root/.insightface/models/buffalo_l/det_10g.onnx detection [1, 3, '?', '?'] 127.5 128.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /root/.insightface/models/buffalo_l/genderage.onnx genderage ['None', 3, 96, 96] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /root/.insightface/models/buffalo_l/w600k_r50.onnx recognition ['None', 3, 112, 112] 127.5 127.5
set det-size: (640, 640)


Saving sample_2_p2.jpeg to sample_2_p2 (7).jpeg

Processing organizer image: ../Project/event_buffalo_l/group_photos/sample_2_p2.jpeg
Running GFPGAN on ../Project/event_buffalo_l/temp_faces/sample_2_p2_face_2.png ...
Running GFPGAN on ../Project/event_buffalo_l/temp_faces/sample_2_p2_face_3.png ...
Running GFPGAN on ../Project/event_buffalo_l/temp_faces/sample_2_p2_face_4.png ...
Organizer processing complete. All embeddings are stored.


 Participant Side

In [None]:
def load_stored_embeddings(embeddings_file):
    if os.path.exists(embeddings_file):
        with open(embeddings_file, 'rb') as f:
            return pickle.load(f)
    else:
        raise Exception(f"No stored embeddings found at {embeddings_file}!")

def extract_faces_for_participant(image_path):
    """
    Detects and extracts faces using RetinaFace.
    Returns a list of cropped face arrays.
    """
    img = cv2.imread(image_path)
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    faces = face_analyzer.get(img_rgb)
    return [img_rgb[int(f.bbox[1]):int(f.bbox[3]), int(f.bbox[0]):int(f.bbox[2])] for f in faces]

def match_participant_event(event_id, participant_image_path, base_data_dir='../data', matching_threshold=0.4):
    """
    Processes the participant image for the given event.

    Parameters:
      event_id (str): Unique identifier for the event.
      participant_image_path (str): Path to the participant's image.
      base_data_dir (str): Base directory where event folders are stored.
      matching_threshold (float): Threshold for cosine distance matching.

    Returns:
      A list of matched organizer image paths.

    The function:
      - Builds the event-specific folder and embeddings file name.
      - Loads stored embeddings for the event.
      - Extracts faces from the participant image (ensuring exactly one face).
      - Computes the participant embedding and normalizes it.
      - Compares it against each stored embedding using cosine distance.
    """

    event_folder = os.path.join(base_data_dir, f"event_{event_id}")
    embeddings_file = os.path.join(event_folder, f"face_embeddings_{event_id}.pkl")
    stored_embeddings = load_stored_embeddings(embeddings_file)

    face_arrays = extract_faces_for_participant(participant_image_path)

    # Ensure exactly one face is detected.

    if len(face_arrays) != 1:
        raise Exception(f"Participant image must contain exactly one face. Detected {len(face_arrays)} faces.")

    participant_embedding = get_embedding(face_arrays[0])


    # Compare the participant's embedding with each stored embedding using cosine distance.
    matched_images = set()
    for entry in stored_embeddings:
        distance = cosine(participant_embedding, entry["embedding"])
        if round(distance, 1) <= matching_threshold:
            matched_images.add(entry["original_image"])

    return list(matched_images)

# Example usage
if __name__ == "__main__":
    event_id = "event123"
    participant_image_path = "/content/drive/MyDrive/ProjData/refdata/p17.JPG"
    try:
        matches = match_participant_event(event_id, participant_image_path)
        if matches:
            print(f"Matches found:")
            for m in matches:
                print("  -", m)
        else:
            print("No matches found.")
    except Exception as e:
        print("Error:", e)


Matches found:
  - ../Project/event_buffalo_l/group_photos/sample_2_p2.jpeg
