In [None]:
import sys
import requests
import numpy as np
import cv2
import uuid
import time
import os
import urllib
import shutil
# https://github.com/JorgePoblete/DuckDuckGoImages
# !{sys.executable} -m pip install DuckDuckGoImages insightface
import DuckDuckGoImages as ddg
from insightface.app import FaceAnalysis
from sklearn.cluster import DBSCAN
from collections import Counter
from pathlib import Path

In [None]:
# Upper bound on the number of detected faces kept from a single downloaded image.
MAX_PROCESSED_FACES_IN_IMAGE = 5
# Upper bound on the number of image URLs kept from each web search.
MAX_WEB_SEARCH_URLS = 50
# Minimum size the largest face cluster must reach for its faces to be selected.
MIN_SELECTED_GALLERY_FACES = 10
# Text file read line by line; each line is used both as the search query and
# (with spaces replaced by underscores) as the output sub-folder name.
gallery_names_filepath = '/tmp/personaggi.txt'
# Root folder under which one sub-folder per name is created.
output_root_folder_path = "/tmp/gallery"

# --- InsightFace ---
# Only the detection and recognition modules are requested from the model pack.
allowed_modules = ["detection", "recognition"]
model_pack_name = "buffalo_l"
# NOTE(review): ctx_id presumably selects the compute device (a GPU index, with
# a negative value meaning CPU) — confirm that device 1 exists on the target machine.
ctx_id = 1
# Shared face analysis instance used by the face extraction cell below.
app = FaceAnalysis(name=model_pack_name, allowed_modules=allowed_modules)
app.prepare(ctx_id=ctx_id)

In [None]:
""" Runs a DuckDuckGo image search.
    Based on https://github.com/JorgePoblete/DuckDuckGoImages
    Alternative DuckDuckGo search Python APIs:
    - DuckDuckGo Image Search API (https://github.com/joeyism/duckduckgo-images-api)
    - Duckduckgo_search (https://github.com/deedy5/duckduckgo_search)
    
    Parameters
    ----------
    query : The query string
    max_urls : The maximum number of image URLs to return
    
        
    Returns
    -------
    image : The list of URLs for images that match the query string
    """
def get_urls(query, max_urls=None):
    urls = []
    urls = ddg.get_image_urls(query, license='ALL')
    if max_urls is not None and len(urls) > max_urls:
        urls = urls[:max_urls]
    return urls

In [None]:
""" Downloads one image from an URL
    
    Parameters
    ----------
    url : The input image URL
        
    Returns
    -------
    image : The CV2 image
    """
def url_to_image(url):
    image = None
    try:
        r = requests.get(url, stream=True, timeout=60)
        if r.status_code == 200:
            data = r.content
            image = np.asarray(bytearray(data), dtype="uint8")
            image = cv2.imdecode(image, cv2.IMREAD_COLOR)
        return image
    except:
        return None

In [None]:
""" Runs InsightFace to detect faces and extract descriptors (i.e., embeddings) from a list of image URLs
    
    Parameters
    ----------
    image_urls : The list of input image URLs
    max_detected_faces: Maximum number of faces to consider when detected in one image
    folder: The path where the downloaded images are saved.If None, images are not saved
        
    Returns
    -------
    faces_in_images : The list of faces and descriptors found
    """
def get_faces(image_urls, max_detected_faces=None, folder=None):
    faces_in_images = []
    for image_url in image_urls:
        print("Processing %s ..." % image_url)
        img = url_to_image(image_url)
        if img is not None:
            filename = "%s.jpg" % uuid.uuid4().hex
            if folder is not None:
                cv2.imwrite(os.path.join(folder, "tmp", filename), img)
            detected_faces =  app.get(img)
#             print("Found %d face(s)" % len(detected_faces))
            if max_detected_faces is not None and len(detected_faces) > max_detected_faces:
                detected_faces = detected_faces[:max_detected_faces]
            for face in detected_faces:
                faces_in_images.append({
#                     "img": img,
                    "filename": filename,
                    "uri": image_url,
                    "bbox": face.bbox.tolist(),
                    "embedding": face.embedding.tolist()
                })       
    return faces_in_images

