# Baseline Evaluation of RetinaFace
This notebook contains code to run inference with the RetinaFace model.


In [None]:
from retinaface import RetinaFace
import cv2
import matplotlib.pyplot as plt
import os
import json
import numpy as np
import concurrent.futures

### Experiment on Single Image

In [None]:
# Sample image used for the single-image experiment.
img_path = '../ssic_image-corpus/data_phase-3_randomization/randomization_unsorted_imageonly/0004_S35-E4059_00752.png'
img = cv2.imread(img_path)
# cv2.imread reports failure by returning None instead of raising, so fail
# here with a clear message rather than later inside a drawing/plotting call.
if img is None:
    raise OSError(f"Could not read image (missing or not a readable image): {img_path}")

In [27]:
# Run RetinaFace on the image file (the path is passed, not the array loaded
# into `img`). The result is consumed below like a dict whose values each
# carry a 'facial_area' entry.
obj = RetinaFace.detect_faces(img_path)

In [28]:
# Outline every detected face directly on `img` (the array is modified in
# place) using a 1-pixel rectangle in colour (0, 255, 0).
for face in obj.values():
    box = face['facial_area']
    # The two opposite corners of the rectangle, taken from the
    # 'facial_area' entries at indices (2, 3) and (0, 1).
    first_corner = (box[2], box[3])
    opposite_corner = (box[0], box[1])
    cv2.rectangle(img, first_corner, opposite_corner, (0, 255, 0), 1)

In [None]:
# Convert from BGR to RGB channel order before handing the image to
# matplotlib for display.
plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
# Dump the raw detection result and its type for inspection.
print(obj)
print(type(obj))

### Scaled Operation

In [None]:
# Folder whose files are passed to detect_faces_in_images as input images.
imageFolder = '../ssic_image-corpus/data_phase-4_consensus/face/image_only'
# Folder that receives the JSON result files written by detect_faces_in_images.
outputFolder = '../ssic_image-corpus/scripts/baseline_retinaface_consensus/retina_base'

In [None]:
def convert_numpy_to_python(obj):
    """Recursively replace NumPy values in ``obj`` with plain Python values.

    The result is intended to be passed to ``json.dump``, which rejects
    NumPy arrays and NumPy scalar types.

    Conversions performed:
      * ``np.ndarray``  -> nested ``list`` (via ``tolist()``)
      * ``np.bool_``    -> ``bool``
      * ``np.integer``  -> ``int``
      * ``np.floating`` -> ``float``
      * ``dict``        -> new ``dict`` with keys and values converted
      * ``list``        -> new ``list`` with items converted
      * ``tuple``       -> new plain ``tuple`` with items converted

    Any other object is returned unchanged.
    """
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    # np.bool_ is neither np.integer nor np.floating, and json cannot
    # serialize it, so it needs its own branch.
    if isinstance(obj, np.bool_):
        return bool(obj)
    # np.integer / np.floating already cover every sized variant
    # (int32, int64, float32, float64, ...).
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, dict):
        # Keys are converted too: json.dump refuses NumPy scalars as keys.
        return {
            convert_numpy_to_python(key): convert_numpy_to_python(value)
            for key, value in obj.items()
        }
    if isinstance(obj, list):
        return [convert_numpy_to_python(item) for item in obj]
    if isinstance(obj, tuple):
        # Tuples are serialized by json as arrays, but their items still
        # have to be plain Python values.
        return tuple(convert_numpy_to_python(item) for item in obj)
    return obj

def detect_faces_in_images(image_dir, output_dir):
    """Run RetinaFace on each file in ``image_dir`` and save one JSON per file.

    ``output_dir`` is created if it does not exist. Only regular files
    directly inside ``image_dir`` are processed; other entries are skipped.
    For each file a ``<name without extension>.json`` file is written to
    ``output_dir`` containing one of:

      * the detection result, converted with ``convert_numpy_to_python``;
      * ``{"message": "No faces detected"}`` when the result is falsy;
      * ``{"error": <text>}`` when detection or conversion raised.
    """
    os.makedirs(output_dir, exist_ok=True)

    for entry in os.listdir(image_dir):
        source = os.path.join(image_dir, entry)

        if os.path.isfile(source):
            try:
                detections = RetinaFace.detect_faces(source)
                if not detections:
                    detections = {"message": "No faces detected"}
                payload = convert_numpy_to_python(detections)
            except Exception as exc:
                # Best effort: record the failure for this file and move on.
                payload = {"error": str(exc)}

            stem, _extension = os.path.splitext(entry)
            target_name = stem + ".json"

            with open(os.path.join(output_dir, target_name), 'w') as handle:
                json.dump(payload, handle, indent=4)

            print(f"Saved results for {entry} to {target_name}")



In [None]:
# Run detection over the files in imageFolder and write the per-file JSON
# results to outputFolder.
detect_faces_in_images(imageFolder, outputFolder)

### Consensus Coding Name Change

In [None]:
import os
import shutil
import concurrent.futures

