<a href="https://colab.research.google.com/github/azroddin123/IBM-Demo/blob/master/filter_low_sized_faces_using_face_box_results.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Progress bars; tqdm>=4.42.0 is required for the multi-process helper tqdm.contrib.concurrent.process_map
!pip install tqdm==4.48.0



In [None]:
import os
import shutil
from pathlib import Path

from operator import itemgetter

# from tqdm import tqdm  # progress bar for console use
from tqdm.notebook import tqdm
from tqdm.contrib.concurrent import process_map # multi-process - tqdm>=4.42.0


In [None]:
def list_files(root_dir, mindepth=1, maxdepth=float('inf'), filter_ext=None, return_relative_path=False):
    """
    Collect the files found under ``root_dir``, limited by depth and extension.

    Usage:

    d = list_files(root_dir, mindepth=1, maxdepth=2)

    WARNING: this may create a very large list if many files exist in the
    directory and subdirectories. Make sure you set maxdepth appropriately.

    Args:
        root_dir (str): existing directory to start from. It is normalised
            with ``os.path.normcase``; a trailing path separator is allowed.
        mindepth (int): first level to report. 1 is the root dir itself,
            2 is the sub directories of the root dir, and so on.
        maxdepth (int): last level to report. Example: for only the files of
            the sub directories of the root dir, set mindepth=2 and
            maxdepth=2. For only the files of the root dir itself, set
            mindepth=1 and maxdepth=1. Directories below this level are not
            visited at all.
        filter_ext (list, optional): keep only files whose name ends with one
            of these extensions (case-insensitive), e.g. [".jpg", ".png"].
            ``None`` or an empty list keeps every file.
        return_relative_path (bool): Default False. If True, paths are
            relative to ``root_dir``; otherwise each path is the walked
            directory joined with the file name (absolute only when
            ``root_dir`` is absolute).

    Returns:
        list: the matching file paths, in ``os.walk`` order.
    """
    root_dir = os.path.normcase(root_dir)
    lowered_filter_ext = tuple(ext.lower() for ext in filter_ext) if filter_ext else ()
    file_paths = []

    for abs_dir, dirs, files in os.walk(root_dir):
        # Depth is derived from the path relative to the root rather than by
        # counting separators in the raw strings, so a trailing separator on
        # root_dir cannot shift every level by one.
        rel_dir = os.path.relpath(abs_dir, root_dir)
        is_root = rel_dir == os.curdir
        depth = 1 if is_root else rel_dir.count(os.path.sep) + 2

        if depth >= maxdepth:
            # Nothing deeper can be reported: prune the walk in place.
            del dirs[:]

        if not mindepth <= depth <= maxdepth:
            continue

        for filename in files:
            if lowered_filter_ext and not filename.lower().endswith(lowered_filter_ext):
                continue

            if not return_relative_path:
                file_paths.append(os.path.join(abs_dir, filename))
            elif is_root:
                file_paths.append(filename)
            else:
                file_paths.append(os.path.join(rel_dir, filename))

    return file_paths

In [None]:
def remove_img_and_face_result_txt(image_file, face_detection_result_txt_file):
    """
    Best-effort removal of an image and its face detection result txt file.

    Each file is removed independently; an ``OSError`` on one of them (for
    example because it does not exist) is swallowed and reported as False.

    Returns:
        tuple: (is_image_deleted, is_txt_deleted)
    """
    outcomes = []
    # The image is attempted first, then the txt file.
    for target in (image_file, face_detection_result_txt_file):
        try:
            os.remove(target)
        except OSError:
            outcomes.append(False)
        else:
            outcomes.append(True)
    return tuple(outcomes)


