In [1]:
"""Для работы core необходимо в Open Workspace Settings (JSON) вставить строку "python.analysis.extraPaths": ["${workspaceFolder}/utils/"]
В скриптах, где будет использоваться core необходимо перед импортом добавлять строки:
import sys
import os
sys.path.append(os.path.join(os.getcwd(), "utils"))

"""

import os

IMAGE_EXTENSIONS = {'.tiff', '.tif', '.jpeg', '.jpg', '.jpe', '.bmp', '.png'}

def is_image_file(filename:str):
    """
    Проверяем, что файл является изображением. 
    В данный момент протестированы только данные форматы файлов изображений.
    :param filename: полный путь к файлу
    :return: True - если файл является изображением
    """
    _, file_extension = os.path.splitext(filename)
    return file_extension in IMAGE_EXTENSIONS


def calculate_count_file_extensions(folder: str):
    """
    Функция возвращает словарь, где в качестве ключа выступает расширение файла, а в качестве значение количество файлов с таким расширением
    :param folder: папка, для которой будут искать все возможные расширения файлов, включая вложенные
    :return: Возвращает словарь, где в качестве ключа выступают расширения, а в качестве значения - количества
    """
    
    exts = dict()
    with os.scandir(folder) as it:
        for entry in it:
            if entry.is_dir():
                sub_exts = calculate_count_file_extensions(entry.path)
                for ext_key in sub_exts:
                    if ext_key not in exts.keys():
                        exts[ext_key] = 0
                    exts[ext_key] += sub_exts[ext_key]
                
            elif entry.is_file():
                ext_key = str.lower(os.path.splitext(entry.name)[1])
                if ext_key not in exts.keys():
                    exts[ext_key] = 0
                exts[ext_key] += 1
    
    return exts     # Возвращаем набор расширений всех файлов в этой папке


def get_count_image_files(folder: str):
    """
    Определяет, сколько файлов изображения содержит папка.
    :param folder: имя папки, для которой будет подсчитано количество файлов, в том числе и во вложенных папках
    :return: Возвращает количество файлов изображений внутри папки, включая и вложенные.
    """
    return len(get_all_image_file_names_in_folder(folder))   # получаем имена файлов изображений и папок, внутри папки folder


def get_all_image_file_names_in_folder(folder:str):
    """
    Возвращает список всех имён (путей) файлов изображений, находящихся в папке folder, а тажке во всех вложенных на всех уровнях.
    Рекурсивная функция
    :param folder: имя папки верхнего уровня, для которой нужно найти все вложенные файлы изображений, включая и во вложенных файлах
    :return: Возвращает список путей к файлам изображений, которые находятся внутри папки folder на всех вложенных уровнях.
    """
    tuple_extentions = tuple(IMAGE_EXTENSIONS)
    return get_all_files_with_specific_extentions(folder, tuple_extentions)
    

def get_all_files_with_specific_extentions(folder:str, extentions: tuple):
    """
    Возвращает список всех имён (путей) файлов изображений, находящихся в папке folder, а тажке во всех вложенных на всех уровнях.
    Рекурсивная функция
    :param folder: имя папки верхнего уровня, для которой нужно найти все вложенные файлы изображений, включая и во вложенных файлах
    :extentions: список расширений файлов, которые нужно найти, например [".jpg", ".jpeg"]
    :return: Возвращает список путей к файлам изображений, которые находятся внутри папки folder на всех вложенных уровнях.
    """
    result_list = list()
    with os.scandir(folder) as it:
        for entry in it:
            if entry.is_dir():
                result_list.extend(get_all_files_with_specific_extentions(entry.path, extentions))
            elif str.lower(entry.name).endswith(extentions) and entry.is_file():
                result_list.append(entry.path)

    return result_list

def get_file_name_only(file_path: str):
    return file_path.split(os.sep)[-1][::-1].split('.', 1)[1][::-1]

def get_file_guid(file_path: str):
    return file_path.split(os.sep)[-1][::-1].split('.', 1)[1][::-1][:36]