In [None]:
""" Runs DBSCAN to find the groups of faces
    
    Parameters
    ----------
    embeddings : List of face embeddings
    metric : DBSCAN metric param
    eps : DBSCAN eps param
    min_samples : DBSCAN min_sample param
        
    Returns
    -------
    labels : The list of cluster labels
    """
def get_clusters(embeddings, metric= 'cosine', eps=0.6, min_samples=2):
    clt = DBSCAN(eps=eps, min_samples=min_samples, metric=metric, n_jobs=-1)
    if len(embeddings) == 0:
        return []
    labels = clt.fit_predict(embeddings)
    return labels

In [None]:
def get_top_cluster(labels, min_count=2):
    """ Picks the biggest cluster, ignoring noise points

    Parameters
    ----------
    labels : List of cluster labels (negative labels are treated as noise and skipped)
    min_count : Minimum number of members the winning cluster must have

    Returns
    -------
    top_cluster_label : The label of the biggest cluster, or None when there is
        no non-noise label or the biggest cluster has fewer than min_count members
    """
    member_counts = Counter(lbl for lbl in labels if lbl > -1)
    ranking = member_counts.most_common(1)
    if not ranking:
        return None
    winner, size = ranking[0]
    return winner if size >= min_count else None

In [None]:
# Build one gallery per name listed in the names file: search the web for
# images of that name, detect and embed the faces, keep the faces of the
# largest cluster (assumed to be the searched person) and write them to
# <output_root>/<name>/gallery.csv next to the kept images.

# Read every name up front so the names file is not held open during the crawl.
with open(gallery_names_filepath, 'r') as reader:
    gallery_names = [name.strip() for name in reader.read().splitlines()]

for gallery_name in gallery_names:
    if not gallery_name:
        # A blank line would produce an empty query and an output folder that
        # collapses onto the root folder, so skip it.
        continue
    print(gallery_name)
    output_folder_name = gallery_name.replace(" ", "_")
    output_folder_path = os.path.join(output_root_folder_path, output_folder_name)
    # Downloads land in a "tmp" sub-folder; only the selected ones are moved up.
    tmp_output_folder_path = os.path.join(output_folder_path, "tmp")
    Path(tmp_output_folder_path).mkdir(parents=True, exist_ok=True)
    try:
        # Quote the name so that it is searched as an exact phrase.
        query = '"%s"' % gallery_name
        image_urls = get_urls(query, max_urls=MAX_WEB_SEARCH_URLS)
        candidate_gallery_faces = get_faces(image_urls, max_detected_faces=MAX_PROCESSED_FACES_IN_IMAGE, folder=output_folder_path)
        candidate_gallery_embeddings = [face["embedding"] for face in candidate_gallery_faces]
        candidate_gallery_cluster_labels = get_clusters(candidate_gallery_embeddings)
        largest_cluster_label = get_top_cluster(labels=candidate_gallery_cluster_labels, min_count=MIN_SELECTED_GALLERY_FACES)

        selected_gallery_faces = []
        if largest_cluster_label is not None:
            selected_gallery_faces = [
                face
                for face, label in zip(candidate_gallery_faces, candidate_gallery_cluster_labels)
                if label == largest_cluster_label
            ]

        output_file_path = os.path.join(output_folder_path, "gallery.csv")
        with open(output_file_path, "w") as writer:
            for selected_gallery_face in selected_gallery_faces:
                image_file_path = os.path.join(tmp_output_folder_path, selected_gallery_face["filename"])
                # Several faces can come from the same image; the file is only
                # present (and moved) the first time it is encountered.
                if os.path.exists(image_file_path):
                    shutil.move(image_file_path, output_folder_path)
                record = "%s %s %s" % (selected_gallery_face["filename"], selected_gallery_face["uri"], selected_gallery_face["bbox"])
                writer.write(record + "\n")
                print(record)
    finally:
        # Drop the images that were not selected, even if this name failed.
        shutil.rmtree(tmp_output_folder_path)
    # Short pause between two consecutive names.
    time.sleep(1)