# Library Import

In [None]:
!pip install deepface tf-keras matplotlib

## Library Install

In [2]:
from deepface import DeepFace
from retinaface import RetinaFace

In [3]:
import cv2
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw
import glob

In [32]:
import json
import random

Use Google Colab

In [5]:
from google.colab import drive

drive.mount('/content/gdrive')
ROOT_DIR = "/content/gdrive/MyDrive/Colab Notebooks/FR_Project/"

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


# Main Logic

## File structure

In [6]:
from pathlib import Path
import shutil


def mkdir(p):
  Path(p).mkdir(parents=True, exist_ok=True)


In [7]:
SOURCE_DIR = f"{ROOT_DIR}Raw_Image_Source"
PROCESSED_IMAGE_TXT = f"Processed.txt"

score_index_list = {}
non_face_list = []

In [8]:
DRIVE_OUTPUT = False

if DRIVE_OUTPUT:
  FACES_INDEX = f"{ROOT_DIR}Faces_Index"
  FACES_DIR = f"{ROOT_DIR}Faces"
  SCORE_INDEX_LIST_PATH = f"{FACES_INDEX}/score_index_list.json"
else:
  FACES_INDEX = f"Faces_Index"
  FACES_DIR = f"Faces"
  SCORE_INDEX_LIST_PATH = f"{FACES_INDEX}/score_index_list.json"

NON_FACES_LIST_PATH = f"{ROOT_DIR}Non_Faces.txt"
# TEMP_DIR = "Temp"
mkdir(FACES_INDEX)
mkdir(FACES_DIR)
# mkdir(TEMP_DIR)

def write_processed(img_path):
  with open(PROCESSED_IMAGE_TXT, "a+") as txt_file:
    txt_file.writelines(img_path + "\n")


Init JSON score index list

In [9]:
def init_score_index():
  if not Path(SCORE_INDEX_LIST_PATH).is_file():
    score_index_list = {}
    with open(SCORE_INDEX_LIST_PATH, 'w') as f:
          json.dump({}, f)

  with open(SCORE_INDEX_LIST_PATH, 'r+') as f:
      score_index_list = json.load(f)

In [10]:
def init_dummy():
  img = Image.new('RGB',(480,640),"rgb(255,255,255)")
  img.save(f"{FACES_INDEX}/dummy.jpg")

In [11]:
def read_nonface_list():
  if not Path(NON_FACES_LIST_PATH).is_file():
    non_face_list = []
  else:
    with open(NON_FACES_LIST_PATH, "r") as f:
      non_face_list = [l.replace("\n", "") for l in f.readlines()]


def write_to_nonface_list(img_path):
  global non_face_list
  if not Path(NON_FACES_LIST_PATH).is_file():
    non_face_list = []

  with open(NON_FACES_LIST_PATH, "a+") as f:
    f.writelines(img_path + "\n")

  non_face_list.append(img_path)

## **Function:** Image view with PIL

drawImageWithPlot( image_Path, param, save_crop=False, *, show_img=False )

---

> param: {
  "face_area_1": {
    facial_area: [x, y, w, h],
    ...
  },
  "face_area_2": {
    facial_area: [x, y, w, h],
    ...
  }, ...
}

In [12]:
def plt_show_img(imgs: list) -> None:
  imgs_count = len(imgs)
  for i in range(imgs_count):
    ax = plt.subplot(1, imgs_count, i+1)
    ax.imshow(imgs[i])
    ax.set_xticks([])
    ax.set_yticks([])
  plt.tight_layout(h_pad=3)
  plt.show()

def drawImageWithPlot(img_path, param, save_crop=False, *, show_img=False):
  im = Image.open(img_path)
  crop_array = []
  crop_len = len(crop_array)
  crop_count = 0

  draw_img_grid = []

  for key in param:
    rect = param[key]['facial_area']
    x, y, w, h = rect

    if save_crop > 0:
      im_crop = im.crop((x-100, y-100, w+100, h+100))
      save_path = crop_array[crop_count] if crop_count < crop_len else f"{FACES_DIR}/saved_crop_{crop_count}.jpg"
      im_crop.save(save_path)
      crop_count = crop_count + 1

    border_width = int(min(w-x, h-y) / 15)
    border_width = max(border_width, 10)
    border_width = min(border_width, 20)

    draw_img_grid.append((x, y, w, h, border_width))
    # draw = ImageDraw.Draw(im)
    # draw.rectangle((x, y, w, h), outline="red", width=border_width)

  if show_img:
    draw = ImageDraw.Draw(im)
    for x, y, w, h, border_width in draw_img_grid:
      draw.rectangle((x, y, w, h), outline="red", width=border_width)
    plt_show_img([im])

## Function: Save Crop Image as Face_index

In [13]:
# Model List
# models_name = ["VGG-Face", "Facenet", "Facenet512", "OpenFace",
#         "DeepID", "ArcFace", "SFace"]
models_name = ["Facenet512", "OpenFace", "DeepID", "SFace"]

In [14]:
def gen_next_img_full_path():
  i = 1
  while True:
    if f"{FACES_INDEX}/person{i}.jpg" not in score_index_list:
      return f"{FACES_INDEX}/person{i}.jpg"
    i = i + 1

In [15]:
def save_face(face_index, img_path, crop_img = ''):
  face_path = f'{FACES_DIR}/{face_index}'
  mkdir(face_path)
  copied_path = shutil.copy(img_path, face_path)

  if crop_img:
    shutil.copy(crop_img, f"{copied_path[:-4]}_Crop.jpg")


