# TL;DR

This notebook tests, in a separate environment, the code that extracts images out of a ```zip``` archive, compresses them and saves the result into a separate ```zip``` archive with the same name.

In [1]:
import zipfile
import tempfile
import os
from PIL import Image
import mediapipe as mp
import cv2
from mediapipe.python.solutions.drawing_utils import _normalized_to_pixel_coordinates
import matplotlib.pyplot as plt

# Initialize Mediapipe face detection module. FaceDetection() is created with
# its default options and this single instance is reused for every image below.
mp_face_detection = mp.solutions.face_detection
face_detection = mp_face_detection.FaceDetection()

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


#   Unzip and handle temporary folder

The idea is that we don't want to keep all files in RAM. Therefore, we create a temporary folder for the transformations, build a new ```zip``` archive out of the changed data and then remove this temporary folder.

In [2]:
def extract_zip_to_temp(archive_name: str, temp_directory: str="temp"):
    """Extract all images out of the zip archive into a temporary directory.

    Only members whose name ends with a supported image extension
    (.png, .jpg, .jpeg, .gif - compared case-insensitively) are extracted;
    the folder layout stored in the archive is kept.
    IMPORTANT: consider that by default data is saved into 'temp' directory.

    Args:
        archive_name (str): name or path of the archive to decompress
        temp_directory (str): name of directory where to save images
    """
    image_extensions = (".png", ".jpg", ".jpeg", ".gif")
    #   the context manager closes the archive, no explicit close() is needed
    with zipfile.ZipFile(archive_name, "r") as zip_ref:
        for member in zip_ref.namelist():
            #   lower() lets upper-case extensions such as 'PHOTO.JPG' pass
            if member.lower().endswith(image_extensions):
                zip_ref.extract(member, path=temp_directory)
    

def get_image_filepaths(target_directory: str) -> list:
    """Get paths to all images stored in the directory and its sub-directories.

    A file counts as an image when its name ends with .png, .jpg, .jpeg or
    .gif; the comparison ignores letter case.

    Args:
        target_directory (str): directory where images are stored

    Returns:
        list: paths to images, sorted so that the order is reproducible
    """
    image_extensions = (".png", ".jpg", ".jpeg", ".gif")
    image_filepaths = []
    for root, _dirs, files in os.walk(target_directory):
        image_filepaths.extend(
            os.path.join(root, name)
            for name in files
            if name.lower().endswith(image_extensions)
        )
    #   os.walk yields entries in file-system order; sorting keeps the result
    # (and any numbering derived from it) stable between runs
    image_filepaths.sort()
    return image_filepaths

def remove_temp_directory(temporary_directory: str):
    """Remove the temporary directory used for processing, with all its content.

    Args:
        temporary_directory (str): name of directory to remove

    Raises:
        FileNotFoundError: raised by os.rmdir when the directory does not exist
    """
    #   walk bottom-up so that every folder is already empty when it is removed
    for root, dirs, files in os.walk(temporary_directory, topdown=False):
        for file_name in files:
            os.remove(os.path.join(root, file_name))
        for dir_name in dirs:
            os.rmdir(os.path.join(root, dir_name))
    #   remove the directory that was asked for, not a hard-coded "temp"
    os.rmdir(temporary_directory)

#   Cropping faces out of the image

The first stage of reducing image size is to crop the images so that only the faces are kept, removing all unrelated and non-required data. The ```mediapipe``` library with its ```FaceDetection``` module is used for this.

The difference in size between the original images and the achieved result is also reviewed here. Note that, on average, around 10% of the records can be lost, because mediapipe may fail to find a face.

In [3]:
# Extract the image members of images.zip into the "temp" working directory
extract_zip_to_temp("images.zip", temp_directory="temp")

In [4]:
# Collect the paths of all extracted images found under "temp"
images_to_process = get_image_filepaths("temp")

In [5]:
# Number of images that will be passed to the face detector
len(images_to_process)

100

The results demonstrated that the best efficiency is achieved by cropping the image and saving it as ```JPEG```. In the case of building a color palette there is no option of saving it as ```PNG```. The same goes for image quantization.

