# LoRA Dataset Automaker

Created by *Maximax67*

*Civitai: [Maximax67](https://civitai.com/user/Maximax67)*  
*Telegram: [@Maximax67](https://t.me/Maximax67)*  
*Github: [Maximax67](https://github.com/Maximax67)*  
*Gmail: maximax6767@gmail.com*  

This notebook uses some code from the [Dataset Maker colab](https://colab.research.google.com/github/hollowstrawberry/kohya-colab/blob/main/Dataset_Maker.ipynb) for deleting duplicate images and auto-tagging them (it relies on [kohya-ss scripts](https://github.com/kohya-ss/sd-scripts)). It also uses the [yolov5_anime](https://github.com/zymk9/yolov5_anime.git) models by zymk9 to detect faces in images. I trained a model that calculates the similarity between anime faces. Its training dataset is not very large, so the model may give incorrect predictions for some characters.

You can run this notebook locally. It was tested on a laptop with an Nvidia RTX 3050 GPU (4 GB, 55 W), but it also runs without a GPU.

<hr>

🟥 - Important cell! You need to run it!<br>
🟨 - You don't need to run this cell if you are working on a previously saved project (locally, or on Google Drive if you use Colab).<br>
🟩 - Optional cell.

![image.svg](./image.svg)

## **1️⃣🟥 Define global project settings, install and import all libraries, define paths**

In [None]:
# @title 1 Global project settings

# @markdown Select your runtime type!

# IMPORTANT! IF YOU RUN THIS NOTEBOOK LOCALLY, SET RUNTIME_TYPE TO "Local"!
runtime_type = "Colab" # @param ["Colab", "Colab no gpu (only scrapping)", "Local"]

# If you are working locally and don't want to edit the string variables by hand, set this to True.
# The most important fields will then be requested through input prompts!
ask_using_inputs = False

if runtime_type == "Colab":
    colab = True
    colab_no_gpu = False
elif runtime_type == "Colab no gpu (only scrapping)":
    colab = True
    colab_no_gpu = True
elif runtime_type == "Local":
    colab = False
    colab_no_gpu = False
else:
    print("ERROR! Wrong runtime type. Using default local runtime!")
    colab = False
    colab_no_gpu = False

if not colab and colab_no_gpu:
    colab_no_gpu = False
    print("If you are not using colab (colab = False), you don't need to select colab_no_gpu!")


# @markdown Select "Colab no gpu (only scrapping)" if you are working in Colab and only want to scrape images (use a runtime without a GPU)! It will not install or import the libraries required for sections 3-5.

# @markdown You can use the yolov5x model instead of yolov5s. It is slower.
# @markdown Their performance is comparable. However, at a higher confidence threshold, yolov5x can significantly outperform yolov5s.
# @markdown For more details [read here](https://github.com/zymk9/yolov5_anime/tree/master).
use_yolov_5x_model = False #@param {type:"boolean"}

# @markdown If enabled, the yolov5 and similarity models will be downloaded to the working dir (defined in cell 1.5) instead of the current directory.
download_models_to_working_dir = False #@param {type:"boolean"}

if colab:
    from google.colab.output import clear as clear_output
    from google.colab import files
else:
    from IPython.display import clear_output

print("Done!")

In [None]:
# @title 2 Install libraries
# @markdown If you are working locally, you can skip this cell if you have already run it once.
!pip install -q requests beautifulsoup4 regex tqdm

if not colab_no_gpu:
    import os
    root_directory = os.getcwd()

    if not os.path.exists(os.path.join(root_directory, "yolov5")):
        !git clone "https://github.com/ultralytics/yolov5"
        os.chdir("yolov5")
        !pip -q install -r requirements.txt
        os.chdir(root_directory)

    if not os.path.exists(os.path.join(root_directory, "kohya-trainer")):
        !git clone "https://github.com/Linaqruf/kohya-trainer"
        os.chdir("kohya-trainer")
        !pip -q install -r requirements.txt
        os.chdir(root_directory)

    !pip install -q opencv-python
    !pip install -q Pillow
    !pip install -q torch torchvision scikit-learn
    !pip install -q fiftyone ftfy

    if colab:
        !pip install -q fiftyone-db-ubuntu2204

clear_output()
print("Installed!")

In [None]:
# @title 3 Import libraries

import requests
from bs4 import BeautifulSoup
from urllib.parse import quote

from copy import deepcopy

import os
import re
import time
import shutil
import zipfile

import concurrent.futures
import threading

from tqdm.notebook import tqdm

if not colab_no_gpu:
    import sys
    import random
    import pickle
    from collections import Counter

    import subprocess

    import numpy as np
    import fiftyone as fo
    import fiftyone.zoo as foz
    from fiftyone import ViewField as VF
    from sklearn.metrics.pairwise import cosine_similarity, pairwise_distances

    import torch
    import torch.nn as nn
    from torch.utils.data import Dataset, DataLoader
    from torchvision import models, transforms

    from PIL import Image

    import matplotlib.pyplot as plt
    %matplotlib inline

    if torch.cuda.is_available():
        device = "cuda"
        print("Using cuda GPU")
    else:
        device = "cpu"
        print("torch.cuda is NOT available! Using CPU!")

print("Imported!")

In [None]:
# @title 4 🟩 Connect to Google Drive (optional)
# @markdown Only for Colab! Run this if you want to save everything to Google Drive or export the dataset there.
if colab:
    from google.colab import drive
    print("Connecting to Google Drive...")
    drive.mount('/content/drive')
    print("Connected!")
else:
    print("You are not in Colab! If this is wrong, please select a Colab runtime type in the first cell!")

In [None]:
# @title 5 Define path variables

# @markdown Enter your project name. It will create a folder with this name inside your working dir.
# @markdown You can change the working dir to <b>drive/My Drive/</b> if you want to save all data in your google drive.
# @markdown If you run this notebook locally, change working_dir to where you want to save all data.
project_name = "" #@param {type:"string"}
working_dir = ""  #@param {type:"string"}

supported_image_formats = ('.jpg', '.png', '.jpeg')

if ask_using_inputs:
    project_name = input("Enter project name:")
    working_dir = input("Input working dir (empty input means './'):")

if not project_name:
    project_name = "default"

if not working_dir:
    working_dir = "./"

def create_valid_folder_name(name):
    folder_name = name.replace(" ", "_")
    folder_name = re.sub(r'[^\w.-]', '', folder_name)

    return folder_name

project_name = create_valid_folder_name(project_name)
working_dir = os.path.abspath(working_dir)

project_dir = os.path.join(working_dir, project_name)
os.makedirs(project_dir, exist_ok=True)

image_scrape_path = os.path.join(project_dir, "scrapped")
filtered_dir = os.path.join(project_dir, "filtered")

faces_dir = os.path.join(project_dir, "faces")

example_folder = os.path.join(project_dir, "examples")

result_dir = os.path.join(project_dir, "result")

model_dir_path = working_dir if download_models_to_working_dir else os.path.abspath(os.getcwd())

if use_yolov_5x_model:
    yolov_model_path = os.path.join(model_dir_path, "yolov5x_anime.pt")
else:
    yolov_model_path = os.path.join(model_dir_path, "yolov5s_anime.pt")

similarity_model_path = os.path.join(model_dir_path, "similarity.pt")

print("Project name:".ljust(14), project_name)
print("Working dir:".ljust(14), working_dir)

print("\n\tProject structure:")
print("Image scrape path".ljust(30), image_scrape_path)
print("Filtered from duplicates dir".ljust(30), filtered_dir)
print("Detected faces dir".ljust(30), faces_dir)
print("Example dir".ljust(30), example_folder)
print("Result dir".ljust(30), result_dir)
print("-" * 70)
print("Face detection model".ljust(30), yolov_model_path)
print("Face similarity model".ljust(30), similarity_model_path)

print("\nSupported image formats:", *supported_image_formats)

# These functions are used in multiple sections, so they are defined here.
def delete_contents_of_dir(directory):
    for item in os.listdir(directory):
        item_path = os.path.join(directory, item)
        if os.path.isfile(item_path):
            os.remove(item_path)
        elif os.path.isdir(item_path):
            shutil.rmtree(item_path)

if not colab:
    # Scripts launched with !python don't show their output in a local Jupyter notebook,
    # so this helper runs them and prints their output line by line.
    def script_runner(script):
        process = subprocess.Popen(
            script,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True
        )

        while True:
            output = process.stdout.readline()
            if output == '' and process.poll() is not None:
                break
            if output:
                print(output.strip())
                sys.stdout.flush()
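
To show what `create_valid_folder_name` from the cell above does to typical names, here is a small standalone sketch. The function body repeats the logic of the cell; the sample titles are made up for illustration only.

```python
import re

def create_valid_folder_name(name):
    # Same logic as in the cell above: spaces become underscores, then
    # everything except word characters, dots and hyphens is dropped.
    folder_name = name.replace(" ", "_")
    return re.sub(r'[^\w.-]', '', folder_name)

# Hypothetical sample titles, not taken from any real search result
for title in ["Demon Slayer", "Re:Zero - Season 2", "K-On!", "???"]:
    print(repr(title), "->", repr(create_valid_folder_name(title)))
```

Note that a name made only of punctuation (like the last sample) collapses to an empty string, so it is safer to use at least one letter or digit in project and character names.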

In [None]:
# @title 6 Set character name

# @markdown A subfolder with this name will be created inside your result folder. You can build datasets for as many characters as you want from the same downloaded screencaps and detected faces.
# @markdown Just rerun this cell to change the character!
character_name = "" # @param {type:"string"}

if ask_using_inputs:
    character_name = input("Enter character name:")

if not character_name:
    character_name = "default"

character_name = create_valid_folder_name(character_name)

example_character = os.path.join(example_folder, character_name)
example_faces_dir = os.path.join(example_folder, character_name, "faces")
example_orig_img_dir = os.path.join(example_folder, character_name, "images")
similar_faces_dir = os.path.join(project_dir, "similar_faces", character_name)
character_results = os.path.join(result_dir, character_name)

print("Done! Character name is:", character_name)

## **2️⃣🟨 Download screencaps (takes the most time)**

In [None]:
# @title 1 Search your anime / movie / tv
# @markdown Enter the name of your anime / film / cartoon / TV show. Important! This dataset maker can only detect anime and some cartoon faces. It will not work with real humans.
# @markdown Support for real faces may be added in the future.
prompt = "Demon Slayer" #@param {type:"string"}

if ask_using_inputs:
    prompt = input("Enter search prompt:")

# @markdown You can configure your search parameters, but I recommend leaving all turned on.
search_anime = True  #@param {type:"boolean"}
search_movies = True #@param {type:"boolean"}
search_tv = True     #@param {type:"boolean"}

def fetch_results(prompt, search_anime=True, search_movies=True, search_tv=True):
    if not search_anime and not search_movies and not search_tv:
        print("Error! Nothing to search! One of three categories should be selected!")
        return None

    url = "https://fancaps.net/search.php?q=" + quote(prompt)

    if search_movies:
        url += "&MoviesCB=Movies"

    if search_tv:
        url += "&TVCB=TV"

    if search_anime:
        url += "&animeCB=Anime"

    response = requests.get(url)

    if response.status_code != 200:
        print("Failed to fetch the website!")
        return None

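    soup = BeautifulSoup(response.content, "html.parser")
    posts = soup.find_all("div", class_="single_post_content")

    # The search results are expected in the second block; stop cleanly if it is missing
    if len(posts) < 2:
        print("Unexpected page layout: no results section found!")
        return None

    return posts[1].find_all("table")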

def parse_website(prompt, search_anime=True, search_movies=True, search_tv=True):
    results_content = fetch_results(prompt, search_anime, search_movies, search_tv)

    if not results_content:
        return None

    categories = []
    if search_movies:
        categories.append("Movies")
    if search_tv:
        categories.append("TV")
    if search_anime:
        categories.append("Anime")

    results = {category: [] for category in categories}

    counter = 1

    for i, content in enumerate(results_content):
        trs = content.find_all('tr')
        for tr in trs:
            a_element = tr.find('h4').find('a')
            link = a_element.get('href')
            name = a_element.text
            results[categories[i]].append((name, link, counter))
            counter += 1

    return results

def print_results(results):
    if not results:
        print("No results to display.")
        return

    for category, items in results.items():
        print(f"{category} ({len(items)}):")
        if not items:
            print("  No items found.")
            continue
        for item in items:
            print(f"  {item[2]}. {item[0]}")

search_results = parse_website(prompt, search_anime, search_movies, search_tv)
print("\tResults for", prompt)
print_results(search_results)

In [None]:
# @title 2 Choose what you need from the list
# @markdown Enter the numbers of the selected items, separated by commas (you can use '-' for ranges). For example: 1,2,5-10
selected_input = "1" #@param {type:"string"}

# @markdown TV and anime items may contain multiple episodes.
# @markdown If you turn this on, the next cell will ask which episodes you want to download for each selected item.
ask_for_episodes = True #@param {type:"boolean"}

if ask_using_inputs:
    selected_input = input("Write selected indices separated by comma (you can use '-' range):")

def parse_input_indices(input_string):
    indices = []
    ranges = input_string.split(',')

    for r in ranges:
        r = r.strip()
        if not r:
            continue  # Skip empty parts

        if '-' in r:
            range_parts = r.split('-')
            if len(range_parts) != 2:
                print(f"Invalid range format: {r}")
                return None

            start_str, end_str = map(str.strip, range_parts)
            if not start_str.isdigit() or not end_str.isdigit():
                print(f"Invalid range values: {r}")
                return None

            start, end = int(start_str), int(end_str)
            indices.extend(range(start, end + 1))
        else:
            if not r.isdigit():
                print(f"Invalid index value: {r}")
                return None

            indices.append(int(r))

    return indices

def select_items(results, selected_indices):
    selected = {}

    for category, items in results.items():
        selected_items = []
        for item in items:
            if item[2] in selected_indices:
                selected_items.append(item)
        selected[category] = selected_items

    return selected

def print_selected(selected):
    if not selected:
        print("Nothing selected!!!")
        return

    print("\tSelected:")
    for category, items in selected.items():
        print(f"{category} ({len(items)}):")
        if not items:
            print("  No items selected!")
            continue
        for item in items:
            print(f"  {item[2]}. {item[0]}")

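# Define selected_list up front so the check below works even with empty input
selected_list = None

if not selected_input:
    print("ERROR! Nothing selected!")
else:
    selected_list = parse_input_indices(selected_input)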

if selected_list:
    selected_results = select_items(search_results, selected_list)
    print_selected(selected_results)

    series = False
    for category, items in selected_results.items():
        if items and (category == "TV" or category == "Anime"):
            series = True
            break

    if series and not ask_for_episodes:
        print("\nWARNING! Some of your selected items may contain series/episodes!")
        if ask_using_inputs:
            print("If you don't want to download them fully, enter 'Y' and I will ask you what series/episodes to download in the next cell! Other inputs will mean 'no'!")
            ask_input = input("")
            if ask_input == 'Y' or ask_input == 'y':
                ask_for_episodes = True
        else:
            print("If you don't want to download them fully, mark the ask_for_episodes box and rerun this cell.")
            print("I will ask you what series/episodes to download in the next cell!")

else:
    print("Run this cell again!")
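
As a quick reference for the selection format used above ("1,2,5-10"), here is a condensed standalone sketch of how such a string expands into item numbers. `expand_selection` is a hypothetical helper written only for this example; unlike `parse_input_indices` in the cell, it raises an error on bad input instead of printing a message and returning `None`.

```python
def expand_selection(text):
    """Expand a string such as "1,2,5-10" into a list of integers."""
    indices = []
    for part in text.split(","):
        part = part.strip()
        if not part:
            continue  # tolerate stray commas
        if "-" in part:
            start_str, _, end_str = part.partition("-")
            start_str, end_str = start_str.strip(), end_str.strip()
            if not (start_str.isdigit() and end_str.isdigit()):
                raise ValueError(f"Invalid range: {part!r}")
            # Ranges are inclusive on both ends
            indices.extend(range(int(start_str), int(end_str) + 1))
        elif part.isdigit():
            indices.append(int(part))
        else:
            raise ValueError(f"Invalid index: {part!r}")
    return indices

print(expand_selection("1,2,5-10"))  # [1, 2, 5, 6, 7, 8, 9, 10]
```

A reversed range such as "10-5" expands to nothing, so write ranges from the smaller number to the larger one.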

In [None]:
# @title 3 Download screencaps

# @markdown The average download speed is about 5.5 images per second! An average anime episode has about 800 images, so a 20-episode anime takes roughly 48 minutes to download!
# @markdown If your anime is long, it can be better to set the working dir to Google Drive, scrape with a no-GPU runtime, then switch to a GPU runtime and continue with the next cells.
# @markdown Alternatively, scrape images with a no-GPU runtime, download your project as a zip file (see cell 6.2), switch to a GPU runtime, upload the zip file to Colab, and unzip it (see cell 7.1).

# @markdown Create a separate folder inside the scrapped dir for each downloaded episode. This does not matter unless you want to use these screencaps somewhere else.
make_dirs_for_episodes = True #@param {type:"boolean"}

# @markdown Select this to delete images previously downloaded to the folder (if you have run this cell before):
delete_previous_downloaded_images = False # @param {type:"boolean"}

os.makedirs(image_scrape_path, exist_ok=True)

if delete_previous_downloaded_images:
    delete_contents_of_dir(image_scrape_path)

# @markdown Download params:
timeout = 5 #@param {type:"number"}
max_retries = 3 #@param {type:"integer"}


def get_string_after_last_slash(string):
    last_index = string.rfind('/')
    if last_index != -1:
        return string[last_index + 1:]

    return string


def fetch_episode_images_names(url):
    images_names = []

    continue_search = True
    page = 1
    while continue_search:
        fetch_url = url + "&page=" + str(page)
        response = requests.get(fetch_url)

        if response.status_code != 200:
            print("Failed to fetch the website!")
            break

        soup = BeautifulSoup(response.content, "html.parser")
        post_area = soup.find('section', {'id': 'contentbody'}).find('div', {'class': 'single_post_area'})

        images = []
        episodes_section = post_area.find(lambda tag: tag.name == 'div' and ('Episode Screencaps' in tag.text or 'Episode Images' in tag.text))
        next_sibling = episodes_section.find_next_sibling('div', {'class': 'row'})
        while next_sibling:
            found_images = next_sibling.find_all('img', {'class': 'imageFade'})
            for found_image in found_images:
                images.append(found_image.get('src'))
            next_sibling = next_sibling.find_next_sibling('div', {'class': 'row'})

        page += 1

        if images:
            for src in images:
                name = get_string_after_last_slash(src)
                images_names.append(name)

            next_page_url = post_area.select_one('ul.pagination li:last-child a').get('href')
            if next_page_url == '#':
                continue_search = False
        else:
            continue_search = False


    return images_names


def fetch_movies_images_names(url):
    images_names = []

    continue_search = True
    page = 1
    while continue_search:
        fetch_url = url + "&page=" + str(page)
        response = requests.get(fetch_url)

        if response.status_code != 200:
            print("Failed to fetch the website!")
            break

        soup = BeautifulSoup(response.content, "html.parser")
        images_bar = soup.find('section', {'id': 'contentbody'}).find('div', {'class': 'middle_bar'})

        images = []
        title = images_bar.find('h2', {'class': 'post_title'})
        images_div = title.find_next_sibling('div', {'class': ''})
        if images_div:
            found_images = images_div.find_all('img', {'class': 'imageFade'})
            for found_image in found_images:
                images.append(found_image.get('src'))

        page += 1

        if images:
            for src in images:
                name = get_string_after_last_slash(src)
                images_names.append(name)

            next_page_url = images_bar.select_one('ul.pagination li:last-child a').get('href')
            if next_page_url == '#':
                continue_search = False

        else:
            continue_search = False


    return images_names


def fetch_episodes_links(url):
    episodes_links = []

    continue_search = True
    page = 1
    while continue_search:
        fetch_url = url + "&page=" + str(page)
        response = requests.get(fetch_url)

        if response.status_code != 200:
            print("Failed to fetch the website!")
            break

        soup = BeautifulSoup(response.content, "html.parser")
        contentbody_section = soup.find('section', {'id': 'contentbody'})
        target_links = contentbody_section.find_all('a', {'class': 'btn btn-block'})
        page += 1

        if target_links:
            for a in target_links:
                link = a.get('href')
                if link.startswith('/'):
                    link = "https://fancaps.net" + link

                episodes_links.append(link)
        else:
            continue_search = False

    return episodes_links


def download_single_image(url, path, name, timeout=10, max_retries=3):
    file_path = os.path.join(path, name)
    retries = 0
    while retries < max_retries:
        try:
            response = requests.get(url, stream=True, timeout=timeout)
            if response.status_code == 200:
                with open(file_path, 'wb') as f:
                    for chunk in response.iter_content(16384):
                        f.write(chunk)
                break  # Successful download, exit loop
            else:
                print(f"Failed to download image {name} - Status Code: {response.status_code}")
        except requests.exceptions.ReadTimeout:
            print(f"Read timeout occurred while downloading image {name}. Retrying ({retries + 1}/{max_retries})...")
        except requests.exceptions.RequestException as e:
            print(f"Error downloading image {name}: {e}")

        # Count every failed attempt, so that an HTTP error or a connection
        # error cannot keep the loop running forever
        retries += 1
    else:
        # All attempts failed: delete the partially downloaded file, if any
        if os.path.exists(file_path):
            os.remove(file_path)
            print(f"Deleted partially downloaded file: {file_path}")


def download_images_fancaps(names, path, section, timeout=10, max_retries=3):
    os.makedirs(path, exist_ok=True)

    lock = threading.Lock()

    # Create the progress bar before submitting any work, so that callbacks
    # from downloads that finish early can always find it
    with tqdm(total=len(names)) as pbar:
        def update_progress(_future):
            with lock:
                pbar.update(1)

        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = []
            for name in names:
                url = "https://cdni.fancaps.net/file/fancaps-" + section + "images/" + name
                future = executor.submit(download_single_image, url, path, name, timeout, max_retries)
                future.add_done_callback(update_progress)
                futures.append(future)

            # Wait for all downloads and re-raise unexpected errors, if any
            for future in concurrent.futures.as_completed(futures):
                future.result()

def ask_for_episodes_to_download(n_episodes):
    print("\nWhich series/episodes to download (separated by commas, you can use the '-' range, empty input means download all):")
    episodes_to_download_list = []

    while not episodes_to_download_list:
        episodes_to_download_input = input()

        if not episodes_to_download_input:
            episodes_to_download_list = range(1, n_episodes + 1)
            print("Downloading all!")
        else:
            episodes_to_download_list = parse_input_indices(episodes_to_download_input)
            if not episodes_to_download_list:
                print("Input again!")

    return episodes_to_download_list

def fancaps_scrapper(url, path, ask_for_episodes=False, timeout=10, max_retries=3, dirs_for_episodes=True):
    if "/movies/" in url:
        print("Fetching images names...")
        names = fetch_movies_images_names(url)
        print("Total %d images. Downloading..." % len(names))
        download_images_fancaps(names, path, "movie", timeout, max_retries)
        print()
    else:
        if "/anime/" in url:
            section = "anime"
        elif "/tv/" in url:
            section = "tv"
        else:
            print("Error! Invalid url!")
            return

        print("Fetching episodes links")
        links = fetch_episodes_links(url)
        print("Found %d episodes" % len(links))

        episodes_to_download = []

        if ask_for_episodes:
            episodes_to_download = ask_for_episodes_to_download(len(links))
        else:
            episodes_to_download = range(1, len(links) + 1)

        for i, link in enumerate(links, 1):
            if i in episodes_to_download:
                print("\nEpisode %d" % i, end='')
                names = fetch_episode_images_names(link)
                start = time.time()
                print(": %d images" % len(names))

                if dirs_for_episodes:
                    path_for_episode = os.path.join(path, f"Episode{i}")
                    download_images_fancaps(names, path_for_episode, section, timeout, max_retries)
                else:
                    download_images_fancaps(names, path, section, timeout, max_retries)

                print("\nDownloaded. Time: %.2fs" % (time.time() - start))


def scrape_selected(selected, save_path, ask_for_episodes=False, timeout=10, max_retries=3, dirs_for_episodes=True):
    if not selected:
        print("ERROR! Nothing selected!!!")
        return

    print("\tStarting scraping!")
    start_time = time.time()

    for category, items in selected.items():
        if items:
            for item in items:
                folder_path = os.path.join(save_path, create_valid_folder_name(item[0]))
                url = "https://fancaps.net" + item[1]

                os.makedirs(folder_path, exist_ok=True)

                print("\nScraping images for %s:" % item[0])
                scrap_time = time.time()
                fancaps_scrapper(url, folder_path, ask_for_episodes, timeout, max_retries, dirs_for_episodes)
                print("Done %s: %.2fs" % (item[0], time.time() - scrap_time))

    print('-'*30)
    print("\tDone: %.2fs" % (time.time() - start_time))

scrape_selected(selected_results, image_scrape_path, ask_for_episodes, timeout, max_retries, make_dirs_for_episodes)
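
The download time estimate mentioned in this cell is simple arithmetic. Here is a small sketch that reproduces it, so you can plug in your own numbers. `estimate_download_minutes` is a hypothetical helper for this example only, and its defaults (800 images per episode, 5.5 images per second) are just the rough averages quoted above, not measured guarantees.

```python
def estimate_download_minutes(episodes, images_per_episode=800, images_per_second=5.5):
    # Total images divided by the download speed gives seconds; convert to minutes
    total_images = episodes * images_per_episode
    return total_images / images_per_second / 60

# 20 episodes * 800 images = 16000 images at 5.5 images per second
print(f"{estimate_download_minutes(20):.1f} minutes")  # 48.5 minutes
```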

## **3️⃣🟨 Remove duplicates**
This section uses code from the [Dataset Maker colab](https://colab.research.google.com/github/hollowstrawberry/kohya-colab/blob/main/Dataset_Maker.ipynb) with some modifications.
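
The idea behind this section can be sketched on toy data: every image gets an embedding vector, images whose cosine similarity is above a threshold count as duplicates, and only the first image of each group is kept. The example below is a simplified illustration using NumPy only. `find_duplicates` and the 2D vectors are made up for this sketch; the real cells use CLIP embeddings from fiftyone and scikit-learn's `cosine_similarity`.

```python
import numpy as np

def find_duplicates(embeddings, threshold=0.98):
    # Normalize rows so that a dot product equals the cosine similarity
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    unit = embeddings / norms
    similarity = unit @ unit.T
    np.fill_diagonal(similarity, 0.0)  # ignore self-similarity

    to_remove = set()
    for idx in range(len(embeddings)):
        if idx in to_remove:
            continue  # already marked as a duplicate of an earlier image
        # Keep this image and mark everything too similar to it
        for dup in np.where(similarity[idx] > threshold)[0]:
            to_remove.add(int(dup))
    return sorted(to_remove)

# Toy "embeddings" (assumed values, just for illustration)
toy = np.array([
    [1.0, 0.0],     # image 0
    [0.999, 0.01],  # image 1: nearly identical to image 0
    [0.0, 1.0],     # image 2: unrelated to image 0
    [0.01, 1.0],    # image 3: nearly identical to image 2
])
print(find_duplicates(toy))  # [1, 3]
```

A lower threshold removes more images (including merely similar frames), while a higher one only removes near-exact copies.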

In [None]:
#@title 1 Find duplicates
#@markdown How similar two images must be for one of them to be marked for deletion. I recommend 0.96 to 0.99, depending on your needs:
similarity_threshold = 0.98 # @param {type:"number"}

#@markdown Batch sizes. If you don't know what these are, it's better to leave them unchanged:
embedding_batch_size = 200 # @param {type:"integer"}
similarity_matrix_batch_size = 1000 # @param {type:"integer"}

#@markdown CLIP model name. You can choose another model from the fiftyone zoo if you want. Just enter its name here.
model_name = "clip-vit-base32-torch" # @param {type:"string"}

dataset = fo.Dataset.from_dir(image_scrape_path, dataset_type=fo.types.ImageDirectory)

# @markdown This cell loads the scraped dataset, computes embeddings with the selected model, calculates the similarity matrix and finds samples to remove.

def make_embeddings(model_name, batch_size):
    model = foz.load_zoo_model(model_name)
    embeddings = dataset.compute_embeddings(model, batch_size=batch_size)

    # Unload the model from the GPU to free up memory
    del model
    torch.cuda.empty_cache()

    return embeddings

def calculate_similarity_matrix(embeddings, batch_size):
    # Compute the matrix in row blocks: each block compares up to `batch_size`
    # embeddings against the whole set, then the blocks are stacked
    n_samples = embeddings.shape[0]
    batch_size = max(1, min(n_samples, batch_size))

    blocks = [
        cosine_similarity(embeddings[start:start + batch_size], embeddings)
        for start in range(0, n_samples, batch_size)
    ]
    similarity_matrix = np.vstack(blocks)

    # Zero the diagonal so that an image is never reported as its own duplicate
    similarity_matrix -= np.identity(n_samples)

    return similarity_matrix

def make_samples(dataset, similarity_matrix, threshold=0.98):
    dataset.tags = ["delete", "has_duplicates"]
    id_map = [s.id for s in dataset.select_fields(["id"])]
    samples_to_remove = set()
    samples_to_keep = set()
    for idx, sample in enumerate(dataset):
        if sample.id not in samples_to_remove:
            # Keep the first instance of two duplicates
            samples_to_keep.add(sample.id)

            dup_idxs = np.where(similarity_matrix[idx] > threshold)[0]
            for dup in dup_idxs:
                # We kept the first instance so remove all other duplicates
                samples_to_remove.add(id_map[dup])
            if len(dup_idxs) > 0:
                sample.tags.append("has_duplicates")
                sample.save()
        else:
            sample.tags.append("delete")
            sample.save()

    return samples_to_remove, samples_to_keep

embeddings = make_embeddings(model_name, embedding_batch_size)

clear_output()
print("Embeddings calculated!")

similarity_matrix = calculate_similarity_matrix(embeddings, similarity_matrix_batch_size)
print("Similarity matrix calculated!")

samples_to_remove, samples_to_keep = make_samples(dataset, similarity_matrix, similarity_threshold)
print(f"Remove percentage: {len(samples_to_remove) / (len(samples_to_remove) + len(samples_to_keep)) * 100}")

del embeddings, similarity_matrix, samples_to_remove, samples_to_keep
torch.cuda.empty_cache()

session = None
print("Done!")

In [None]:
# @title 2 🟩 Run the fiftyone app to view marked images
# @markdown You can skip this cell! It's optional!

if session is None:
    sidebar_groups = fo.DatasetAppConfig.default_sidebar_groups(dataset)
    for group in sidebar_groups[1:]:
        group.expanded = False
    dataset.app_config.sidebar_groups = sidebar_groups

    dataset.save()
    session = fo.launch_app(dataset)
else:
    session.show()

# @markdown You can run the fiftyone app to view all scraped images and manually mark the images that you want to delete.
# @markdown Enter any text in the input box to save changes and quit!
# @markdown If the app doesn't appear within 2 minutes, your cookies may be disabled or your browser may be blocking the session. Try disabling the browser's protection features.
input("Input something to quit: ")

clear_output()

In [None]:
# @title 3 Delete duplicates

# @markdown Delete all images marked as "delete".

# @markdown Select this to delete images previously saved to the folder (if you have run this cell before):
delete_previous_filtered_images = True # @param {type:"boolean"}

os.makedirs(filtered_dir, exist_ok=True)

if delete_previous_filtered_images:
    delete_contents_of_dir(filtered_dir)

kys = [s for s in dataset if "delete" in s.tags]
dataset.delete_samples(kys)
n_filtered = len(dataset)
dataset.export(export_dir=filtered_dir, dataset_type=fo.types.ImageDirectory)

if session is not None:
    session.refresh()
    fo.close_app()

clear_output()

print("Done! Dataset filtered from %d duplicates! Total %d images left!" % (len(kys), n_filtered))

## **4️⃣🟨 Find similar faces**

In [None]:
# @title 1 Download face detection and face similarity models

# @markdown This cell downloads the selected yolov5 face detection model and my EfficientNet-B0 face similarity model from Google Drive.

# @markdown Direct link to the folder with the models:
# @markdown https://drive.google.com/drive/folders/1Dn4-GgnLOl-co-eICOrOoH6pp9MexBYu?usp=sharing

# @markdown Thanks to turdus-merula for the download functions ([Stack Overflow link](https://stackoverflow.com/questions/38511444/python-download-files-from-google-drive-using-url)).
def download_file_from_google_drive(id, destination):
    URL = "https://docs.google.com/uc?export=download&confirm=1"

    session = requests.Session()

    response = session.get(URL, params={"id": id}, stream=True)
    token = get_confirm_token(response)

    if token:
        params = {"id": id, "confirm": token}
        response = session.get(URL, params=params, stream=True)

    save_response_content(response, destination)


def get_confirm_token(response):
    for key, value in response.cookies.items():
        if key.startswith("download_warning"):
            return value

    return None


def save_response_content(response, destination):
    CHUNK_SIZE = 32768

    with open(destination, "wb") as f:
        for chunk in response.iter_content(CHUNK_SIZE):
            if chunk:  # filter out keep-alive new chunks
                f.write(chunk)

file_id1 = "1BmuYn_3pUsWOPqRpjzBUiIBk-XLjsKmY"
file_id2 = "1gXckAUQpPSojhNehZHmJdh3juuJPmtwI"
file_id3 = "1j4gnfa3ggsDkp9vfCxh-WiHOFefbOSpK"

if use_yolov_5x_model:
    if not os.path.exists(yolov_model_path):
        print("Downloading YOLOv5x model...")
        download_file_from_google_drive(file_id1, yolov_model_path)
elif not os.path.exists(yolov_model_path):
    print("Downloading YOLOv5s model...")
    download_file_from_google_drive(file_id2, yolov_model_path)

if not os.path.exists(similarity_model_path):
    print("Downloading similarity model...")
    download_file_from_google_drive(file_id3, similarity_model_path)

print("Done!")
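
The token lookup used by the download helper can be checked without network access. The sketch below repeats `get_confirm_token` from this cell and feeds it stand-in response objects; the `SimpleNamespace` objects and cookie values are invented for illustration and are not real `requests` responses.

```python
from types import SimpleNamespace


def get_confirm_token(response):
    # Return the value of the first cookie whose name starts with "download_warning"
    for key, value in response.cookies.items():
        if key.startswith("download_warning"):
            return value
    return None


# Stand-in objects: only the .cookies mapping is needed by the function
with_warning = SimpleNamespace(cookies={"NID": "abc", "download_warning_123": "t0k3n"})
without_warning = SimpleNamespace(cookies={"NID": "abc"})

print(get_confirm_token(with_warning))     # t0k3n
print(get_confirm_token(without_warning))  # None
```

When no such cookie is present, the helper keeps the first response and saves it as is.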

In [None]:
# @title 2 Define detection params and necessary functions

# @markdown Resized image size:
image_size = 640 # @param {type:"number"}

# @markdown Do not save face images whose shorter side is smaller than min_face_size pixels.
min_face_size = 35 # @param {type:"number"}
max_aspect_ratio = 6 # @param {type:"number"}

# @markdown Grow or shrink the predicted box: each side moves by this fraction of the box size (0.2 -> every side extended by 20%, negative values shrink the box).
adjust_crop_box = 0.2 # @param {type:"number"}

# @markdown Confidence that it is a face:
threshold = 0.5 # @param {type:"number"}
iou_threshold = 0.5 # @param {type:"number"}

def visualize_images(dataset, n=10, n_row=5):
    if n == 0:
        print("Error! No images to visualize!")
        return

    if n < 0:
        print("Error! N < 0!")
        return

    if n_row <= 0:
        print("Error! N_ROW <= 0!")
        return

    if n != len(dataset):
        if n > len(dataset):
            n = len(dataset)
            print("Number of random images is greater than dataset length. Visualizing all %d images" % len(dataset))
        elif n < len(dataset):
            print("Visualized random %d images" % n)

        random_indices = random.sample(range(len(dataset)), n)
    else:
        random_indices = range(len(dataset))


    drawed = 0
    while n:
        count = n_row if n > n_row else n
        n -= count

        plt.figure(figsize=(count * 2, 6))

        display_indices = random_indices[drawed:drawed + count]
        drawed += count

        for i, index in enumerate(display_indices):
            image_path = dataset[index]

            img = Image.open(image_path)

            plt.subplot(1, count, i + 1)
            plt.imshow(img)
            plt.axis('off')

        plt.subplots_adjust(wspace=.03, hspace=0)


def xywh_to_xyxy(xywh):
    x_center = xywh[0]
    y_center = xywh[1]
    width = xywh[2]
    height = xywh[3]

    x1 = max(0, x_center - width / 2)
    y1 = max(0, y_center - height / 2)
    x2 = min(1, x_center + width / 2)
    y2 = min(1, y_center + height / 2)

    return x1, y1, x2, y2


def save_cropped(label_dir, image_dir, output_dir, min_face_size=0, max_aspect_ratio=float('inf'), adjust_crop=0):
    counter = 0
    for label_filename in os.listdir(label_dir):
        if label_filename.endswith(".txt"):
            label_path = os.path.join(label_dir, label_filename)
            image_filename_without_ext = os.path.splitext(label_filename)[0]

            image_path = None
            for ext in supported_image_formats:
                potential_image_path = os.path.join(image_dir, f"{image_filename_without_ext}{ext}")
                if os.path.exists(potential_image_path):
                    image_path = potential_image_path
                    break

            if image_path is None:
                print(f"Image not found for {label_filename}")
                continue

            with open(label_path, 'r') as label_file:
                lines = label_file.readlines()

            for i, line in enumerate(lines, 1):
                parts = line.strip().split()
                x, y, w, h, conf = map(float, parts[1:])  # Extract coordinates and confidence

                x1, y1, x2, y2 = xywh_to_xyxy([x, y, w, h])

                box_width, box_height = x2 - x1, y2 - y1
                x1 -= box_width * adjust_crop
                y1 -= box_height * adjust_crop
                x2 += box_width * adjust_crop
                y2 += box_height * adjust_crop

                # Ensure adjusted coordinates stay within bounds
                x1 = max(0, x1)
                y1 = max(0, y1)
                x2 = min(1, x2)
                y2 = min(1, y2)

                # Skip degenerate boxes to avoid dividing by zero
                if x2 <= x1 or y2 <= y1:
                    continue

                aspect_ratio = (x2-x1) / (y2-y1)
                if aspect_ratio < 1:
                    aspect_ratio = 1 / aspect_ratio

                if aspect_ratio > max_aspect_ratio:
                    continue

                # Crop the image using PIL
                image = Image.open(image_path)

                width, height = image.size
                left = int(x1 * width)
                upper = int(y1 * height)
                right = int(x2 * width)
                lower = int(y2 * height)

                if min(right - left, lower - upper) < min_face_size:
                    continue

                cropped_image = image.crop((left, upper, right, lower))

                # JPEG cannot store alpha or palette modes, so normalize everything to RGB
                if cropped_image.mode != "RGB":
                    cropped_image = cropped_image.convert("RGB")

                # Save cropped image to the output folder
                output_filename = f"{image_filename_without_ext}-{i}-{conf}.jpg"
                output_path = os.path.join(output_dir, output_filename)
                cropped_image.save(output_path)

                counter += 1

    return counter


def plot_images_in_folder(folder_path, m=5):
    image_paths = [os.path.join(folder_path, file) for file in os.listdir(folder_path) if file.endswith(supported_image_formats)]
    visualize_images(image_paths, len(image_paths), m)


print("Done!")
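
As a worked example of the box arithmetic in `xywh_to_xyxy` and `save_cropped`, the sketch below combines the corner conversion, the `adjust_crop` padding, and the pixel scaling into one function. The name `padded_pixel_box` and the sample numbers are illustrative assumptions, not part of the notebook.

```python
def padded_pixel_box(xywh, image_size, adjust_crop=0.0):
    """Convert a normalized (x_center, y_center, w, h) box to a pixel crop box."""
    x_center, y_center, w, h = xywh

    # Corners in normalized coordinates, clamped to the image
    x1, y1 = max(0.0, x_center - w / 2), max(0.0, y_center - h / 2)
    x2, y2 = min(1.0, x_center + w / 2), min(1.0, y_center + h / 2)

    # Move every side outwards by a fraction of the box size, then clamp again
    box_w, box_h = x2 - x1, y2 - y1
    x1, x2 = max(0.0, x1 - box_w * adjust_crop), min(1.0, x2 + box_w * adjust_crop)
    y1, y2 = max(0.0, y1 - box_h * adjust_crop), min(1.0, y2 + box_h * adjust_crop)

    # Scale to pixels: (left, upper, right, lower), the order PIL's crop() expects
    width, height = image_size
    return int(x1 * width), int(y1 * height), int(x2 * width), int(y2 * height)


box = padded_pixel_box((0.5, 0.5, 0.25, 0.5), (640, 480), adjust_crop=0.25)
print(box)  # (200, 60, 440, 420)
```

Without padding the same box would be 160 px wide; with `adjust_crop=0.25` it is 240 px wide, because both the left and the right side move.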

In [None]:
# @title 3 Find faces on filtered images and save them

# @markdown Run the yolov5 detect script to find faces in the images. It creates a labels folder with .txt files that contain box coordinates (xywh) and confidence.

# @markdown If you want to delete previous images in the folder (if you have run this cell before), select this:
delete_previous_detections = True # @param {type:"boolean"}

os.makedirs(faces_dir, exist_ok=True)

if delete_previous_detections:
    delete_contents_of_dir(faces_dir)

# @markdown I used yolov5 models from [yolov5_anime](https://github.com/zymk9/yolov5_anime.git) repo by zymk9 for face detection.

if colab:
    !python yolov5/detect.py --weights "{yolov_model_path}" --source "{filtered_dir}" \
            --imgsz {image_size} --nosave --project "{project_dir}" --name "{faces_dir}" \
            --conf-thres {threshold} --iou-thres {iou_threshold}  --exist-ok --save-txt --save-conf
else:
    script_runner(f'python yolov5/detect.py --weights "{yolov_model_path}" --source "{filtered_dir}" '
                  f'--imgsz {image_size} --nosave --project "{project_dir}" --name "{faces_dir}" '
                  f'--conf-thres {threshold} --iou-thres {iou_threshold}  --exist-ok --save-txt --save-conf')

clear_output()
print("Detection completed!")

print("Saving cropped faces...")
n_cropped = save_cropped(os.path.join(faces_dir, "labels"), filtered_dir, faces_dir, min_face_size, max_aspect_ratio, adjust_crop_box)
print("Done! Total %d faces!" % n_cropped)
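
Each line of a label file written with `--save-txt --save-conf` is read by `save_cropped` as a class id followed by `x y w h conf`. A minimal sketch of that parsing step, with an explicit field-count check added, could look like this; `parse_label_line` and the sample line are hypothetical.

```python
def parse_label_line(line):
    """Split one label line into (class_id, (x, y, w, h), confidence)."""
    parts = line.split()
    if len(parts) != 6:
        raise ValueError(f"Expected 6 fields, got {len(parts)}: {line!r}")

    class_id = int(parts[0])
    x, y, w, h, conf = map(float, parts[1:])
    return class_id, (x, y, w, h), conf


class_id, box, conf = parse_label_line("0 0.5 0.25 0.125 0.0625 0.875\n")
print(class_id, box, conf)  # 0 (0.5, 0.25, 0.125, 0.0625) 0.875
```

The coordinates are fractions of the image width and height, which is why the crop code multiplies them by the image size.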

In [None]:
# @title 4 Choose an example image(s)

# @markdown IMPORTANT! Paste links (or paths) to example images (how your desired character looks).
# @markdown You can paste multiple links/paths separated by a comma.

if colab:
    example_images_input = "" # @param {type:"string"}
    # Strip whitespace around each entry and drop empty ones
    example_images = [s.strip() for s in example_images_input.split(',') if s.strip()]
elif ask_using_inputs:
    example_images_input = input("Enter example image links or paths separated by commas: ")
    example_images = [s.strip() for s in example_images_input.split(',') if s.strip()]
else:
    example_images = [
        # If you are not in the colab, it's better to paste links or image paths here.
        # If your path contains '\' char, please add 'r' before "". For example, r"D:\test.jpg"
        "https://static.wikia.nocookie.net/cowboybebop/images/7/73/Screen_Shot_2013-12-11_at_12.52.29_PM.png/revision/latest?cb=20140404054920",
        "https://images-wixmp-ed30a86b8c4ca887773594c2.wixmp.com/f/89be45f9-c25b-44bc-929d-ecf8d9ccf719/d4jh9ru-250de96b-a7da-4801-9e41-0355bf36a327.jpg/v1/fill/w_638,h_773,q_75,strp/spike_spiegel_by_abnormal_child_d4jh9ru-fullview.jpg?token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ1cm46YXBwOjdlMGQxODg5ODIyNjQzNzNhNWYwZDQxNWVhMGQyNmUwIiwiaXNzIjoidXJuOmFwcDo3ZTBkMTg4OTgyMjY0MzczYTVmMGQ0MTVlYTBkMjZlMCIsIm9iaiI6W1t7ImhlaWdodCI6Ijw9NzczIiwicGF0aCI6IlwvZlwvODliZTQ1ZjktYzI1Yi00NGJjLTkyOWQtZWNmOGQ5Y2NmNzE5XC9kNGpoOXJ1LTI1MGRlOTZiLWE3ZGEtNDgwMS05ZTQxLTAzNTViZjM2YTMyNy5qcGciLCJ3aWR0aCI6Ijw9NjM4In1dXSwiYXVkIjpbInVybjpzZXJ2aWNlOmltYWdlLm9wZXJhdGlvbnMiXX0.Y8VP9f-BDhZK5Zq1Ct6TGvxgcSOvo-T5pVnhVEJrOOA",
        "https://imgix.ranker.com/user_node_img/50088/1001742646/original/the-end-of-a-cat-and-_39_s-life-photo-u1?w=650&q=50&fm=pjpg&fit=crop&crop=faces"
    ]

# @markdown If you want to delete previous images in the folder (if you have run this cell before), select this:
delete_previous_examples = True # @param {type:"boolean"}

os.makedirs(example_orig_img_dir, exist_ok=True)

if delete_previous_examples:
    delete_contents_of_dir(example_orig_img_dir)

def download_example_image(url, path):
    filename = ''.join(c if c.isalnum() or c in ['-', '_'] else '_' for c in os.path.basename(url))
    if len(filename) > 25:
        filename = filename[:25]

    file_extension = url.split('.')[-1].lower()
    if file_extension not in ['png', 'jpg', 'jpeg']:
        file_extension = 'png'  # Default to PNG if extension is not supported

    full_path = os.path.join(path, f"{filename}.{file_extension}")

    response = requests.get(url)

    if response.status_code == 200:
        with open(full_path, 'wb') as f:
            f.write(response.content)
        return full_path

    print(f"Failed to download image from URL: {url}")
    return None

def is_url_or_path(input_string):
    url_pattern = r'^https?://.*'

    if re.match(url_pattern, input_string):
        return 1

    if os.path.exists(input_string):
        return 2

    return None

for img in example_images:
    img = img.replace('\\', '/')
    image_type = is_url_or_path(img)
    if image_type == 1:
        image_path = download_example_image(img, example_orig_img_dir)
    elif image_type == 2:
        image_path = img
        shutil.copy(image_path, example_orig_img_dir)
    else:
        print(f"Invalid image: {img}")
        continue

print("Done!")

# @markdown Visualize your images (rows - how many images will be in one row):
rows = 5 # @param {type:"integer"}

plot_images_in_folder(example_orig_img_dir, rows)
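
`download_example_image` takes the extension from the text after the last dot of the whole URL, so a query string such as `?w=650` hides the real extension and the file falls back to `png`. One possible alternative, shown here only as a sketch, is to look at the URL path alone. `guess_extension_from_url` is a hypothetical helper and the URLs are placeholders.

```python
import posixpath
from urllib.parse import urlparse


def guess_extension_from_url(url, allowed=("png", "jpg", "jpeg"), default="png"):
    # Look only at the path component, so "?w=650" or "?cb=1" is ignored
    path = urlparse(url).path
    ext = posixpath.splitext(path)[1].lstrip(".").lower()
    return ext if ext in allowed else default


print(guess_extension_from_url("https://example.com/a/face.JPG?w=650&q=50"))  # jpg
print(guess_extension_from_url("https://example.com/image/latest?cb=1"))      # png
```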

In [None]:
# @title 5 Detect faces on example images

# @markdown If you want to delete previous images in the folder (if you have run this cell before), select this:
delete_previous_detected_faces = True # @param {type:"boolean"}

# @markdown If your images are already cropped faces, select this:
is_already_cropped = False # @param {type:"boolean"}

# @markdown Resized image size:
detect_img_size = 640 # @param {type:"integer"}

# @markdown Confidence that it is a face:
detect_threshold = 0.5 # @param {type:"number"}
detect_iou_threshold = 0.5 # @param {type:"number"}

os.makedirs(example_faces_dir, exist_ok=True)

if delete_previous_detected_faces:
    delete_contents_of_dir(example_faces_dir)

if is_already_cropped:
    file_list = os.listdir(example_orig_img_dir)

    for file_name in file_list:
        from_path = os.path.join(example_orig_img_dir, file_name)
        to_path = os.path.join(example_faces_dir, file_name)
        if os.path.isfile(from_path):
            shutil.copy(from_path, to_path)
else:
    if colab:
        !python yolov5/detect.py --weights "{yolov_model_path}" --source "{example_orig_img_dir}" \
        --imgsz {detect_img_size} --nosave --project "{example_character}" --name "{example_faces_dir}" \
        --conf-thres {detect_threshold} --iou-thres {detect_iou_threshold}  --exist-ok --save-txt --save-conf
    else:
        script_runner(f'python yolov5/detect.py --weights "{yolov_model_path}" --source "{example_orig_img_dir}" '
                      f'--imgsz {detect_img_size} --nosave --project "{example_character}" --name "{example_faces_dir}" '
                      f'--conf-thres {detect_threshold} --iou-thres {detect_iou_threshold}  --exist-ok --save-txt --save-conf')
    n_ex = save_cropped(os.path.join(example_faces_dir, "labels"), example_orig_img_dir, example_faces_dir, min_face_size, max_aspect_ratio, adjust_crop_box)
    clear_output()
    print("Total detected %d face(s)!" % n_ex)
    print("If some of your faces were not detected, choose a different picture(s)!")
    print("If the image is already a cropped face, select is_already_cropped and delete_previous_detected_faces, then rerun this cell!")

plot_images_in_folder(example_faces_dir)

In [None]:
# @title 6 Load face similarity model

# @markdown Init efficientnet-b0 siamese network for calculating faces similarity and load downloaded weights.

class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        self.base_model = models.efficientnet_b0()

    def forward(self, x):
        # Compute the embedding vectors for a batch of images
        return self.base_model(x)

# Initialize Siamese Network
model = SiameseNetwork()

# Load the downloaded .pt model weights
model.load_state_dict(torch.load(similarity_model_path, map_location=torch.device(device)))
model.to(device)

clear_output()
print("Done!")

In [None]:
# @title 7 Define functions for calculating embeddings

# @markdown It's a batch size for face images for calculating embeddings vectors.
embeddings_batch_size = 32 # @param {type:"integer"}

# @markdown IMPORTANT! If you change <b>face_image_size</b>, it will not work with the current model.
# @markdown The EfficientNet-B0 model expects 224x224 images as input. If you are using another model, you can change it.
face_image_size = 224 # @param {type:"integer"}

def find_images_to_check(images_folder):
    images_to_check = []
    for root, dirs, files in os.walk(images_folder):
        files = [os.path.join(root, file) for file in files if file.endswith(supported_image_formats)]
        if len(files):
            images_to_check.extend(files)

    return images_to_check

class ResizeAndPad:
    def __init__(self, size, fill_color=(0, 0, 0)):
        self.size = size
        self.fill_color = fill_color

    def __call__(self, image):
        # Calculate aspect ratio of the original image
        original_width, original_height = image.size
        aspect_ratio = original_width / original_height

        # Calculate the new dimensions
        target_width, target_height = self.size
        target_aspect_ratio = target_width / target_height

        if target_aspect_ratio > aspect_ratio:
            # The target image is wider than the original image, so we need to pad horizontally
            new_width = int(target_height * aspect_ratio)
            resized_image = image.resize((new_width, target_height))
            left_padding = (target_width - new_width) // 2
            padded_image = Image.new('RGB', (target_width, target_height), self.fill_color)
            padded_image.paste(resized_image, (left_padding, 0))
        else:
            # The target image is taller than the original image, so we need to pad vertically
            new_height = int(target_width / aspect_ratio)
            resized_image = image.resize((target_width, new_height))
            top_padding = (target_height - new_height) // 2
            padded_image = Image.new('RGB', (target_width, target_height), self.fill_color)
            padded_image.paste(resized_image, (0, top_padding))

        return padded_image

transform = transforms.Compose([
    ResizeAndPad((face_image_size, face_image_size)),
    transforms.Lambda(lambda x: x.convert('RGB') if x.mode == 'RGBA' else x),
    transforms.ToTensor(),
])

class CustomDataset(Dataset):
    def __init__(self, faces_to_check, transform=None):
        self.faces_to_check = faces_to_check
        self.transform = transform

    def __len__(self):
        return len(self.faces_to_check)

    def __getitem__(self, idx):
        image_path = self.faces_to_check[idx]

        img = Image.open(image_path)

        if self.transform:
            img = self.transform(img)

        return img, image_path

def make_embeddings(model, loader):
    model.eval()

    embeddings = []
    paths = []

    with torch.no_grad(), tqdm(total=len(loader)) as pbar:
        for batch_idx, (images, images_paths) in enumerate(loader):
            images = images.to(device)

            result = model(images).cpu().numpy()

            embeddings.extend(result)
            paths.extend(images_paths)

            pbar.update(1)

    return embeddings, paths

print("Done!")
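
The geometry behind `ResizeAndPad` can be followed without loading any image. The sketch below returns the resized dimensions and the paste offset that the class would use; `letterbox_geometry` is an illustrative name, and the sizes are made-up examples.

```python
def letterbox_geometry(original_size, target_size):
    """Return (resized_size, paste_offset) for an aspect-preserving resize with padding."""
    original_width, original_height = original_size
    target_width, target_height = target_size
    aspect_ratio = original_width / original_height

    if target_width / target_height > aspect_ratio:
        # Image is relatively taller than the target: fit the height, pad left and right
        new_width = int(target_height * aspect_ratio)
        return (new_width, target_height), ((target_width - new_width) // 2, 0)

    # Image is relatively wider (or has the same ratio): fit the width, pad top and bottom
    new_height = int(target_width / aspect_ratio)
    return (target_width, new_height), (0, (target_height - new_height) // 2)


print(letterbox_geometry((100, 200), (224, 224)))  # ((112, 224), (56, 0))
print(letterbox_geometry((200, 100), (224, 224)))  # ((224, 112), (0, 56))
```

Padding instead of stretching keeps face proportions intact, at the cost of black bars in the model input.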

In [None]:
# @title 8 Make embeddings for scraped face images
# @markdown Find images, create a loader, and make embeddings.

embeddings_file = os.path.join(project_dir, "embeddings.pkl")

if os.path.exists(embeddings_file):
    with open(embeddings_file, "rb") as file:
        saved_data = pickle.load(file)

    faces_paths = saved_data["faces_paths"]
    faces_embeddings = saved_data["faces_embeddings"]
else:
    faces_paths = find_images_to_check(faces_dir)
    make_embeddings_dataset = CustomDataset(faces_paths, transform)
    make_embeddings_loader = torch.utils.data.DataLoader(make_embeddings_dataset, batch_size=embeddings_batch_size)

    print("Total faces:", len(faces_paths))
    print("Total steps:", len(make_embeddings_loader))

    print("\nMaking embeddings...")
    faces_embeddings, faces_paths = make_embeddings(model, make_embeddings_loader)

    data_to_save = {
        "faces_paths": faces_paths,
        "faces_embeddings": faces_embeddings
    }

    with open(embeddings_file, "wb") as file:
        pickle.dump(data_to_save, file)

print("\nDone!")
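
The cell above follows a load-or-compute pattern: reuse `embeddings.pkl` if it exists, otherwise compute and save. A small self-contained sketch of that pattern is shown below; `load_or_compute` and `fake_embeddings` are hypothetical stand-ins, and a temporary directory replaces the project folder.

```python
import os
import pickle
import tempfile


def load_or_compute(cache_path, compute):
    """Return cached data if the file exists; otherwise compute, save, and return it."""
    if os.path.exists(cache_path):
        with open(cache_path, "rb") as file:
            return pickle.load(file)

    data = compute()
    with open(cache_path, "wb") as file:
        pickle.dump(data, file)
    return data


calls = []


def fake_embeddings():
    calls.append(1)  # track how often the expensive step actually runs
    return {"faces_paths": ["a.jpg"], "faces_embeddings": [[0.1, 0.2]]}


with tempfile.TemporaryDirectory() as tmp:
    cache = os.path.join(tmp, "embeddings.pkl")
    first = load_or_compute(cache, fake_embeddings)
    second = load_or_compute(cache, fake_embeddings)

print(first == second, len(calls))  # True 1
```

Because the cache is keyed only by file name, it is not refreshed when the contents of the faces folder change. Delete `embeddings.pkl` to force a recomputation after rerunning face detection.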

In [None]:
# @title 9 Make embeddings for example images
# @markdown Get example images, create a loader, and make embeddings.
faces_examples = find_images_to_check(example_faces_dir)
e_make_embeddings_dataset = CustomDataset(faces_examples, transform)
e_make_embeddings_loader = torch.utils.data.DataLoader(e_make_embeddings_dataset, batch_size=embeddings_batch_size)

print("Total example faces:", len(faces_examples))
print("Total steps:", len(e_make_embeddings_loader))

print("\nMaking embeddings...")
e_faces_embeddings, e_faces_paths = make_embeddings(model, e_make_embeddings_loader)
print("\nDone!")

In [None]:
# @title 10 Calculate distances
# @markdown Calculate distances between embeddings of faces detected in the scraped images and your example faces.
distances = pairwise_distances(faces_embeddings, e_faces_embeddings)
print("Distances calculated!")
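
The result is a matrix with one row per detected face and one column per example face. The sketch below reproduces that layout with the standard library only, assuming Euclidean distance (the default metric of scikit-learn's `pairwise_distances`); the 2-D vectors are invented and far smaller than real embeddings.

```python
import math


def euclidean_distance_matrix(rows, cols):
    """distances[i][j] is the Euclidean distance between rows[i] and cols[j]."""
    return [[math.dist(r, c) for c in cols] for r in rows]


face_embeddings = [[0.0, 0.0], [3.0, 4.0], [6.0, 8.0]]  # made-up 2-D vectors
example_embeddings = [[0.0, 0.0], [3.0, 4.0]]

distances = euclidean_distance_matrix(face_embeddings, example_embeddings)
for row in distances:
    print(row)
# [0.0, 5.0]
# [5.0, 0.0]
# [10.0, 5.0]
```

Row `i` is what the later steps compare against the similarity threshold for face `i`.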

In [None]:
# @title 11 🟩 Unload similarity model from GPU VRAM
# @markdown Delete the model and empty the torch cache.

# @markdown If you want to process another character from this anime/movie, you can skip this cell.
# @markdown In that case you will not need to rerun steps 1-3 and 6-8 of this section for the next character.

del model
torch.cuda.empty_cache()

print("Model unloaded from GPU VRAM!")

In [None]:
# @title 12 Filter faces images by max faces on them

# @markdown Ignore images that contain more faces than this value. This may be useful if you don't want samples with many people in your dataset. Set to 0 if you don't want to ignore any images.
max_faces_on_image = 0 # @param {type:"integer"}

if not max_faces_on_image:
    max_faces_on_image = float('inf')

def filter_multiple_persons_images_with_embeddings(faces_paths, max_faces):
    original_name_counts = {}
    to_delete = set()

    for i, image_name in enumerate(faces_paths):
        original_name = image_name.replace('\\','/').split('/')[-1].split('-')[0]  # Extract original image name
        if original_name in original_name_counts:
            original_name_counts[original_name] += 1
            if original_name_counts[original_name] > max_faces:
                to_delete.add(original_name)
        else:
            original_name_counts[original_name] = 1

    valid_indices = [i for i, image_name in enumerate(faces_paths) if image_name.replace('\\','/').split('/')[-1].split('-')[0] not in to_delete]

    return valid_indices, original_name_counts

filtered_indices, counts = filter_multiple_persons_images_with_embeddings(faces_paths, max_faces_on_image)

print("To check:", len(filtered_indices))
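
The filter relies on the crop naming scheme `<original>-<face index>-<confidence>.jpg`. The sketch below is a variant, not the notebook's exact logic: the cell above uses `split('-')[0]`, which assumes the original file name contains no `-`, while this version splits from the right. The helper names and paths are illustrative.

```python
import os
from collections import Counter


def original_name(face_path):
    # Take the file name, drop ".jpg", then strip the trailing "-<index>-<confidence>"
    file_name = face_path.replace("\\", "/").rsplit("/", 1)[-1]
    stem = os.path.splitext(file_name)[0]
    return stem.rsplit("-", 2)[0]


def indices_with_at_most(face_paths, max_faces):
    # Count faces per original image, then keep faces from images within the limit
    counts = Counter(original_name(p) for p in face_paths)
    return [i for i, p in enumerate(face_paths) if counts[original_name(p)] <= max_faces]


paths = ["faces/a-b-1-0.9.jpg", "faces/a-b-2-0.8.jpg", "faces/c-1-0.7.jpg"]
print(original_name(paths[0]))         # a-b
print(indices_with_at_most(paths, 1))  # [2]
```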

In [None]:
# @title 13 Find similar faces

# @markdown If the pairwise distance of two embedding vectors is lower than this value, the face will be marked as similar.
# @markdown This value depends on your character. You can try different values and watch the result.
# @markdown You can view your dataset and manually filter images by this threshold in the next cells.
similarity_threshold = 35 # @param {type:"number"}

# @markdown If you select this, a face must be similar to all of your example images. Gives fewer results.
and_method = False # @param {type:"boolean"}

# @markdown Visualize random subset of detected images:
visualize = True # @param {type:"boolean"}
n_images = 50 # @param {type:"integer"}
n_rows = 10 # @param {type:"integer"}

def find_similar_images(distances, indices, max_distance, and_method=False):
    similar = []

    for i in indices:
        distances_row = distances[i]
        similar_indices = np.where(distances_row < max_distance)[0]
        if (not and_method and len(similar_indices)) or (and_method and len(similar_indices) == len(distances_row)):
            similar.append(i)

    return similar

# A separate name keeps the face detection confidence `threshold` from step 2 intact
similar_indices = find_similar_images(distances, filtered_indices, similarity_threshold, and_method)
similar_images = [faces_paths[index] for index in similar_indices]
print("Done! Found %d possible similar images!" % len(similar_images))

if visualize:
    visualize_images(similar_images, n_images, n_rows)
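
The difference between the default (OR) mode and `and_method` is easiest to see on a tiny distance matrix. The sketch below restates the selection rule with `any` and `all`; `select_similar` and the numbers are invented for illustration.

```python
def select_similar(distances, threshold, and_method=False):
    """Return row indices whose distances pass the threshold test."""
    # OR mode: close to at least one example; AND mode: close to every example
    check = all if and_method else any
    return [i for i, row in enumerate(distances) if check(d < threshold for d in row)]


distances = [
    [10.0, 50.0],  # close to the first example only
    [20.0, 30.0],  # close to both examples
    [40.0, 60.0],  # close to neither
]
print(select_similar(distances, 35))                   # [0, 1]
print(select_similar(distances, 35, and_method=True))  # [1]
```

AND mode is stricter, so it suits example sets that all show the character in a similar style.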

In [None]:
# @title 14 Create a FiftyOne dataset from faces

# @markdown Create a fiftyone dataset and tag similar faces.
dataset_similar_faces = fo.Dataset.from_dir(faces_dir, dataset_type=fo.types.ImageDirectory)
dataset_similar_faces.tags = ["similar"]

similar_indices_cp = deepcopy(similar_indices)

for i, face in enumerate(faces_paths):
    face_path = os.path.abspath(face)
    sample = dataset_similar_faces[face_path]
    for n, dist in enumerate(distances[i], 1):
        sample[f"Distances {n}"] = dist
    if similar_indices_cp and i == similar_indices_cp[0]:
        similar_indices_cp.pop(0)
        sample.tags.append("similar")
    sample.save()

for sample in dataset_similar_faces:
    orig_name = sample.filepath.replace('\\','/').split('/')[-1].split('-')[0]
    sample["Faces on image"] = counts[orig_name]
    sample.save()

faces_session = None

clear_output()
print("Done!")

In [None]:
# @title 15 🟩 Run the Fiftyone app to manually select/deselect wrong faces (optional, but highly recommended)

# @markdown Here you can try different distance thresholds (the "Distances N" fields, one per example face), filter your images, and manually tag or untag images as similar.

# @markdown <b>My recommendations:</b> set your desired max_faces_on_image ("Faces on image" field) and find the distance value that keeps many images of your desired character.
# @markdown Tag all remaining images as similar (tag icon in the top panel, choose "similar", press apply).
# @markdown Then select the wrong images (click the square box at the top of each image), click the tag icon, deselect the "similar" tag, and press apply.

sidebar_groups = fo.DatasetAppConfig.default_sidebar_groups(dataset_similar_faces)
for group in sidebar_groups[1:-1]:
    group.expanded = False
dataset_similar_faces.app_config.sidebar_groups = sidebar_groups
dataset_similar_faces.save()

if faces_session is None:
    faces_session = fo.launch_app(dataset_similar_faces)
else:
    faces_session.show()

# @markdown Input any text in the input below Fiftyone app to save changes and quit!
input("Input something to save and quit: ")

clear_output()

In [None]:
# @title 16 Save similar faces images separately

# @markdown Save all face images tagged as similar!

# @markdown If you want to delete previous images in the folder (if you have run this cell before), select this:
del_previous_saved_faces = True # @param {type:"boolean"}

os.makedirs(similar_faces_dir, exist_ok=True)

if del_previous_saved_faces:
    delete_contents_of_dir(similar_faces_dir)

view = dataset_similar_faces.match_tags("similar")

n_final_similar = len(view)

view.export(
    export_dir=similar_faces_dir,
    dataset_type=fo.types.ImageDirectory
)

if faces_session is not None:
    faces_session.refresh()
    fo.close_app()

clear_output()
print("Done! Total: %d images" % n_final_similar)

In [None]:
# @title 17 Save original images with similar faces to the result folder

# @markdown Save original scraped images with your character to the "result" folder.

# @markdown If you want to delete previous images in the folder (if you have run this cell before), select this:
delete_previous_results = True # @param {type:"boolean"}

# @markdown It was the last step! Now you can tag your images using section 5 or zip and export results.
# @markdown If you are running this notebook locally, your dataset is already in the workingDir/projectName/result folder, do what you want!

os.makedirs(character_results, exist_ok=True)

if delete_previous_results:
    delete_contents_of_dir(character_results)

def copy_images_to_folder(image_paths, destination_folder):
    for image_path in image_paths:
        image_filename = os.path.basename(image_path)
        destination_path = os.path.join(destination_folder, image_filename)
        if os.path.exists(image_path):
            shutil.copy(image_path, destination_path)
        else:
            print("Error! Image %s doesn't exist!" % image_path)

def remove_confidence(filename):
    dash_index = filename.rfind('-')
    second_dash_index = filename.rfind('-', 0, dash_index)
    last_dot_index = filename.rfind('.')

    if second_dash_index >= 0 and last_dot_index > second_dash_index:
        return filename[:second_dash_index] + filename[last_dot_index:]

    return filename

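def get_original_images_path(similar_images, orig_dir):
    original_images = []
    for image_path in similar_images:
        # Face crops are always saved as .jpg, so look up the original by its base name
        base_name = os.path.splitext(remove_confidence(image_path))[0]
        candidates = [os.path.join(orig_dir, base_name + ext) for ext in supported_image_formats]
        existing = [p for p in candidates if os.path.exists(p)]
        original_image_path = existing[0] if existing else os.path.join(orig_dir, base_name + ".jpg")
        # Several faces can come from the same original image, so skip repeats
        if original_image_path not in original_images:
            original_images.append(original_image_path)

    return original_images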

curated_similar_images = [file for file in os.listdir(similar_faces_dir) if file.endswith(supported_image_formats)]
orig_images = get_original_images_path(curated_similar_images, filtered_dir)
copy_images_to_folder(orig_images, character_results)

print("Done! Total: %d images in your dataset!" % len(orig_images))
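
A quick check of how `remove_confidence` maps a face crop name back to its source image name. The function body is repeated from the cell above; the file names are invented.

```python
def remove_confidence(filename):
    dash_index = filename.rfind('-')                        # dash before the confidence
    second_dash_index = filename.rfind('-', 0, dash_index)  # dash before the face index
    last_dot_index = filename.rfind('.')                    # dot before the extension

    if second_dash_index >= 0 and last_dot_index > second_dash_index:
        return filename[:second_dash_index] + filename[last_dot_index:]

    return filename


print(remove_confidence("my-image-2-0.91.jpg"))  # my-image.jpg
print(remove_confidence("plain.jpg"))            # plain.jpg
```

Searching from the right means dashes inside the original name are preserved.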

## **5️⃣🟩 Tag your images**
Uses code from the [Dataset Maker colab](https://colab.research.google.com/github/hollowstrawberry/kohya-colab/blob/main/Dataset_Maker.ipynb) with some modifications.

In [None]:
#@title 1 Tag images

#@markdown We will be using AI to automatically tag your images, specifically [Waifu Diffusion](https://huggingface.co/SmilingWolf/wd-v1-4-swinv2-tagger-v2) in the case of anime and [BLIP](https://huggingface.co/spaces/Salesforce/BLIP) in the case of photos.
#@markdown Giving tags/captions to your images allows for much better training. This process should take a couple of minutes. <p>
#@markdown Select this to use wd14_tagger and deselect to use blip captions (like for photos).
anime_tags = True # @param {type:"boolean"}
#@markdown **Anime:** The threshold is the minimum level of confidence the tagger must have in order to include a tag. Lower threshold = More tags. Recommended 0.35 to 0.5
tag_threshold = 0.35 # @param {type:"slider", min:0.0, max:1.0, step:0.01}
tag_batch_size = 8 # @param {type:"number"}
blacklist_tags = "bangs, breasts, multicolored hair, two-tone hair, gradient hair, virtual youtuber, official alternate costume, official alternate hairstyle, official alternate hair length, alternate costume, alternate hairstyle, alternate hair length, alternate hair color" #@param {type:"string"}
#@markdown **Photos:** The minimum and maximum length of tokens/words in each caption.
max_data_loader_n_workers = 2 # @param {type:"number"}
caption_min = 10 # @param {type:"number"}
caption_max = 75 # @param {type:"number"}

if anime_tags:
    if colab:
        !python ./kohya-trainer/finetune/tag_images_by_wd14_tagger.py "{character_results}" \
                --repo_id=SmilingWolf/wd-v1-4-swinv2-tagger-v2 --general_threshold {tag_threshold} \
                --batch_size {tag_batch_size}
    else:
        script_runner(f'python ./kohya-trainer/finetune/tag_images_by_wd14_tagger.py "{character_results}" '
                      f'--repo_id=SmilingWolf/wd-v1-4-swinv2-tagger-v2 --general_threshold {tag_threshold} '
                      f'--batch_size {tag_batch_size}')

    print("Removing underscores and blacklist...")
    blacklisted_tags = [t.strip() for t in blacklist_tags.split(",")]
    from collections import Counter
    top_tags = Counter()
    for txt in [f for f in os.listdir(character_results) if f.lower().endswith(".txt")]:
        with open(os.path.join(character_results, txt), 'r') as f:
            tags = [t.strip() for t in f.read().split(",")]
            tags = [t.replace("_", " ") if len(t) > 3 else t for t in tags]
            tags = [t for t in tags if t not in blacklisted_tags]
        top_tags.update(tags)
        with open(os.path.join(character_results, txt), 'w') as f:
            f.write(", ".join(tags))

    clear_output()
    print(f"\tTagging complete. Here are the top 50 tags in your dataset:")
    print("\n".join(f"{k} ({v})" for k, v in top_tags.most_common(50)))

    del top_tags

else:
    os.chdir("kohya-trainer")

    if colab:
        !python ./finetune/make_captions.py "{character_results}" \
                --beam_search --max_data_loader_n_workers {max_data_loader_n_workers} \
                --batch_size {tag_batch_size} --min_length {caption_min} \
                --max_length {caption_max} --caption_extension .txt
    else:
        script_runner(f'python ./finetune/make_captions.py "{character_results}" '
                      f'--beam_search --max_data_loader_n_workers {max_data_loader_n_workers} '
                      f'--batch_size {tag_batch_size} --min_length {caption_min} '
                      f'--max_length {caption_max} --caption_extension .txt')

    os.chdir("../")

    captions = [f for f in os.listdir(character_results) if f.lower().endswith(".txt")]
    sample = []
    for txt in random.sample(captions, min(10, len(captions))):
      with open(os.path.join(character_results, txt), 'r') as f:
        sample.append(f.read())

    clear_output()
    print(f"\tCaptioning complete. Here are {len(sample)} example captions from your dataset:")
    print("".join(sample))

    del sample
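
The tag clean-up in the tagging branch above has two details that are easy to miss: tags of three characters or fewer keep their underscores (so emoticon tags such as `^_^` survive), and the blacklist is applied after underscores are replaced, so blacklist entries need spaces rather than underscores. A minimal sketch of that behaviour on in-memory strings follows. The helper name `normalize_tags` and the sample captions are hypothetical and are not part of the notebook.

```python
from collections import Counter

def normalize_tags(raw, blacklist):
    """Mirror the clean-up steps of the tagging cell for one caption string."""
    tags = [t.strip() for t in raw.split(",")]
    # Short tags (3 characters or fewer) keep their underscores, e.g. "^_^".
    tags = [t.replace("_", " ") if len(t) > 3 else t for t in tags]
    # The blacklist is compared against the already normalized tags.
    return [t for t in tags if t not in blacklist]

# Hypothetical captions, as the tagger might write them.
captions = ["1girl, long_hair, ^_^, bad_id", "1girl, long_hair, smile"]
blacklist = ["bad id"]  # written with a space, not "bad_id"

cleaned = [normalize_tags(c, blacklist) for c in captions]
counts = Counter()
for tags in cleaned:
    counts.update(tags)

for tag, n in counts.most_common(3):
    print(f"{tag} ({n})")
```

A blacklist entry written as `bad_id` would not match here, because by the time the blacklist is checked the tag has already become `bad id`.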

In [None]:
#@title 2 Curate your tags

#@markdown Modify your dataset's tags. You can run this cell multiple times with different parameters. <p>

#@markdown Put an activation tag at the start of every text file. This helps training and makes your LoRA easier to activate. Set keep_tokens to 1 when training.<p>
#@markdown Common tags that you remove, such as hair color, will be "absorbed" by your activation tag.
global_activation_tag = "global_tag" #@param {type:"string"}
remove_tags = "" #@param {type:"string"}
#@markdown &nbsp;

#@markdown In this advanced section, you can search text files containing matching tags, and replace them with less/more/different tags. If you select the checkbox below, any extra tags will be put at the start of the file, letting you assign different activation tags to different parts of your dataset. Still, you may want a more advanced tool for this.
search_tags = "" #@param {type:"string"}
replace_with = "" #@param {type:"string"}
search_mode = "OR" #@param ["OR", "AND"]
new_becomes_activation_tag = False #@param {type:"boolean"}
#@markdown These may be useful sometimes. Will remove existing activation tags, be careful.
sort_alphabetically = False #@param {type:"boolean"}
remove_duplicates = False #@param {type:"boolean"}

def split_tags(tagstr):
    return [s.strip() for s in tagstr.split(",") if s.strip()]

activation_tag_list = split_tags(global_activation_tag)
remove_tags_list = split_tags(remove_tags)
search_tags_list = split_tags(search_tags)
replace_with_list = split_tags(replace_with)
replace_new_list = [t for t in replace_with_list if t not in search_tags_list]

replace_with_list = [t for t in replace_with_list if t not in replace_new_list]
replace_new_list.reverse()
activation_tag_list.reverse()

remove_count = 0
replace_count = 0

for txt in [f for f in os.listdir(character_results) if f.lower().endswith(".txt")]:
    with open(os.path.join(character_results, txt), "r") as f:
        tags = [s.strip() for s in f.read().split(",")]

    if remove_duplicates:
        tags = list(set(tags))
    if sort_alphabetically:
        tags.sort()

    for rem in remove_tags_list:
        if rem in tags:
            remove_count += 1
            tags.remove(rem)

    if (
        "AND" in search_mode
        and all(r in tags for r in search_tags_list)
        or "OR" in search_mode
        and any(r in tags for r in search_tags_list)
    ):
        replace_count += 1
        for rem in search_tags_list:
            if rem in tags:
                tags.remove(rem)
        for add in replace_with_list:
            if add not in tags:
                tags.append(add)
        for new in replace_new_list:
            if new_becomes_activation_tag:
                if new in tags:
                    tags.remove(new)
                tags.insert(0, new)
            else:
                if new not in tags:
                    tags.append(new)

    for act in activation_tag_list:
        if act in tags:
            tags.remove(act)
        tags.insert(0, act)

    with open(os.path.join(character_results, txt), "w") as f:
        f.write(", ".join(tags))

if global_activation_tag:
    print(f"\nApplied new activation tag(s): {', '.join(activation_tag_list)}")
if remove_tags:
    print(f"Removed {remove_count} tags.")
if search_tags:
    print(f"Replaced in {replace_count} files.")

print("\nDone!")
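
To make the order of operations in the cell above easier to follow, here is a minimal sketch that applies the same steps to a single in-memory tag list instead of to files: remove tags, run the search/replace, then put the activation tags first. The function `curate_tags` and the sample tags are hypothetical and serve only as an illustration. Sorting and deduplication are left out.

```python
def curate_tags(tags, activation=(), remove=(), search=(), replace_with=(),
                mode="OR", new_becomes_activation_tag=False):
    """Apply the curation steps of the cell above to one list of tags."""
    tags = list(tags)

    # 1. Remove unwanted tags (first occurrence only, as in the cell).
    for rem in remove:
        if rem in tags:
            tags.remove(rem)

    # 2. Search/replace. Replacement tags that also appear in the search
    #    list are "kept" tags; the rest are new tags.
    keep = [t for t in replace_with if t in search]
    new = [t for t in replace_with if t not in search]
    matches = all if mode == "AND" else any
    if matches(s in tags for s in search):
        for s in search:
            if s in tags:
                tags.remove(s)
        for t in keep:
            if t not in tags:
                tags.append(t)
        # Reversed so that repeated insert(0, ...) keeps the given order.
        for t in reversed(new):
            if new_becomes_activation_tag:
                if t in tags:
                    tags.remove(t)
                tags.insert(0, t)
            elif t not in tags:
                tags.append(t)

    # 3. The global activation tags always end up at the very front.
    for act in reversed(activation):
        if act in tags:
            tags.remove(act)
        tags.insert(0, act)
    return tags


result = curate_tags(
    ["1girl", "blue hair", "smile", "school uniform"],
    activation=["mychar"],
    remove=["blue hair"],
    search=["school uniform"],
    replace_with=["uniform outfit"],
    new_becomes_activation_tag=True,
)
print(", ".join(result))
```

In this example `blue hair` is dropped, `school uniform` is replaced by `uniform outfit`, which moves to the front because `new_becomes_activation_tag` is set, and the global activation tag `mychar` is placed before it.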

In [None]:
# @title 3 🟩 Analyze tags (show top tags)
show_top_tags = 50 #@param {type:"number"}

# Import here as well so this cell does not depend on the tagging branch having run.
from collections import Counter

top_tags = Counter()

for txt in [f for f in os.listdir(character_results) if f.lower().endswith(".txt")]:
  with open(os.path.join(character_results, txt), 'r') as f:
    top_tags.update([s.strip() for s in f.read().split(",")])

print(f"\tTop {show_top_tags} tags:")
for k, v in top_tags.most_common(show_top_tags):
  print(f"{k} ({v})")

del top_tags

## **6️⃣🟩 Zip results**

Don't forget to run the first section if you are working in a new runtime!

In [None]:
# @title 1 Zip your result folder

# @markdown If you reran some cells to build datasets for more than one character, you can zip all characters in the result folder.
zip_all_characters = False #@param {type:"boolean"}

def zip_folder(folder_path, output_path):
    with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                arcname = os.path.relpath(file_path, folder_path)
                zipf.write(file_path, arcname)

# @markdown Filename of your zip file:
zip_file_name = "" #@param {type:"string"}

# @markdown To save the results to Google Drive, enter <b>drive/My Drive/</b>.
# @markdown Don't forget to connect this notebook to your Google Drive first (run the cell in the second section).
# @markdown To save to the default path, <b>{working_dir}/{project_name}/zip_file_name.zip</b>, leave this field blank.
zip_output_path = "" #@param {type:"string"}

if not zip_file_name:
    zip_file_name = character_name + ".zip"

if not zip_output_path:
    zip_output_path = project_dir

os.makedirs(zip_output_path, exist_ok=True)

zip_file = os.path.join(zip_output_path, zip_file_name)

if zip_all_characters:
    zip_folder(result_dir, zip_file)
else:
    zip_folder(character_results, zip_file)

print("Done!")

In [None]:
# @title 2 Zip custom folder

def zip_folder(folder_path, output_path):
    with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                arcname = os.path.relpath(file_path, folder_path)
                zipf.write(file_path, arcname)

# @markdown Type folder path to zip (you can copy desired path from 1.5 step):
folder_to_zip = "" #@param {type:"string"}
# @markdown Filename of your zip file:
zip_file_name = "" #@param {type:"string"}

# @markdown To save the results to Google Drive, enter <b>drive/My Drive/</b>.
# @markdown Don't forget to connect this notebook to your Google Drive first (run the cell in the second section).
# @markdown To save to the default path, <b>{working_dir}/{project_name}/zip_file_name.zip</b>, leave this field blank.
zip_output_path = "" #@param {type:"string"}

if not zip_file_name:
    zip_file_name = character_name + ".zip"

if not zip_output_path:
    zip_output_path = project_dir

os.makedirs(zip_output_path, exist_ok=True)

zip_file = os.path.join(zip_output_path, zip_file_name)

if folder_to_zip:
    zip_folder(folder_to_zip, zip_file)
    print("Done!")
else:
    print("Choose folder to zip!")


In [None]:
# @title 3 Zip entire project
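def zip_folder(folder_path, output_path):
    # The archive is written inside the project folder by default, so skip it
    # while walking to keep it from being added to itself.
    output_abs = os.path.abspath(output_path)
    with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                if os.path.abspath(file_path) == output_abs:
                    continue
                arcname = os.path.relpath(file_path, folder_path)
                zipf.write(file_path, arcname)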

# @markdown Filename of your zip file:
zip_file_name = "" #@param {type:"string"}

# @markdown To save the results to Google Drive, enter <b>drive/My Drive/</b>.
# @markdown Don't forget to connect this notebook to your Google Drive first (run the cell in the second section).
# @markdown To save to the default path, <b>{working_dir}/{project_name}/zip_file_name.zip</b>, leave this field blank.
zip_output_path = "" #@param {type:"string"}

if not zip_file_name:
    zip_file_name = "project.zip"

if not zip_output_path:
    zip_output_path = project_dir

os.makedirs(zip_output_path, exist_ok=True)

zip_file = os.path.join(zip_output_path, zip_file_name)
zip_folder(project_dir, zip_file)

print("Done!")
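
When the whole project is zipped into its default location, the archive sits inside the folder being walked. The sketch below shows one way to keep an archive from including itself and to check the result afterwards with `namelist()` and `testzip()`. It runs on a temporary directory. The helper name `zip_folder_skip_self` and the sample file layout are assumptions made for illustration and are not part of the notebook.

```python
import os
import tempfile
import zipfile

def zip_folder_skip_self(folder_path, output_path):
    """Zip folder_path with relative names, skipping the archive itself."""
    output_abs = os.path.abspath(output_path)
    with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                if os.path.abspath(file_path) == output_abs:
                    continue  # do not add the archive to itself
                # Store paths relative to the zipped folder.
                zipf.write(file_path, os.path.relpath(file_path, folder_path))

with tempfile.TemporaryDirectory() as project:
    # Hypothetical project layout with a single caption file.
    os.makedirs(os.path.join(project, "results", "char"))
    with open(os.path.join(project, "results", "char", "1.txt"), "w") as f:
        f.write("1girl, smile")

    archive = os.path.join(project, "project.zip")
    zip_folder_skip_self(project, archive)

    with zipfile.ZipFile(archive) as zf:
        names = sorted(zf.namelist())   # entries use "/" separators
        corrupt = zf.testzip()          # None when every entry passes its CRC check

print(names, corrupt)
```

Listing the entries before downloading or moving an archive is a cheap way to confirm that it contains the expected files and nothing else.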

In [None]:
# @title 4 Download the zip file to your device (only if colab)

# @markdown Leave blank to download the zip archive created by the most recently run zip cell above, or enter a path to your zip file.
zip_file_path = "" #@param {type:"string"}

if not zip_file_path:
    zip_file_path = zip_file

if colab:
    files.download(zip_file_path)
    print("Download should start!")
else:
    print("You are not in Colab! If this is wrong, select Colab in the first cell!")

## **7️⃣🟩 Unzip archive**

In [None]:
# @title Unzip zip file

# @markdown Write a path to your zip file:
path_to_zip_file = "" #@param {type:"string"}

# @markdown Leave blank to extract to project dir or enter extraction path:
extracted_folder_path = "" #@param {type:"string"}

# @markdown May be useful for some colab users! You can upload your zip file directly in colab.
# @markdown Click on the "Files" icon on the left sidebar (it looks like a folder), click on the "Upload" button and select the zip file from your local device.
# @markdown Path to zip file will be <b>name_of_file.zip</b>. If your file is in the root folder in google drive, path_to_zip_file will be <b>drive/My Drive/name_of_file.zip</b>.

if path_to_zip_file:
    if not extracted_folder_path:
        extracted_folder_path = project_dir

    os.makedirs(extracted_folder_path, exist_ok=True)

    # Open the archive given in path_to_zip_file (zip_file_path belongs to the download cell).
    with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
        zip_ref.extractall(extracted_folder_path)

    print("Done!")
else:
    print("You should fill in the path_to_zip_file variable!")

## **8️⃣🟩 Deleting options**

In [None]:
#@title 1 Delete all tags (.txt files) in result folder
def del_all_files_with_extension(directory, extension):
    # Match the extension case-insensitively, like the tag cells do.
    for file in os.listdir(directory):
        if file.lower().endswith(extension):
            os.remove(os.path.join(directory, file))

del_all_files_with_extension(character_results, ".txt")
print("Done!")

In [None]:
#@title 2 Delete project

delete_contents_of_dir(project_dir)
print("Done!")

In [None]:
#@title 3 Delete all content in the working dir (Don't run if your path is "./" or "drive/My Drive")

if working_dir == os.path.abspath(os.getcwd()) or working_dir == os.path.abspath(os.path.join(os.getcwd(), "drive/My Drive")):
    print("Don't run this cell! Read above!")
else:
    delete_contents_of_dir(working_dir)
    print("Done!")

## **✳️ Give feedback**

I worked on this project for three weeks. Most of the time was spent preparing the dataset and training the model that determines whether two anime faces are the same. If you like it, find something missing, or think the project is not working well, write to me (contact links are at the top of the notebook).

What I want to do:
1. Improve the model that determines whether two faces are the same. It is often wrong, most likely because of insufficient training data. However, since the current model can already detect matching faces by itself, growing the dataset with its help will be much easier than collecting it manually.
2. Speed up image downloading, as this is the longest step. I am working on it now. If you have ideas for making it faster, write to me.
3. Add the ability to recognize and match real faces. There are already many such repositories on GitHub (for example, [this one](https://github.com/ageitgey/face_recognition)).