In [28]:
# True: replace
# False: index no change
def check_new_index_file(img_name, score) -> bool:
  if img_name not in score_index_list:
    return True
  curr_score = score_index_list[img_name]
  return curr_score < score

def check_face_index(img_path, param, *, slient = True):
  def print_log(*arg):
    if not slient: print(arg)

  im = Image.open(img_path)
  found_person = [f'{FACES_INDEX}/dummy.jpg'] # two person will not appear in same image
  image_valid = False

  for key in param:
    score = param[key]['score']
    rect = param[key]['facial_area']
    x, y, w, h = rect
    total_width = w - x
    total_height = h - y

    im_crop = im.crop((x - total_width*0.3, y - total_height*0.4, w + total_width*0.3, h + total_height*0.4))

    # filter too small images
    if w - x <= 64 or h - y <= 64:
      print_log(f"image too small: {rect}")
      continue

    # filter score too low?
    if score < 0.96:
      print_log(f"score too low: {score}")
      continue

    save_path = f"Crop_Image_Temp.jpg"
    im_crop.save(save_path)

    match_res = []
    match_model = []
    result = DeepFace.find(img_path=save_path, db_path=FACES_INDEX, enforce_detection=False, silent=True)
    for res in result:
      if res.empty: continue
      r = res["identity"][0]
      if r not in match_res:
        match_res.append(r)
      match_model.append(x)

    print_log(f"Image Score: {score}")
    print_log(f"Matched: {match_res}")
    print_log(f"Reported by: {match_model}")
    found_img_full_path = ''
    found_img_distance = 1.0

    for r in match_res:
      if r in found_person: continue
      obj = DeepFace.verify(img1_path = r, img2_path = save_path,
          model_name="Facenet512" ,detector_backend = 'retinaface',
          enforce_detection=False, threshold=0.6)
      print_log(obj)
      if obj["verified"] and obj["distance"] < found_img_distance:
        found_img_full_path = r

    if not found_img_full_path:
      found_img_full_path = gen_next_img_full_path()
    face_key = found_img_full_path.split('/')[-1].replace(".jpg", '')

    if check_new_index_file(found_img_full_path, score):
      if found_img_full_path in score_index_list:
        print_log(f"Replacing {found_img_full_path} from {score_index_list[found_img_full_path]} -> {score}")
      else:
        print_log(f"New Person: {found_img_full_path} with score {score}")

      shutil.copy(save_path, f"{found_img_full_path}")
      score_index_list[found_img_full_path] = score
      with open(SCORE_INDEX_LIST_PATH, 'w') as f:
        json.dump(score_index_list, f)

      Path.unlink(f'{FACES_INDEX}/dummy.jpg', True)

    save_face(face_key, img_path, save_path)
    Path.unlink(save_path, True)
    found_person.append(found_img_full_path)
    image_valid = True
    print_log()

  return image_valid

## Image Analyze

In [33]:
def reset_file_structure():
  global score_index_list

  shutil.rmtree(FACES_DIR, ignore_errors=True)
  score_index_list = {}
  index_list = glob.glob(f"{FACES_INDEX}/*.jpg")
  for jpg in index_list:
    Path.unlink(jpg)
  Path.unlink(SCORE_INDEX_LIST_PATH, missing_ok=True)
  Path.unlink(PROCESSED_IMAGE_TXT, missing_ok=True)

  mkdir(FACES_DIR)

reset_file_structure()

In [34]:
init_score_index()
init_dummy()
image_list = glob.glob(f"{SOURCE_DIR}/*.jpg")

if Path(NON_FACES_LIST_PATH).is_file():
  with open(NON_FACES_LIST_PATH, "r") as txt_file:
    lines = txt_file.readlines()

  for x in lines:
    image_list.remove(x.replace("\n", ''))


Path(PROCESSED_IMAGE_TXT).touch(exist_ok=True)
with open(PROCESSED_IMAGE_TXT, "r") as txt_file:
  lines = txt_file.readlines()

for x in lines:
  try:
    image_list.remove(x.replace("\n", ''))
  except:
    pass

random.shuffle(image_list)

In [None]:
progress_count = 0
total = len(image_list)
for img_path in image_list:
  resp = RetinaFace.detect_faces(img_path)

  write_processed(img_path)
  image_list.remove(img_path)
  # drawImageWithPlot(img_path, resp, True, False)
  if resp:
    # if check_face_index(img_path, resp):
    #   break
    check_face_index(img_path, resp)
    progress_count = progress_count + 1
  else:
    write_to_nonface_list(img_path)
    total = total - 1

  print(f"Progress: {progress_count} / {total}")

  if progress_count >= 50:
    break

In [None]:
drawImageWithPlot(img_path, resp, False, show_img=True)

# Testing Area

In [21]:
# face_objs = DeepFace.extract_faces(
#   img_path = "test.jpg",
#   detector_backend = "dlib",
#   align = True,
# )
# face_objs

In [22]:
# img = cv2.imread('test.jpg')
# plt.imshow(img[:, :, ::-1])

In [23]:
# type(face_objs)

In [24]:
# analyze one image
# img_path = "test2.jpg"
# resp = RetinaFace.detect_faces(img_path)

In [25]:
# output image
# drawImageWithPlot(img_path, resp, True)

In [26]:
# result = DeepFace.find(img_path="saved_crop_0.jpg", db_path="find_image", model_name="VGG-Face", enforce_detection=False, silent=True)

In [27]:
# print(result)