In [7]:
#   number of faces saved so far; also used to give every face a unique file name
processed_images = 0
#   share of the detected face width/height that is added as margin around the crop
offset = 0.2

#   create the output directory once instead of checking for it on every face
os.makedirs("compressed", exist_ok=True)

for image_path in images_to_process:
    #   first, we read image using openCV and convert it from BGR to RGB, because
    # CV reads image as BGR. imread returns None when the file cannot be decoded,
    # such files are skipped instead of crashing the conversion
    input_img = cv2.imread(image_path)
    if input_img is None:
        continue
    input_img = cv2.cvtColor(input_img, cv2.COLOR_BGR2RGB)
    img_height, img_width = input_img.shape[:2]

    #   then we pass image to the face detector and crop image only if faces are found
    results = face_detection.process(input_img)
    if results.detections is None:
        continue

    for detection in results.detections:
        #   mediapipe marks face using "relative" coordinates, meaning that it's
        # required to multiply relative coordinates with image shape. Here we detect
        # X and Y coordinates of the initial point from which face starts and then width
        # and height of the face in pixels
        bbox = detection.location_data.relative_bounding_box
        x, y = int(bbox.xmin * img_width), int(bbox.ymin * img_height)
        w, h = int(bbox.width * img_width), int(bbox.height * img_height)

        #   now, there is an offset to add
        x_offset, y_offset = int(w * offset), int(h * offset)

        #   considering face start point, its width and height, offsets to apply -
        # calculate start point, end point and make sure that it's not going out
        # of the image size
        x = max(x - x_offset, 0)
        y = max(y - y_offset, 0)
        x_end = min(x + w + 2 * x_offset, img_width)
        y_end = min(y + h + 2 * y_offset, img_height)
        if x_end <= x or y_end <= y:
            #   degenerate box, there is nothing to crop
            continue

        #   extract face and convert face image from cv2 one into Pillow one,
        # considering possible error of having incorrect image mode
        face_img = Image.fromarray(input_img[y:y_end, x:x_end])
        if face_img.mode != "RGB":
            face_img = face_img.convert("RGB")

        #   Save the extracted face as a separate image file. The counter grows per
        # face, so several faces found in one image no longer overwrite each other
        processed_images += 1
        face_img.save(os.path.join("compressed", f"face_{processed_images}.jpg"), "JPEG")

        #   NOTE: saving an indexed-palette version
        # (face_img.convert("P", palette=Image.ADAPTIVE)) and a quantized version
        # (face_img.quantize(colors=256)) as PNG was also tried at this point;
        # the cropped JPEG turned out to be the most efficient, so only it is kept.

remove_temp_directory("temp")

In [9]:
#   pack every file found under "compressed" into test_name.zip, storing each
# entry under its path relative to the "compressed" directory
with zipfile.ZipFile("test_name.zip", "w", zipfile.ZIP_DEFLATED) as archive:
    for folder, _subfolders, filenames in os.walk("compressed"):
        for filename in filenames:
            source_path = os.path.join(folder, filename)
            stored_name = os.path.relpath(source_path, "compressed")
            archive.write(source_path, stored_name)

#   Making final function

The idea is to take a zip archive, extract its content, compress the content and remove unnecessary data, and then form a new archive with a fixed package structure and the same name as the original archive, with a ```_compressed``` flag at the end of the archive name.

In [1]:
import zipfile
import tempfile
import os
from PIL import Image
import mediapipe as mp
import cv2
import matplotlib.pyplot as plt

# Initialize Mediapipe face detection module. FaceDetection() is created with
# its default options and this single instance is reused for every image below.
mp_face_detection = mp.solutions.face_detection
face_detection = mp_face_detection.FaceDetection()

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


In [5]:
def extract_zip_to_temp(archive_name: str, temp_directory: str="temp"):
    """Extract all images out of the zip archive into a temporary directory.

    Only members whose name ends with a supported image extension
    (.png, .jpg, .jpeg, .gif - compared case-insensitively) are extracted;
    the folder layout stored in the archive is kept.
    IMPORTANT: consider that by default data is saved into 'temp' directory.

    Args:
        archive_name (str): name or path of the archive to decompress
        temp_directory (str): name of directory where to save images
    """
    image_extensions = (".png", ".jpg", ".jpeg", ".gif")
    #   the context manager closes the archive, no explicit close() is needed
    with zipfile.ZipFile(archive_name, "r") as zip_ref:
        for member in zip_ref.namelist():
            #   lower() lets upper-case extensions such as 'PHOTO.JPG' pass
            if member.lower().endswith(image_extensions):
                zip_ref.extract(member, path=temp_directory)
    