In [15]:
""" Скрипт для поиска похожих изображений (не байтовых дубликатов)
    Основная функция принимает путь папке с базой изображений, путь к папке для сохранения результатов, минимальную и максимальную дистанцию, флаг для теста.
    Если нашлись похожие изображения, то после работы скрипта создается папка с json. В json изображения отсортированы по количеству похожих в порядке убывания.
    В каждом словаре похожих изображений заглавным является изображение с самым большим весом в KB.
    При активации теста создаются папки, чье имя == имени изображения с самым большим весом в KB. В этой папке хранится лучшее изображение и все похожие на него.
    Так же при активации теста создается папка, в которой сохраняются папки, чье имя == имени изображения с самым большим весом в KB. Но в них хранятся склеенные
    изображения с логикой: лучшее изображение склеено с каждым похожим. Именем изображения является дистанция между векторами каждой пары.
    Для поиска подходящий дистанций реализована возможность вызова основной функции ступенчато. После каждой стапени создается папка с указанием диапазона дистанции.

    Папка с результатом поиска очищается при каждом новом запуске скрипта.
    
"""
# git clone https://github.com/sebastian-sz/efficientnet-v2-keras.git 

from keras.applications import NASNetLarge
from keras.applications.nasnet import preprocess_input
import numpy as np
from PIL import Image, ImageOps
import pickle as pk
from PIL import Image
from sklearn.metrics import DistanceMetric
import sys
import os
import argparse
import shutil
import json
import time
from PIL import ImageFile

ImageFile.LOAD_TRUNCATED_IMAGES = True



#функция для преобразования изображения в вектор
def build_vector(img, model):
    np_img = np.array(img.resize((331, 331), Image.LANCZOS))
    preprocessed_img = preprocess_input(np.expand_dims(np_img, axis = 0))
    x_conv = model.predict(preprocessed_img)
    img_vector = x_conv[0]
    img_vector /=  np.linalg.norm(img_vector)

    return img_vector


def glue_image(path_result, model, dist):
    all_folder_similar = os.listdir(path_result)
    all_folder_similar.remove('result_duplicate-similar.json')

    for folder_similar in all_folder_similar:
        os.makedirs(os.path.join(path_result, "!result_glue", folder_similar), exist_ok=True)

        all_img_in_folder = os.listdir(os.path.join(path_result, folder_similar))
        all_img_guid_in_folder = [img_name.split('.')[0] for img_name in all_img_in_folder]
        best_img = all_img_in_folder[all_img_guid_in_folder.index(folder_similar)]
        list_similar_img = [img for img in all_img_in_folder if img.split('.')[0] != best_img.split('.')[0]]

        best_image = Image.open(os.path.join(path_result, folder_similar, best_img))
                
        if best_image.mode != 'RGB':
            best_image = best_image.convert('RGB')
            best_image = ImageOps.exif_transpose(best_image)

        best_image_vector = build_vector(best_image, model)

        for similar_img in list_similar_img:
            two_img_vector = [best_image_vector]

            similar_image = Image.open(os.path.join(path_result, folder_similar, similar_img))
            similar_image = ImageOps.exif_transpose(similar_image)
            
            if similar_image.mode != 'RGB':
                similar_image = similar_image.convert('RGB')

            similar_image_vector = build_vector(similar_image, model)
            two_img_vector.append(similar_image_vector)

            two_img_vector = np.array(two_img_vector)
            two_img_vector = np.squeeze(two_img_vector)

            name_dist = str(dist.pairwise(two_img_vector)[0][1]).replace('.', ',')

            best_image_size = best_image.size
            new_image = Image.new('RGB',(2 * best_image_size[0], best_image_size[1]), (250,250,250))
            
            new_image.paste(best_image, (0, 0))
            new_image.paste(similar_image, (best_image_size[0], 0))

            new_image.save((os.path.join(path_result, "!result_glue", folder_similar, f"{name_dist}.jpg")), "JPEG")