def copy_file(src_path, dest_path):
    """Copy ``src_path`` to ``dest_path``, creating the target directory first.

    Args:
        src_path: Path of the file to copy.
        dest_path: Full path (including file name) of the copy.

    Raises:
        OSError: Propagated from ``os.makedirs`` or ``shutil.copy2``.
    """
    parent = os.path.dirname(dest_path)
    # dirname is '' when dest_path is a bare file name; os.makedirs('')
    # raises FileNotFoundError, so only create a directory when there is one.
    if parent:
        os.makedirs(parent, exist_ok=True)
    shutil.copy2(src_path, dest_path)
    print(f"Copied {src_path} to {dest_path}")

def copy_folder_parallel(src_folder, dest_folder, num_workers=8):
    """Copy every file under ``src_folder`` into ``dest_folder`` using threads.

    The source tree is walked recursively and each file keeps its path
    relative to ``src_folder`` underneath ``dest_folder``. Individual copies
    are delegated to ``copy_file`` on a thread pool of ``num_workers``
    threads. A copy that raises is reported with a printed message and does
    not stop the remaining copies.
    """
    def _planned_copies():
        # Yield (source, destination) pairs for every file in the tree.
        for current_dir, _subdirs, names in os.walk(src_folder):
            for name in names:
                origin = os.path.join(current_dir, name)
                relative = os.path.relpath(origin, src_folder)
                yield origin, os.path.join(dest_folder, relative)

    jobs = list(_planned_copies())
    print(f"Found {len(jobs)} files to copy.")

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as pool:
        pending = [pool.submit(copy_file, origin, target) for origin, target in jobs]
        # Collect each result as it finishes so failures get reported.
        for finished in concurrent.futures.as_completed(pending):
            try:
                finished.result()
            except Exception as err:
                print(f"Error copying file: {err}")

    print("All files copied.")


In [None]:
# Copy the phase-3 'retinaface_output_toyolo' folder into the phase-4
# consensus 'face' folder, using 32 worker threads.
# NOTE(review): the f-prefix on these literals has no placeholders and could be dropped.
src_folder = f'../ssic_image-corpus/data_phase-3_randomization/retinaface_output_toyolo'
dest_folder = f'../ssic_image-corpus/data_phase-4_consensus/face/retinaface_output_toyolo'
copy_folder_parallel(src_folder, dest_folder, num_workers=32)

In [None]:

def rename_file(file_path):
    """Strip a leading ``NNNN_`` prefix (four digits + underscore) from a file name.

    A file named ``0004_S35-E4059_00752.png`` is renamed to
    ``S35-E4059_00752.png`` in the same directory. Files whose name does not
    start with exactly four digits followed by an underscore are left alone.

    The rename is skipped (with a printed message) when nothing would remain
    after the prefix, or when the target name already exists. Without the
    second check two files differing only in their prefix would map to the
    same name, and ``os.rename`` silently replaces an existing target on
    POSIX systems.

    NOTE: the existence check is not atomic; two concurrent calls aiming at
    the same target can still race.

    Args:
        file_path: Path of the file to rename.
    """
    directory, filename = os.path.split(file_path)

    # Split on the first underscore only.
    prefix, separator, new_filename = filename.partition('_')
    if not (separator and len(prefix) == 4 and prefix.isdigit()):
        print(f"Skipped (does not match pattern): {filename}")
        return

    if not new_filename:
        print(f"Skipped (nothing left after prefix): {filename}")
        return

    new_path = os.path.join(directory, new_filename)
    # lexists also catches a dangling symlink occupying the target name.
    if os.path.lexists(new_path):
        print(f"Skipped (target already exists): {filename} -> {new_filename}")
        return

    try:
        os.rename(file_path, new_path)
        print(f"Renamed: {filename} -> {new_filename}")
    except Exception as e:
        print(f"Error renaming {filename}: {e}")

def parallel_rename_files(folder, num_workers=4):
    """Apply ``rename_file`` to every regular file directly inside ``folder``.

    Sub-directories are not descended into. Renames run on a thread pool of
    ``num_workers`` threads. Each task's result is collected so that an
    exception raised inside a worker is reported instead of being silently
    discarded (``executor.map`` only surfaces worker exceptions when its
    results are iterated).

    Args:
        folder: Directory whose files are to be renamed.
        num_workers: Maximum number of worker threads.
    """
    # List all files (non-recursive) in the folder
    file_paths = [
        os.path.join(folder, f)
        for f in os.listdir(folder)
        if os.path.isfile(os.path.join(folder, f))
    ]

    print(f"Found {len(file_paths)} files to process.")

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        # Map each future back to its path so failures can name the file.
        futures = {executor.submit(rename_file, path): path for path in file_paths}
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Error renaming {os.path.basename(futures[future])}: {e}")

    print("Renaming process completed.")



In [None]:

# Strip the leading 4-digit prefix from the files directly inside the
# phase-4 'retinaface_output_toyolo' folder, using 32 worker threads.
dest_folder = f'../ssic_image-corpus/data_phase-4_consensus/face/retinaface_output_toyolo'
parallel_rename_files(dest_folder, num_workers=32)