def get_image_filepaths(target_directory: str) -> list:
    """Get paths to all images stored in the directory and its sub-directories.

    A file counts as an image when its name ends with .png, .jpg, .jpeg or
    .gif; the comparison ignores letter case.

    Args:
        target_directory (str): directory where images are stored

    Returns:
        list: paths to images, sorted so that the order is reproducible
    """
    image_extensions = (".png", ".jpg", ".jpeg", ".gif")
    image_filepaths = []
    for root, _dirs, files in os.walk(target_directory):
        image_filepaths.extend(
            os.path.join(root, name)
            for name in files
            if name.lower().endswith(image_extensions)
        )
    #   os.walk yields entries in file-system order; sorting keeps the result
    # (and any numbering derived from it) stable between runs
    image_filepaths.sort()
    return image_filepaths

def remove_temp_directory(temporary_directory: str):
    """Delete a working directory together with everything stored in it.

    Args:
        temporary_directory (str): name of the directory to delete
    """
    #   bottom-up traversal: a folder is only reached after its sub-folders
    # were emptied, so rmdir always gets an empty directory
    for parent, folder_names, file_names in os.walk(temporary_directory, topdown=False):
        file_targets = [os.path.join(parent, entry) for entry in file_names]
        folder_targets = [os.path.join(parent, entry) for entry in folder_names]
        for target in file_targets:
            os.remove(target)
        for target in folder_targets:
            os.rmdir(target)
    #   finally drop the (now empty) top-level directory itself
    os.rmdir(temporary_directory)

In [6]:
def optimize_images_zip_archive(target_archive_path: str, temp_dir_name: str,
                                compressed_temp_dir_name: str):
    """Replace a zip archive of images with an archive of cropped face JPEGs.

    The images of 'target_archive_path' are extracted into the 'temp_dir_name'
    directory. Every face that mediapipe detects is cropped with a margin of
    20% of the face size and saved as 'face_<n>.jpg' into the
    'compressed_temp_dir_name' directory. At the final stage an archive is
    created with the name of the original one plus a '_compressed' flag, after
    which both temporary directories and the original zip are removed.

    Images that OpenCV cannot decode and images without a detected face are
    skipped. Every detected face gets its own file, so an image with several
    faces produces several files.

    Args:
        target_archive_path (str): name or path of archive to optimize
        temp_dir_name (str): name of directory where images will be extracted from
                original archive and that will be removed in the end of process
        compressed_temp_dir_name (str): name of directory where compressed images
                will be saved and that will be removed in the end of process.
    """
    #   both working directories are created up front, so the cleanup at the end
    # also works when the archive holds no images or no face is found at all
    os.makedirs(temp_dir_name, exist_ok=True)
    os.makedirs(compressed_temp_dir_name, exist_ok=True)

    extract_zip_to_temp(target_archive_path, temp_directory=temp_dir_name)
    images_to_process = get_image_filepaths(temp_dir_name)

    #   share of the detected face width/height that is added as margin
    offset = 0.2
    #   number of faces saved so far; gives every face a unique file name
    processed_images = 0
    for image_path in images_to_process:
        #   first, we read image using openCV and convert it from BGR to RGB, because
        # CV reads image as BGR. imread returns None when the file cannot be decoded,
        # such files are skipped instead of crashing the conversion
        input_img = cv2.imread(image_path)
        if input_img is None:
            continue
        input_img = cv2.cvtColor(input_img, cv2.COLOR_BGR2RGB)
        img_height, img_width = input_img.shape[:2]

        #   then we pass image to the face detector and crop image only if faces are found
        results = face_detection.process(input_img)
        if results.detections is None:
            continue

        for detection in results.detections:
            #   mediapipe marks face using "relative" coordinates, meaning that it's
            # required to multiply relative coordinates with image shape to get the
            # start point, width and height of the face in pixels
            bbox = detection.location_data.relative_bounding_box
            x, y = int(bbox.xmin * img_width), int(bbox.ymin * img_height)
            w, h = int(bbox.width * img_width), int(bbox.height * img_height)

            #   now, there is an offset to add
            x_offset, y_offset = int(w * offset), int(h * offset)

            #   calculate start point and end point of the crop and make sure
            # that it's not going out of the image size
            x = max(x - x_offset, 0)
            y = max(y - y_offset, 0)
            x_end = min(x + w + 2 * x_offset, img_width)
            y_end = min(y + h + 2 * y_offset, img_height)
            if x_end <= x or y_end <= y:
                #   degenerate box, there is nothing to crop
                continue

            #   extract face and convert face image from cv2 one into Pillow one,
            # considering possible error of having incorrect image mode
            face_img = Image.fromarray(input_img[y:y_end, x:x_end])
            if face_img.mode != "RGB":
                face_img = face_img.convert("RGB")

            #   save the extracted face as a separate image file; os.path.join keeps
            # absolute directory names working, relative ones still resolve
            # against the current working directory
            processed_images += 1
            face_img.save(
                os.path.join(compressed_temp_dir_name, f"face_{processed_images}.jpg"),
                "JPEG",
            )

    #   remove intermediate directory
    remove_temp_directory(temp_dir_name)

    #   we want the name of original archive with flag that it's compressed
    base_name, extension = os.path.splitext(target_archive_path)
    new_file_name = base_name + "_compressed" + extension

    #   make new archive with contents of the directory containing compressed images
    with zipfile.ZipFile(new_file_name, "w", zipfile.ZIP_DEFLATED) as zip_file:
        for root, _dirs, files in os.walk(compressed_temp_dir_name):
            for file_name in files:
                file_path = os.path.join(root, file_name)
                zip_file.write(file_path, os.path.relpath(file_path, compressed_temp_dir_name))

    #   final removal of intermediate directory with compressed image and original zip archive
    remove_temp_directory(compressed_temp_dir_name)
    if os.path.exists(target_archive_path):
        os.remove(target_archive_path)