def save_similar(path_result, list_path_all_img, result_dict_all_sort, all_img_guid):
    print("Тест активирован. Сохраняем изображения для визуального контроля")

    for path in list_path_all_img:
        name_img = os.path.basename(path).split('.')[0]
        if name_img in result_dict_all_sort.keys():
            path_best_img = os.path.join(path_result, os.path.basename(path).split('.')[0])
            os.makedirs(path_best_img, exist_ok=True)
            pref = list_path_all_img[all_img_guid.index(os.path.basename(path).split('.')[0])].split('/')[-2]
            path_for_save = os.path.join(path_best_img, pref)
            if os.path.exists(path_for_save) == False:
                os.makedirs(path_for_save, exist_ok=True)
            shutil.copy2(path, path_for_save)
            
            all_similar = []
            all_pref = []

            for similar in result_dict_all_sort[name_img]:
                all_similar.append(all_img_guid.index(similar))
                all_pref.append(list_path_all_img[all_img_guid.index(similar)].split('/')[-2])
            
            for i in range(len(all_similar)):
                path_for_save = os.path.join(path_best_img, all_pref[i])
                if os.path.exists(path_for_save) == False:
                    os.makedirs(path_for_save, exist_ok=True)
                shutil.copy2(list_path_all_img[all_similar[i]], path_for_save)


def find_similar(basepath, path_result, dist_min, dist_max, test):
            
    model = NASNetLarge(weights='imagenet', input_shape=(331, 331, 3), include_top = False, pooling = 'max')

    if os.path.exists(path_result):
        shutil.rmtree(path_result, ignore_errors=True)
    os.makedirs(path_result, exist_ok=True)

    list_path_all_img = get_all_image_file_names_in_folder(basepath)
    all_img_guid = [os.path.basename(img_path).split('.')[0] for img_path in list_path_all_img]
    all_img_unique_guid = set(all_img_guid)

    start_box = time.time()

    try:
        all_img_vector_and_guid = pk.load(open("cache_data\\box_vector_NASNetLarge.pkl", "rb"))
    except (OSError, IOError) as exc:
        all_img_vector_and_guid = []
    
    list_for_check = [img['img_guid'].split('.')[0] for img in all_img_vector_and_guid]           #список guid всех изображений в базе
    list_for_delet = []                                                     #список кондидатов на удаление из бокса

    for idx in range(len(all_img_vector_and_guid)):                                    #ищем изображения, которые есть в боксе, но нету в базе
        if all_img_vector_and_guid[idx]['img_guid'].split('.')[0] not in all_img_unique_guid:
            list_for_delet.append(idx)

    print(f"Удалено {len(list_for_delet)} изображений")

    for idx in reversed(list_for_delet):                                      #удаляем изображения из бокса, которые есть в боксе, но нету в базе
        del all_img_vector_and_guid[idx]

    new_img = 0

    for path_img in list_path_all_img:                                      #проходим по всем изображениям в базе
        img_guid = os.path.basename(path_img).split('.')[0]
        if img_guid in list_for_check:                                      #добавляем все guid и их вектора изображений из базы в бокс, если их нету в боксе
            continue
        img = Image.open(path_img)
        img = ImageOps.exif_transpose(img)
        
        if img.mode != 'RGB':
            img = img.convert('RGB')

        img_vector = build_vector(img, model)
        new_img += 1

        all_img_vector_and_guid.append({'img_guid': img_guid, 'vector': img_vector})

    print(f"Добавлено {new_img} изображений")

    pk.dump(all_img_vector_and_guid, open("cache_data\\box_vector_NASNetLarge.pkl", "wb"))                   #сохраняем обновленный бокс

    all_only_vector = []

    for img in all_img_vector_and_guid:                                              #от бокса оставляем только сами вектора для поиска расстояния
        all_only_vector.append(np.array(img['vector']))
    all_only_vector = np.array(all_only_vector)
    all_only_vector = np.squeeze(all_only_vector)

    print(f"На создание/ обновление бокса потребовалось {start_box - time.time()}")
    print("Бокс обновлен, начинаем поиск похожих изображений")

    start_find = time.time()

    dist = DistanceMetric.get_metric('euclidean')

    result_temp = dict()                                                         

    for i, distances in enumerate(dist.pairwise(all_only_vector)):

        for eu_dist in distances:
            if dist_min < eu_dist <= dist_max and list_path_all_img[i] != list_path_all_img[list(distances).index(eu_dist)]:   #не добавляем самого себя
                path_origin = list_path_all_img[i]
                name_origin = os.path.basename(path_origin).split('.')[0]

                name_duplicate = os.path.basename(list_path_all_img[list(distances).index(eu_dist)]).split('.')[0]
                
                if name_origin not in result_temp.keys(): # and name_origin not in [a for tup in result_temp.values() for a in tup]: #and name_duplicate_size not in result_temp.values()
                    result_temp.setdefault(name_origin, [name_duplicate])
                
                elif name_duplicate not in result_temp[name_origin]:
                    result_temp[name_origin].append(name_duplicate)
                    
    list_all_similar = []

    for origin, similars in result_temp.items():
        pack_similar = [origin]
        
        for similar in similars:
            pack_similar.append(similar)

        sort_size_similar = sorted(pack_similar, key = lambda name: name, reverse = True)
        similar_only_guid = [name.split('.')[0] for name in sort_size_similar]
        list_all_similar.append(similar_only_guid)
    
    sort_len_similar_list_similar = sorted(list_all_similar, key = lambda len_similar: len(len_similar), reverse=True)

    result_dict_all_sort = dict()

    for similar in sort_len_similar_list_similar:
        result_dict_all_sort.setdefault(similar[0], similar[1:])

    print(f"На поиск похожих изображений потребовалось {start_find - time.time()}")

    with open(os.path.join(path_result, 'result_duplicate-similar.json'), 'w') as file:
        json.dump(result_dict_all_sort, file)

    if test == True:
        start_save_similar = time.time()
        save_similar(path_result, list_path_all_img, result_dict_all_sort, all_img_guid)
        print(f"На сохранение похожих изображений в соответствующие лучшему изображению папки потребовалось {start_save_similar - time.time()}")

        start_glue = time.time()
        #glue_image(path_result, model, dist)
        print(f"На сохранение похожих изображений в соответствующие лучшему изображению папки и их склейку потребовалось {start_glue - time.time()}")


