In [1]:
import os
import shutil
from collections import defaultdict
from typing import Tuple, List, NoReturn, DefaultDict, Set, Dict

import uuid
import faiss
import imagehash
import numpy as np
from PIL import Image
from faiss import IndexBinaryFlat

In [2]:
class ImageCompactor:
    """Group visually similar images of a folder into sub-folders.

    Each image is reduced to an average hash (``imagehash.average_hash``) that is
    added to a faiss binary index. Images whose hashes are closer than the
    configured threshold are moved together into a randomly named sub-folder.
    """

    def __init__(self, hash_size: int, image_distance_threshold: int):
        """
        :param hash_size: side length of the average hash; the index stores
            ``hash_size ** 2`` bits per image.
        :param image_distance_threshold: neighbours whose index distance is
            strictly below this value count as duplicates. The same value also
            bounds how many neighbours are fetched per query.
        """
        self._hash_size: int = hash_size
        self._distance_threshold: int = image_distance_threshold
        self._index: "IndexBinaryFlat" = self._load_index()

    def compact_images(self, path_to_folder: str) -> None:
        """Hash every file in ``path_to_folder`` and move near-duplicates together.

        Only regular files are considered, so sub-folders created by an earlier
        run are skipped instead of being handed to PIL. Every remaining file is
        opened as an image, so the folder is expected to contain images only.

        :param path_to_folder: directory whose direct children are compacted.
        """
        candidates = (os.path.join(path_to_folder, entry) for entry in os.listdir(path_to_folder))
        # Sorted so that bucket assignment does not depend on directory order.
        photos_names: List[str] = sorted(path for path in candidates if os.path.isfile(path))
        hashes: List[np.ndarray] = self._fill_index(photos_names)
        buckets: DefaultDict[int, Set[str]] = self._select_sets(hashes, photos_names)
        self._perform_compaction(buckets, path_to_folder)

    def _fill_index(self, photo_names: List[str]) -> List[np.ndarray]:
        """Hash each image, append the hash to the index and return all hashes.

        :param photo_names: paths of the images to hash.
        :return: packed hashes, in the same order as ``photo_names``.
        """
        hashes: List[np.ndarray] = []
        for name in photo_names:
            # The context manager closes the file handle once the hash is computed.
            with Image.open(name) as file:
                image_hash = self._hash_image(file)
            self._index.add(image_hash)
            hashes.append(image_hash)
        return hashes

    def _select_sets(self, hashes: List[np.ndarray], photos_names: List[str]) -> DefaultDict[int, Set[str]]:
        """Cluster images by linking each one to its near neighbours.

        :param hashes: packed hashes as returned by ``_fill_index``.
        :param photos_names: file paths, aligned with ``hashes``.
        :return: mapping from bucket id (position of the bucket's first image)
            to the set of file paths assigned to that bucket.
        """
        # The hashes of this run are the most recently added index entries. Any
        # earlier entries (index loaded from disk, previous calls) shift the
        # labels, and have no file name in ``photos_names``.
        offset: int = self._index.ntotal - len(hashes)
        bucket_of: Dict[int, int] = {}
        batches: DefaultDict[int, Set[str]] = defaultdict(set)
        for position, img_hash in enumerate(hashes):
            # Use the loop position rather than the first search hit: with tied
            # hashes the first hit is not guaranteed to be the query image.
            root: int = bucket_of.get(position, position)
            if position not in bucket_of:
                bucket_of[position] = root
                batches[root].add(photos_names[position])
            for label, distance in self._check_duplicate(img_hash):
                local: int = label - offset
                if local < 0 or local >= len(photos_names):
                    continue  # entry does not belong to the current batch of files
                if local in bucket_of or distance >= self._distance_threshold:
                    continue
                # Record the bucket id (not the current image) so that chains
                # a~b, b~c, c~d all end up in one bucket.
                bucket_of[local] = root
                batches[root].add(photos_names[local])
        return batches

    @staticmethod
    def _perform_compaction(buckets: DefaultDict[int, Set[str]], path_to_folder: str) -> None:
        """Move every bucket with more than one file into its own new sub-folder.

        :param buckets: bucket id -> file paths, as produced by ``_select_sets``.
        :param path_to_folder: directory in which the sub-folders are created.
        """
        for batch in buckets.values():
            if len(batch) > 1:
                new_folder = os.path.join(path_to_folder, uuid.uuid4().hex)
                os.mkdir(new_folder)
                for file_path in batch:
                    # basename handles whatever separator os.path.join produced.
                    shutil.move(file_path, os.path.join(new_folder, os.path.basename(file_path)))

    def _hash_image(self, im: "Image.Image") -> np.ndarray:
        """Return the average hash of ``im`` packed into a ``(1, n_bytes)`` uint8 row."""
        im_hash: imagehash.ImageHash = imagehash.average_hash(im, hash_size=self._hash_size)
        return np.packbits(np.array(im_hash.hash).reshape(1, self._hash_size ** 2), axis=1)

    def _check_duplicate(self, img_hash: np.ndarray) -> List[Tuple[int, int]]:
        """Query the index for the nearest neighbours of ``img_hash``.

        :param img_hash: packed hash row as produced by ``_hash_image``.
        :return: ``(label, distance)`` pairs in the order returned by the index.
        """
        # NOTE(review): the distance threshold doubles as the neighbour count, so
        # a cluster fan-out larger than the threshold is truncated - confirm intent.
        # Capping at ntotal avoids padded results; at least one hit is requested.
        neighbour_count: int = max(1, min(self._distance_threshold, self._index.ntotal))
        D, I = self._index.search(img_hash, neighbour_count)
        # Negative labels mark padding for missing results and are dropped.
        return [(int(label), int(distance)) for label, distance in zip(I[0], D[0]) if label >= 0]

    def _load_index(self, filename: str = 'faiss_index') -> "IndexBinaryFlat":
        """Load the index stored as ``{filename}_{bits}`` or create an empty one.

        :param filename: prefix of the index file; the hash width in bits is appended.
        :return: the index read from disk, or a fresh flat binary index when
            reading raises ``RuntimeError``.
        """
        d: int = self._hash_size ** 2
        try:
            return faiss.read_index_binary(f'{filename}_{d}')
        except RuntimeError:
            return faiss.IndexBinaryFlat(d)

    def save_index(self, filename: str = 'faiss_index') -> None:
        """Write the index to ``{filename}_{bits}``, the name ``_load_index`` reads."""
        d: int = self._hash_size ** 2
        faiss.write_index_binary(self._index, f'{filename}_{d}')

In [3]:
# Tunable settings for this run, kept together so they are easy to adjust.
HASH_SIZE = 16
IMAGE_DISTANCE_THRESHOLD = 65
IMAGES_FOLDER = "images/AlgorithmTest/"

# Build the compactor and group the near-duplicate images found in the folder.
compactor: ImageCompactor = ImageCompactor(
    hash_size=HASH_SIZE,
    image_distance_threshold=IMAGE_DISTANCE_THRESHOLD,
)
compactor.compact_images(IMAGES_FOLDER)