In [7]:
# NOTE: this call removes the "temporary" and "compressed" directories and deletes
# images.zip itself once images_compressed.zip has been written
optimize_images_zip_archive("images.zip", "temporary", "compressed")

# Unexpected continue of work

As it turned out, Mediapipe behaves strangely on a server and does not work there (some issue with codecs), while the OpenCV models work properly. Therefore, here we concentrate on building an optimized face-cropping technique with the OpenCV module.

Generally there are two main models used for face detection:

1. Haar cascade - it is mostly based on recognizing the specific shapes and lines that make up a human face. It is computationally intensive and requires a lot of time.
2. LBP classifier - it is based more on analysing groups of pixels. It needs fewer computations and is faster, but its accuracy is lower.

Both detection approaches are tested here to see which one has better coverage and to compare their speed of execution.

# Haar Cascade approach

In [1]:
import zipfile
import tempfile
import os
from PIL import Image
import cv2
import matplotlib.pyplot as plt

#   loading Haar Cascade classifier for face detection; the model file is taken
# from the cascades directory exposed by OpenCV as cv2.data.haarcascades
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml")


def extract_zip_to_temp(archive_name: str, temp_directory: str="temp"):
    """Extract all images out of the zip archive into a temporary directory.

    Only members whose name ends with a supported image extension
    (.png, .jpg, .jpeg, .gif - compared case-insensitively) are extracted;
    the folder layout stored in the archive is kept.
    IMPORTANT: consider that by default data is saved into 'temp' directory.

    Args:
        archive_name (str): name or path of the archive to decompress
        temp_directory (str): name of directory where to save images
    """
    image_extensions = (".png", ".jpg", ".jpeg", ".gif")
    #   the context manager closes the archive, no explicit close() is needed
    with zipfile.ZipFile(archive_name, "r") as zip_ref:
        for member in zip_ref.namelist():
            #   lower() lets upper-case extensions such as 'PHOTO.JPG' pass
            if member.lower().endswith(image_extensions):
                zip_ref.extract(member, path=temp_directory)
    