if __name__ == "__main__":
    default_basepath = "/home/dhauryk/TEMP/Friday"
    #default_basepath = "D:\\All_full_images"
    default_path_result = "/home/dhauryk/TEMP/temp"
    default_dist_min = 0
    default_dist_max = 0.3
    default_test = True
    default_flag_some_dist = False

    parser = argparse.ArgumentParser(description='Ищем дубликаты-похожие фото')
    parser.add_argument('--basepath', default = default_basepath,
                        help='Путь к папке с изображениями')
    parser.add_argument('--path_result', default = default_path_result,
                        help='Путь к папке для сохранения результата')
    parser.add_argument('--dist_min', default = default_dist_min,
                        help='Минимальная дистанция для отсечки between(0, dist_max)')
    parser.add_argument('--dist_max', default = default_dist_max,
                        help='Максимальная дистанция для отсечки between(dist_min, 7)')
    parser.add_argument('--test', default = default_test,
                        help='Сохранять ли похожие изображения для зрительного контроля и создавать ли склейки?')
    args, unknown = parser.parse_known_args()

    basepath = args.basepath
    path_result = args.path_result
    dist_min = args.dist_min
    dist_max = args.dist_max
    test = args.test

    if default_flag_some_dist == True:
        for dist in range(0, 50):
            find_similar(basepath, f"{path_result}  dist {dist/10} - {(dist + 1)/10}", dist/10, (dist + 1)/10, test)
    else:
        find_similar(basepath, path_result, dist_min, dist_max, test)

print("Завершено")

Удалено 0 изображений
Добавлено 0 изображений
На создание/ обновление бокса потребовалось -0.10820364952087402
Бокс обновлен, начинаем поиск похожих изображений
На поиск похожих изображений потребовалось -3.2875211238861084
Тест активирован. Сохраняем изображения для визуального контроля
На сохранение похожих изображений в соответствующие лучшему изображению папки потребовалось -0.043355703353881836
На сохранение похожих изображений в соответствующие лучшему изображению папки и их склейку потребовалось -4.76837158203125e-07
Завершено