def filter_singlefile(params):
    """
    Check one image against its facebox result and delete it if the face is too small.

    The facebox file is expected next to the image, named
    ``<image name without extension>_facebox_retina-mobilenet.txt``. Only its
    first line is used, parsed as whitespace separated ``x y w h confidence``.

    Args:
        params (dict): must contain
            "source_dir": directory the relative path is resolved against,
            "file_rel_path": image path relative to ``source_dir``,
            "min_width" / "min_height": minimum accepted face width / height,
            "silent_on_missing_wrong_facebox": if True, a missing or
                unparsable facebox file leaves both files untouched.

    Returns:
        dict: always contains "sourcefile". When files were removed it also
        contains "code" and "info":
            100 - face width below ``min_width``,
            101 - face height below ``min_height``,
            102 - facebox file could not be read or parsed,
            103 - facebox file not found.
        An empty facebox file removes nothing and yields no "code".
    """
    source_dir = params["source_dir"]
    file_rel_path = params["file_rel_path"]
    min_width = params["min_width"]
    min_height = params["min_height"]
    silent_on_missing_wrong_facebox = params["silent_on_missing_wrong_facebox"]

    sourcefile_abs_path = os.path.join(source_dir, file_rel_path)
    log = {"sourcefile": file_rel_path}

    src_file_name, _ = os.path.splitext(sourcefile_abs_path)
    src_txt_filename = f"{src_file_name}_facebox_retina-mobilenet.txt"

    if not os.path.exists(src_txt_filename):
        if not silent_on_missing_wrong_facebox:
            remove_img_and_face_result_txt(sourcefile_abs_path, src_txt_filename)
            log.update({
                "code": 103,
                "info": "Facebox file not found. Associated Image removed."
            })
        return log

    try:
        # The context manager closes the handle even if reading fails.
        with open(src_txt_filename, "r") as f:
            lines = f.read().splitlines()

        if not lines:
            # Nothing to evaluate. Return the log dict (not False) so that
            # every entry in the caller's result list has the same type.
            log["info"] = "Facebox file is empty. Nothing removed."
            return log

        # split() tolerates repeated or leading whitespace between fields.
        detection = lines[0].split()
        _x, _y, w, h = (int(value) for value in detection[:4])
        # The confidence is not used, but it must be present and numeric for
        # the line to count as a valid detection.
        float(detection[4])
    except (OSError, ValueError, IndexError):
        # Unreadable or malformed facebox file.
        if not silent_on_missing_wrong_facebox:
            remove_img_and_face_result_txt(sourcefile_abs_path, src_txt_filename)
            log.update({
                "code": 102,
                "info": "Error reading facebox."
            })
        return log

    if w < min_width:
        remove_img_and_face_result_txt(sourcefile_abs_path, src_txt_filename)
        log.update({
            "code": 100,
            "info": f"face width {w} less than thresold. files removed."
        })
    elif h < min_height:
        remove_img_and_face_result_txt(sourcefile_abs_path, src_txt_filename)
        log.update({
            "code": 101,
            "info": f"face height {h} less than thresold. files removed."
        })

    return log


def filter_small_faces_fulldir_multiprocess(
    source_dir,
    min_width=120, min_height=120, silent_on_missing_wrong_facebox=False,
    multiprocess=True, max_workers=5,
    ):
    """
    Filter a directory tree and remove images whose detected face is too small.

    Every ``.jpg`` / ``.jpeg`` / ``.png`` file under ``source_dir`` is passed
    to ``filter_singlefile``. This deletes files on disk.

    Args:
        source_dir (str): source directory
        min_width (int, optional): Width threshold to filter low sized faces. Defaults to 120.
        min_height (int, optional): Height threshold to filter low sized faces. Defaults to 120.
        silent_on_missing_wrong_facebox (bool, optional): If true then wrong face detection results (or if missing facebox) and associated image files will not be removed. Defaults to False.
        multiprocess (bool, optional): Run the files through a process pool. If False, files are processed one by one in the current process. Defaults to True.
        max_workers (int, optional): Workers for multiprocessing. Defaults to 5.

    Returns:
        list: one log entry per image, as returned by ``filter_singlefile``.
    """
    # list all files
    lst_files = list_files(source_dir, filter_ext=[".jpg", ".jpeg", ".png"], return_relative_path=True)

    # One self-contained parameter dict per image, so each task can be
    # handed to a worker process on its own.
    lst_params = [
        {
            "source_dir": source_dir,
            "file_rel_path": file_rel_path,
            "min_width": min_width,
            "min_height": min_height,
            "silent_on_missing_wrong_facebox": silent_on_missing_wrong_facebox,
        }
        for file_rel_path in lst_files
    ]

    if not multiprocess:
        # Serial fallback, still with a progress bar.
        return [filter_singlefile(single_params) for single_params in tqdm(lst_params)]

    # map multiple tasks, honouring the caller's worker count
    result = process_map(filter_singlefile, lst_params, max_workers=max_workers)
    return result

## *Start*

In [None]:
# Directory whose images (and their facebox txt files) are filtered.
# NOTE: this run deletes files under source_dir.
source_dir = "/content/filter-faces"

result = filter_small_faces_fulldir_multiprocess(
    source_dir=source_dir,
    min_width=120,
    min_height=120,
    silent_on_missing_wrong_facebox=False,
    multiprocess=True,
    max_workers=5,
)

HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))




In [None]:
# Count the log entries that carry a removal code. Entries that are not
# dicts are skipped instead of raising AttributeError on `.get`.
total_removed_files_cnt = sum(
    1 for entry in result if isinstance(entry, dict) and entry.get('code')
)
print(f"Total removed files count: {total_removed_files_cnt}")

Total removed files count: 1