def get_image_filepaths(target_directory: str) -> list:
    """Get paths to all images stored in the directory and its sub-directories.

    A file counts as an image when its name ends with .png, .jpg, .jpeg or
    .gif; the comparison ignores letter case.

    Args:
        target_directory (str): directory where images are stored

    Returns:
        list: paths to images, sorted so that the order is reproducible
    """
    image_extensions = (".png", ".jpg", ".jpeg", ".gif")
    image_filepaths = []
    for root, _dirs, files in os.walk(target_directory):
        image_filepaths.extend(
            os.path.join(root, name)
            for name in files
            if name.lower().endswith(image_extensions)
        )
    #   os.walk yields entries in file-system order; sorting keeps the result
    # (and any numbering derived from it) stable between runs
    image_filepaths.sort()
    return image_filepaths

def remove_temp_directory(temporary_directory: str):
    """Delete a working directory together with everything stored in it.

    Args:
        temporary_directory (str): name of the directory to delete
    """
    #   bottom-up traversal: a folder is only reached after its sub-folders
    # were emptied, so rmdir always gets an empty directory
    for parent, folder_names, file_names in os.walk(temporary_directory, topdown=False):
        file_targets = [os.path.join(parent, entry) for entry in file_names]
        folder_targets = [os.path.join(parent, entry) for entry in folder_names]
        for target in file_targets:
            os.remove(target)
        for target in folder_targets:
            os.rmdir(target)
    #   finally drop the (now empty) top-level directory itself
    os.rmdir(temporary_directory)

In [6]:
def optimize_images_zip_archive(target_archive_path: str, temp_dir_name: str,
                                compressed_temp_dir_name: str):
    """Extract images from a zip archive and save the faces found by the Haar Cascade.

    The images of 'target_archive_path' are extracted into the 'temp_dir_name'
    directory. Every face that the cascade detects is cropped with a margin of
    5% of the face size and saved as 'face_<n>.jpg' into the
    'compressed_temp_dir_name' directory. Images that OpenCV cannot decode
    are skipped.

    In this variant the archiving and cleanup steps are switched off: no
    '_compressed' archive is written, and both directories as well as the
    original archive stay on disk.

    Args:
        target_archive_path (str): name or path of archive to optimize
        temp_dir_name (str): name of directory where images will be extracted from
                original archive
        compressed_temp_dir_name (str): name of directory where compressed images
                will be saved
    """
    extract_zip_to_temp(target_archive_path, temp_directory=temp_dir_name)
    images_to_process = get_image_filepaths(temp_dir_name)

    #   create the output directory once instead of checking for it on every face
    os.makedirs(compressed_temp_dir_name, exist_ok=True)

    #   share of the detected face width/height that is added as margin
    offset = 0.05
    #   number of faces saved so far; gives every face a unique file name
    processed_images = 0
    for image_path in images_to_process:
        #   first we reduce amount of required calculations by making grayscaled images
        # instead of colored ones. imread returns None when the file cannot be decoded,
        # such files are skipped instead of crashing the conversion
        input_img = cv2.imread(image_path)
        if input_img is None:
            continue
        input_img = cv2.cvtColor(input_img, cv2.COLOR_BGR2GRAY)
        img_height, img_width = input_img.shape[:2]

        #   now we pass grayscaled image to the Haar Cascade and it will return list with
        # all found face coordinates
        faces = face_cascade.detectMultiScale(input_img, scaleFactor=1.1, minNeighbors=5)
        for (x, y, w, h) in faces:
            processed_images += 1
            #   considering found face, apply offset to save some additional face elements
            x_offset, y_offset = int(w * offset), int(h * offset)

            #   calculate start point and end point of the crop and make sure
            # that it's not going out of the image size
            x = max(x - x_offset, 0)
            y = max(y - y_offset, 0)
            x_end = min(x + w + 2 * x_offset, img_width)
            y_end = min(y + h + 2 * y_offset, img_height)

            #   extract face and convert face image from cv2 one into Pillow one.
            # NOTE(review): the crop is taken from the grayscale image, so the saved
            # JPEG carries no colour even after convert("RGB") - confirm this is intended
            face_img = Image.fromarray(input_img[y:y_end, x:x_end])
            if face_img.mode != "RGB":
                face_img = face_img.convert("RGB")

            #   save the extracted face as a separate image file; os.path.join keeps
            # absolute directory names working, relative ones still resolve
            # against the current working directory
            face_img.save(
                os.path.join(compressed_temp_dir_name, f"face_{processed_images}.jpg"),
                "JPEG",
            )

    #   NOTE: removing 'temp_dir_name', writing the '<name>_compressed' archive,
    # removing 'compressed_temp_dir_name' and deleting the original archive are
    # switched off in this variant, so it only extracts and crops.
    
    
# Haar Cascade run: the faces are written into "compressed"; the function used here
# has its archiving and cleanup steps switched off
optimize_images_zip_archive("images.zip", "temporary", "compressed")

Result - out of 100 images with a human face we got 102 faces, meaning that we got 2 false identifications of a human face. Overall - great performance, but the run takes around 4.5 seconds without removal of the temp folders and the original zip.

# LBP cascade

In [2]:
import zipfile
import tempfile
import os
from PIL import Image
import cv2
import matplotlib.pyplot as plt

#   loading LBP Cascade classifier for face detection; the model file is read from
# the relative path "data/lbpcascade_frontalface.xml"
face_cascade = cv2.CascadeClassifier("data/lbpcascade_frontalface.xml")


def extract_zip_to_temp(archive_name: str, temp_directory: str="temp"):
    """Extract all images out of the zip archive into a temporary directory.

    Only members whose name ends with a supported image extension
    (.png, .jpg, .jpeg, .gif - compared case-insensitively) are extracted;
    the folder layout stored in the archive is kept.
    IMPORTANT: consider that by default data is saved into 'temp' directory.

    Args:
        archive_name (str): name or path of the archive to decompress
        temp_directory (str): name of directory where to save images
    """
    image_extensions = (".png", ".jpg", ".jpeg", ".gif")
    #   the context manager closes the archive, no explicit close() is needed
    with zipfile.ZipFile(archive_name, "r") as zip_ref:
        for member in zip_ref.namelist():
            #   lower() lets upper-case extensions such as 'PHOTO.JPG' pass
            if member.lower().endswith(image_extensions):
                zip_ref.extract(member, path=temp_directory)
    

def get_image_filepaths(target_directory: str) -> list:
    """Get paths to all images stored in the directory and its sub-directories.

    A file counts as an image when its name ends with .png, .jpg, .jpeg or
    .gif; the comparison ignores letter case.

    Args:
        target_directory (str): directory where images are stored

    Returns:
        list: paths to images, sorted so that the order is reproducible
    """
    image_extensions = (".png", ".jpg", ".jpeg", ".gif")
    image_filepaths = []
    for root, _dirs, files in os.walk(target_directory):
        image_filepaths.extend(
            os.path.join(root, name)
            for name in files
            if name.lower().endswith(image_extensions)
        )
    #   os.walk yields entries in file-system order; sorting keeps the result
    # (and any numbering derived from it) stable between runs
    image_filepaths.sort()
    return image_filepaths

def remove_temp_directory(temporary_directory: str):
    """Delete a working directory together with everything stored in it.

    Args:
        temporary_directory (str): name of the directory to delete
    """
    #   bottom-up traversal: a folder is only reached after its sub-folders
    # were emptied, so rmdir always gets an empty directory
    for parent, folder_names, file_names in os.walk(temporary_directory, topdown=False):
        file_targets = [os.path.join(parent, entry) for entry in file_names]
        folder_targets = [os.path.join(parent, entry) for entry in folder_names]
        for target in file_targets:
            os.remove(target)
        for target in folder_targets:
            os.rmdir(target)
    #   finally drop the (now empty) top-level directory itself
    os.rmdir(temporary_directory)

In [5]:
def optimize_images_zip_archive(target_archive_path: str, temp_dir_name: str,
                                compressed_temp_dir_name: str):
    """Replace a zip archive of images with an archive of cropped face JPEGs.

    The images of 'target_archive_path' are extracted into the 'temp_dir_name'
    directory. Every face that the LBP cascade detects is cropped with a
    margin of 5% of the face size and saved as 'face_<n>.jpg' into the
    'compressed_temp_dir_name' directory. At the final stage an archive is
    created with the name of the original one plus a '_compressed' flag, after
    which both temporary directories and the original zip are removed.

    Images that OpenCV cannot decode are skipped.

    Args:
        target_archive_path (str): name or path of archive to optimize
        temp_dir_name (str): name of directory where images will be extracted from
                original archive and that will be removed in the end of process
        compressed_temp_dir_name (str): name of directory where compressed images
                will be saved and that will be removed in the end of process.
    """
    #   both working directories are created up front, so the cleanup at the end
    # also works when the archive holds no images or no face is found at all
    os.makedirs(temp_dir_name, exist_ok=True)
    os.makedirs(compressed_temp_dir_name, exist_ok=True)

    extract_zip_to_temp(target_archive_path, temp_directory=temp_dir_name)
    images_to_process = get_image_filepaths(temp_dir_name)

    #   share of the detected face width/height that is added as margin
    offset = 0.05
    #   number of faces saved so far; gives every face a unique file name
    processed_images = 0
    for image_path in images_to_process:
        #   first we reduce amount of required calculations by making grayscaled images
        # instead of colored ones. imread returns None when the file cannot be decoded,
        # such files are skipped instead of crashing the conversion
        input_img = cv2.imread(image_path)
        if input_img is None:
            continue
        input_img = cv2.cvtColor(input_img, cv2.COLOR_BGR2GRAY)
        img_height, img_width = input_img.shape[:2]

        #   now we pass grayscaled image to the LBP cascade and it will return list with
        # all found face coordinates
        faces = face_cascade.detectMultiScale(input_img, scaleFactor=1.1, minNeighbors=10)
        for (x, y, w, h) in faces:
            processed_images += 1
            #   considering found face, apply offset to save some additional face elements
            x_offset, y_offset = int(w * offset), int(h * offset)

            #   calculate start point and end point of the crop and make sure
            # that it's not going out of the image size
            x = max(x - x_offset, 0)
            y = max(y - y_offset, 0)
            x_end = min(x + w + 2 * x_offset, img_width)
            y_end = min(y + h + 2 * y_offset, img_height)

            #   extract face and convert face image from cv2 one into Pillow one.
            # NOTE(review): the crop is taken from the grayscale image, so the saved
            # JPEG carries no colour even after convert("RGB") - confirm this is intended
            face_img = Image.fromarray(input_img[y:y_end, x:x_end])
            if face_img.mode != "RGB":
                face_img = face_img.convert("RGB")

            #   save the extracted face as a separate image file; os.path.join keeps
            # absolute directory names working, relative ones still resolve
            # against the current working directory
            face_img.save(
                os.path.join(compressed_temp_dir_name, f"face_{processed_images}.jpg"),
                "JPEG",
            )

    #   remove intermediate directory
    remove_temp_directory(temp_dir_name)

    #   we want the name of original archive with flag that it's compressed
    base_name, extension = os.path.splitext(target_archive_path)
    new_file_name = base_name + "_compressed" + extension

    #   make new archive with contents of the directory containing compressed images
    with zipfile.ZipFile(new_file_name, "w", zipfile.ZIP_DEFLATED) as zip_file:
        for root, _dirs, files in os.walk(compressed_temp_dir_name):
            for file_name in files:
                file_path = os.path.join(root, file_name)
                zip_file.write(file_path, os.path.relpath(file_path, compressed_temp_dir_name))

    #   final removal of intermediate directory with compressed image and original zip archive
    remove_temp_directory(compressed_temp_dir_name)
    if os.path.exists(target_archive_path):
        os.remove(target_archive_path)
    
    
# NOTE: this call removes the "temporary" and "compressed" directories and deletes
# images.zip itself once images_compressed.zip has been written
optimize_images_zip_archive("images.zip", "temporary", "compressed")

The LBP classifier performs much better in terms of speed compared to the Haar Cascade. LBP dealt with image compression and face extraction in 2.5-2.7 seconds, including making the zip and removing the intermediate folders. It was required to play a bit with the number of neighbours at which it behaves rationally (like not detecting a face in some shadow). As a result, 10 neighbours is enough for it to work nicely.